当前位置: 首页 > 工具软件 > go-cyber > 使用案例 >

apollo源码解读4:/cyber/task 模块

商璞
2023-12-01

在这里就不贴源代码,太占空间了,源码连接:https://github.com/ApolloAuto/apollo/tree/master/cyber/task

cyber下面的task包是使用cyber协程的入口包,共有5个文件如下:
BUILD文件:构建文件
task_manager.h文件:任务管理头文件
task_manager.cc文件:任务管理主文件
task_test.cc文件:任务管理测试文件

task.h文件对task_manager封装使用的文件
构建文件和测试文件不再赘述,和其他包类似,比较容易理解,先来看看task.h文文件,该文件主要写了在apollo::cyber命名空间下的4个全局函数
Async函数:使用cyber协程的函数,参数要传入函数名字以及函数的实参,返回的是std::future类型。主体内容:判断当前是否是实时模式,如果是则将任务加入到taskmanager任务队列里面等待执行,如果否则使用std::async创建该任务(系统底层会新建线程)
Yield函数:判断是否存在协程,如果存在在放弃当前协程执行时间片,否则放弃当前线程执行时间片
SleepFor函数:执行休眠时间,参数类型std::chrono::duration,会判断是否存在协程,存在则协程休眠,否则线程休眠,实测的时候感觉休眠时间并不准确,原因待研究
USleep函数:同Sleep函数,参数类型useconds_t即微秒,执行体上看两者应该是完全一样的,只不过参数不同,执行效果亦不准确
直接使用linux自带的usleep函数和使用上述两个函数有什么差别呢?
使用linux自带的休眠函数,协程会一直固定在某个线程上面执行,并不会在cyber的线程池上切换。使用SleepFor或者USleep函数休眠之后唤醒可能在线程池里面的其他线程执行,那么如果部执行休眠呢?答案是也会固定在某个线程上面,这样来看,线程的切换执行协程原因是cyber检测到了协程的阻塞,也让整个协程休眠了,执行系统的休眠函数由于没有做hook之类的操作cyber检测不到休眠操作,在cyber看来该协程一直在运行,所以没有切换线程去执行协程。

task_manager.h文件和task_manager.cc文件组成的TaskManager类对协程任务进行添加和执行的封装体
该类使用了DECLARE_SINGLETON宏进行单例化,特点不用在头文件里面手写构造函数,禁用了拷贝构造函数和赋值构造函数,提供一个静态的Instance函数返回实例,提供一个静态的CleanUp对该类进行资源清理,需要创建一个Shutdown函数用以清理自定义的成员资源。观测一下TaskManager有哪些成员及含义
num_threads_:实际干活的线程个数,该值从scheduler包获取,用以创建和定义对应的同等数量的tasks_。
task_queue_size_:本类写死的值1000,含义是任务队列的数量,实际等待任务超过这个数量会怎么样?源代码没写这个逻辑,大概猜测是任务超过1000概率很小,甚至不可能,整体来看这个值没必要做成类成员
stop_:服务状态标志,正常工作判断是falsse状态,Shutdown函数会将该值置为true
tasks_:类型为std::vector<uint64_t>,vector里面存应该是协程id,数量等同于线程数量,代码上来看协程数量写死了等同于线程数量(其实感觉这个满难理解的,协程线程之间的关系不应该是M->N之间的关系吗?就像Go的GPM模型那样,协程数量可以成千上万个吗?,此处清晰的看到了协程数量等同于线程数量:根据默认配置文件为16
task_queue_:任务队列,是一个等待执行体的集合,说直白一点,一大堆等待执行的函数,添加任务的时候加入到这个队列里面,之后从队列里面拿出来并执行
构造函数逻辑:初始化task_queue_和task_queue_size_成员,num_threads_从scheduler包里取出来,创建RoutineFactory回调函数为不断的从task_queue_取出执行体进行执行,取不到了就挂起协程。创建等同线程数量的协程,并用tasks_记录所有的协程id,这些id会在Enqueue里面用到
析构函数逻辑:调用Shutdown函数
Shutdown函数:将stop_置为true,清除掉所有协程
Enqueue函数:用std::packaged_task将参里面的函数和参数封装,加入到task_queue_队列里面,然后遍历tasks_,scheduler对每一个task进行NotifyTask。有一说一这个函数理解起来难度挺大的,用packaged_task封装目的是什么?scheduler::Instance()->NotifyTask(task);的意义什么?目前尚不能完全理解,后期理解了再更新

附带一份测试代码main.cc

#include <iostream>

#include "cyber/cyber.h"

int id = 1;
void fun() {
  while (apollo::cyber::OK()) {
    //并发检测代码
    static std::set<decltype(std::this_thread::get_id())> thread_set;
    thread_set.insert(std::this_thread::get_id());
    std::cout << id++ << "  "
              << "thread_id:" << std::this_thread::get_id()
              << "  thread cout:" << thread_set.size();
    static std::set<apollo::cyber::croutine::CRoutine*> croutine_set;
    apollo::cyber::croutine::CRoutine* r =
        apollo::cyber::croutine::CRoutine::GetCurrentRoutine();
    croutine_set.insert(r);
    std::cout << "  croutine_id:" << r->id() << "  croutine name:" << r->name()
              << "  processor_id:" << r->processor_id()
              << "  croutine count: " << croutine_set.size() << std::endl;

    //如果使用系统usleep函数,那么只会有一个线程,使用下面两个cyber提供的睡眠,则是16线程执行任务
    //usleep(1000 * 100);
    //std::this_thread::sleep_for(std::chrono::microseconds{1000*100});

    // cyber这个sleep感觉睡眠时间不正确,参数小于1000*1000的时候大概率会睡眠0.333秒
    apollo::cyber::USleep(1000);

    // SleepFor也是不准确,大概率都会睡眠1秒
    //apollo::cyber::SleepFor(std::chrono::microseconds(1000));
  }
}

int main(int argc, char* argv[]) {
  apollo::cyber::Init(argv[0]);

  // #if 0
  //   //方式一:使用CreateTask自定义协程名字(好像只有在cyber库里面使用,modules模块没人用)
  //   //效果:16个线程随即线程执行task
  //   apollo::cyber::scheduler::Instance()->CreateTask(fun, "test_cyber");
  // #else
  //   //方式二:Async函数创建协程(modules模块大量使用)
  //   //效果:16个线程随即线程执行task,协程名字:/internal/task数字
  //   apollo::cyber::Async(fun);
  // #endif

  for (int a = 1; a <= 10; a++) {
    apollo::cyber::Async(fun);
  }

  apollo::cyber::WaitForShutdown();
  return 0;
}

对应的BUILD文件:

load("@rules_cc//cc:defs.bzl", "cc_binary")
load("//tools/install:install.bzl", "install")

package(default_visibility = ["//visibility:public"])

cc_binary(
    name = "tt",
    includes = [
        ".",
    ],
    linkopts = [
        "-pthread",
    ],
    srcs = glob(
        ["*.cc"],
    ),
    deps = [
        "//cyber",
    ],
)

编译和运行的命令:

bazel run  //modules/tools/tt:tt

 类似资料: