当前位置: 首页 > 工具软件 > ROS > 使用案例 >

ROS单线程与多线程处理

焦光霁
2023-12-01

ROS wiki:http://wiki.ros.org/
ROS API:https://docs.ros.org/en/api/roscpp/html/index.html
ROS单线程与多线程处理:http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning
在ros应用中可以根据自己的需要使用线程;ros内部在后台使用线程管理网络、任务调度等工作,ros内部使用的线程不会暴露给我们自己的应用(就是说ROS不会让他内部的线程处理我们的业务需求);但ros允许我们自己的定义的回调函数在我们自定义的任意线程中被调用。ros提供了如下几种方法创建线程,并在线程中调用自定义的回调函数;

1. 单线程轮询

单线程轮询是ros提供的最常用也是最简单的一种在单线程中使用回调的方法;通过使用ros::spin()方法,实例代码如下:
使用ros::spin()的方式调用回调函数

// 1. 包含头文件
#include "ros/ros.h"
#include "std_msgs/String.h"

// 回调函数
void doMsg(const std_msgs::String::ConstPtr &msg_p)
{
    ROS_INFO("我听见:%s", msg_p->data.c_str());
}

int main(int argc, char *argv[])
{
    setlocale(LC_ALL, "");
    // 2.初始化 ROS 节点:命名唯一
    ros::init(argc, argv, "listener");
    // 3.创建 ROS 句柄
    ros::NodeHandle nh;
    // 4.创建 订阅者 对象, chatter:主题
    ros::Subscriber sub = nh.subscribe<std_msgs::String>("chatter", 10, doMsg);
    // 6. 设置循环调用回调函数
    ros::spin(); // 循环读取接收的数据,并调用回调函数处理
    return 0;
}

在上面的例子中,所有的用户回调函数都会在ros::spin()内部被调用。ros::spin()将会一直运行直到调用ros::shutdown或者按下Ctrl+C将它所在的节点关闭为止。

自己实现spin()

#include <ros/callback_queue.h>
ros::NodeHandle nh;
while (ros::ok())
{	
	ros::getGlobalCallbackQueue()->callAvailable(ros::WallDuration(0.1));
}

上面代码的意思是:以0.1秒的时间间隔进行轮询,等待回调函数被调用;

单线程中另一种处理回调函数的方式是使用ros::spinOnce():
ros::spinOnce()会在会某一个时间点上调用所有正在等待运行的回调函数;ros::spinOnce()不会一直阻塞等待。

#include <sstream>
#include "ros/ros.h"
#include "std_msgs/String.h"

int main(int argc, char *argv[]) 
{
    // 设置编码
    setlocale(LC_ALL, "");

    // 初始ros节点:命名唯一
    // 参数1和参数2 后期为节点传值会使用
    // 参数3是节点名称,是唯一标识符,需要保证运行后,在ros网络拓补中唯一
    ros::init(argc, argv, "talker");

    // 实例化ROS句柄
    ros::NodeHandle nh; // 该类封装了ROS中的一些常用功能

    // 实例化发布者对象
    // 泛型: 发布的消息类型
    // 参数1: 要发布的话题
    // 参数2: 队列中最大保存的消息数,超出此阈值,先进的先销毁(时间早的先销毁)
    ros::Publisher pub = nh.advertise<std_msgs::String>("chatter", 10);

    // 5.组织被发布的数据,并编写逻辑发布数据
    // 数据(动态组织)
    std_msgs::String msg;
    std::string msg_front = "hello 你好!"; // 消息前缀
    int count = 0;                         // 消息计算器

    // 发布频率(1秒10次)
    ros::Rate rate(10);

    // 节点永久运行
    while (ros::ok) 
    {
        // 使用stringstream拼接字符串编号
        std::stringstream ss;
        ss << msg_front << count;
        msg.data = ss.str();
        // 发布消息
        pub.publish(msg);
        ROS_INFO("发送的消息:%s", msg.data.c_str());
        // 休眠时间 = 1/频率
        rate.sleep();
        // 循环结束前,让count自增
        count++;
        // 官方建议
        ros::spinOnce();
    }

    return 0;
}

编写一个自己的spinOnce()也很简单,只要将ros::WallDuration()中的时间间隔设置为0即可:

#include <ros/callback_queue.h>
ros::getGlobalCallbackQueue()->callAvailable(ros::WallDuration(0));

注意: spin()和spinOnce()是为单线程应用而设计的,在多线程的调用中没有被优化。如果你的应用中需要在多线程中进行轮询,那么请看下面的多线程轮询章节。

2.多线程轮询

ROS提供了2中方式支持多个线程调用回调函数:使用ros::MultiThreadedSpinner和ros::AsyncSpinner;

ros::MultiThreadedSpinner
ros::MultiThreadSpinner是一个阻塞式的spin,类似于ros::spin()。但可以在它的构造函数中指定线程数,如果没有指定(或设置为0),它将为每个CPU核心使用一个线程。

ros::MultiThreadedSpinner spinner(4); // Use 4 threads
spinner.spin(); // spin() will not return until the node has been shutdown

ros::AsyncSpinner
一个更有用的线程微调器是AsyncSpinner。它不是一个阻塞的spin()调用,而是调用start()和stop(),并且会在它被销毁时自动停止。AsyncSpinner与上面MultiThreadedSpinner示例的等效用法是:

ros::AsyncSpinner spinner(4); // Use 4 threads
spinner.start();
ros::waitForShutdown(); // Wait for this node to be shutdown, whether through Ctrl-C, ros::shutdown(), or similar. 

请注意,ros::waitForShutdown()函数不会自己旋转,因此上面的示例总共会旋转4个线程。

3. CallbackQueue::callAvailable()和callOne()

CallbackQueue类有两种方法调用其中的回调函数:callAvailable()和callOne()。callAvailable()将获取队列中当前的所有内容并调用它们。callOne()将简单地调用队列上最早的回调函数。
callAvailable()和callOne()都可以接受一个可选的超时,即它们在返回回调之前等待回调变为可用的时间。如果这个值为零,并且队列中没有回调函数,该方法将立即返回。
通过ROS 0.10时,缺省超时时间为0.1秒。ROS 0.11为缺省值0。

4. 高级用法:使用自定义回调队列

在上面的spin()实现中使用了对ros::getGlobalCallbackQueue()的调用。默认情况下,所有的回调函数都被赋值到全局队列中,然后由ros::spin()或其他方法处理。Roscpp还允许分配自定义的回调队列并分别为它们服务。这可以在以下两种粒度中实现:

  1. 每个subscribe()、advertise()、advertiseService()等等。
  2. 每NodeHandle

(1) 可以使用那些采用*Options结构的调用的高级版本。有关更多信息,请参阅API文档
(2) 比较常见的方式:

ros::NodeHandle nh;
nh.setCallbackQueue(&my_callback_queue);

setCallbackQueue()接口设置NodeHandle使用的默认回调队列。
使用setCallbackQueue设置指定的回调队列后,所有通过该NodeHandle为订阅、发布、服务等注册的回调函数将由该队列触发管理。如果将NULL(默认值)作为setCallbackQueue的参数会导致使用全局队列(由ros::spin()和ros::spinOnce()服务)。

这使得所有订阅、服务、定时器等回调函数都通过my_callback_queue而不是roscpp的默认队列。这意味着ros::spin()和ros::spinOnce()不会调用这些回调函数。相反,必须分别为该队列提供服务。可以使用ros::CallbackQueue::callAvailable()和ros::CallbackQueue::callOne()方法来手动完成:

my_callback_queue.callAvailable(ros::WallDuration());
// 或者,my_callback_queue.callOne(ros::WallDuration())只调用一个回调,而不是所有可用的回调

各种*Spinner对象也可以接受一个指向回调队列的指针,而不是默认值:

ros::AsyncSpinner spinner(0, &my_callback_queue);
spinner.start();

或者

ros::MultiThreadedSpinner spinner(0);
spinner.spin(&my_callback_queue);

5. 总结

将回调分离到不同的队列可能是有用的,原因有很多。一些例子包括:

  • 长时间运行的服务。为一个服务分配它自己的回调队列,并在一个单独的线程中进行服务,这意味着服务保证不会阻塞其他回调。
  • 线程特定的计算开销很高的回调。与长时间运行的服务案例类似,这允许您执行特定的回调,同时为应用程序的其余部分保持单线程回调的简单性。

6. 一个使用自定义回调队列的例子

基于一款使用ros的底盘进行二次开发,驱动部分通过ros节点对外提供底盘上Actuator、Lidar2D、Imu、Sonar等传感器的数据;需求是通过订阅底盘提供的ros节点上的各个传感器msg将数据收到后按自己的接口重新包装成自己的driver,实现代码如下:

#include "geometry_msgs/Twist.h"
#include "nav_msgs/Odometry.h"
#include "ros/callback_queue.h"
#include "ros/ros.h"
#include "sensor_msgs/Imu.h"
#include "sensor_msgs/LaserScan.h"
#include "std_msgs/UInt16MultiArray.h"
#include <thread>

class RobotDevice
{
public:
    RobotDevice()
    {
        pub_actuator_ = nh_.advertise<geometry_msgs::Twist>(rostopic_set_actuator_.c_str(), 1);
        thread_ = new std::thread(&RobotDevice::Run, this);
    }

    ~RobotDevice()
    {
        ros::shutdown();
        if (thread_)
        {
            thread_->join();
            delete thread_;
            thread_ = nullptr;
        }
    }

    void PubCmdVelManual(const float data[8])
    {
        memset(&vel_msg_, 0, sizeof(vel_msg_));
        vel_msg_.linear.x = data[0];
        vel_msg_.angular.z = data[1];
        DBUG("SetCtrl linear.x %0.3f, angualr.z %0.3f", data[0], data[1]);
        pub_actuator_.publish(vel_msg_);
    }

private:
    geometry_msgs::Twist vel_msg_;
    ros::NodeHandle nh_;
    ros::Subscriber sub_imu_;
    ros::Subscriber sub_laser_;
    ros::Subscriber sub_actuator_;
    ros::Subscriber sub_sonar_;
    ros::Publisher pub_actuator_;
    ros::CallbackQueue queue_;
    std::thread *thread_ = nullptr;
    const uint32_t ranger_finder_num_ = 7;
    const std::string rostopic_get_imu_ = "/imu";
    const std::string rostopic_get_sonar_ = "/sonar";
    const std::string rostopic_get_laser_ = "/scan_head";
    const std::string rostopic_get_actuator_ = "/odom";
    const std::string rostopic_set_actuator_ = "/cmd_vel_manual";

    void Run()
    {
        nh_.setCallbackQueue(&queue_);
        sub_imu_ = nh_.subscribe(rostopic_get_imu_.c_str(), 10, &RobotDevice::ImuSubCallback, this);
        sub_actuator_ = nh_.subscribe(rostopic_get_actuator_.c_str(), 10, &RobotDevice::ActuatorSubCallback, this);
        sub_laser_ = nh_.subscribe(rostopic_get_laser_.c_str(), 10, &RobotDevice::LaserSubCallback, this);
        sub_sonar_ = nh_.subscribe(rostopic_get_sonar_.c_str(), 10, &RobotDevice::SonarSubCallback, this);
        ros::AsyncSpinner spinner(0, &queue_);
        spinner.start();
        ros::waitForShutdown();
    }

    void ImuSubCallback(const sensor_msgs::Imu::ConstPtr &msg)
    {
        // DBUG("%s %d", __FUNCTION__, __LINE__);
    }

    void ActuatorSubCallback(const nav_msgs::Odometry::ConstPtr &msg)
    {
        // DBUG("%s %d", __FUNCTION__, __LINE__);
    }

    void LaserSubCallback(const sensor_msgs::LaserScan::ConstPtr &msg)
    {
        // DBUG("%s %d", __FUNCTION__, __LINE__);
    }

    void SonarSubCallback(const std_msgs::UInt16MultiArray::ConstPtr &msg)
    {
        // DBUG("%s %d", __FUNCTION__, __LINE__);
    }
};

int main(int argc, char *argv[])
{
	ros::init(argc, argv, "robot_device");
	RobotDevice device;
	ros::Rate rate(10);
	while (ros::ok())
	{
		rate.sleep();
	}
	return 0;
}
 类似资料: