ROS AsyncSpinner的多线程行为

neb*_*ant 4 c++ multithreading callback ros

我试图理解ROS中的AsyncSpinner是如何工作的,因为我可能会有一些误解.你可以在这里找到类似的问题.

如此处所示,其定义提到:

异步微调器:生成几个线程(可配置),它们将并行执行回调,而不会阻塞调用它的线程.启动/停止方法允许控制何时开始处理回调以及何时停止回调.

而官方的文档中这里的AsyncSpinning也说作为一种多线程纺纱.

所以说,我有一个非常简单的例子,发布者和订阅者使用AsyncSpinner来测试多线程行为.

#include "ros/ros.h"
#include "std_msgs/String.h"

int main(int argc, char **argv)
{
  ros::init(argc, argv, "publisher");
  ros::NodeHandle nh;

  ros::Publisher chatter_pub = nh.advertise<std_msgs::String>("chatter", 1000);

  ros::Rate loop_rate(10);
  while (ros::ok())
  {
    std_msgs::String msg;
    msg.data = "hello world";

    chatter_pub.publish(msg);

    ros::spinOnce();

    loop_rate.sleep();
  }

  return 0;
}
Run Code Online (Sandbox Code Playgroud)

以及定义和使用微调器的订户:

#include "ros/ros.h"
#include "std_msgs/String.h"
#include <boost/thread.hpp>

int count = 0;

void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
  count++;
  ROS_INFO("Subscriber %i callback: I heard %s", count, msg->data.c_str());
  sleep(1);
}

int main(int argc, char **argv)
{
  ros::init(argc, argv, "subscriber");
  ros::NodeHandle nh;

  ros::Subscriber sub = nh.subscribe("chatter", 1000, chatterCallback);

  ros::AsyncSpinner spinner(boost::thread::hardware_concurrency());
  ros::Rate r(10);

  spinner.start();
  ros::waitForShutdown();

  return 0;
}
Run Code Online (Sandbox Code Playgroud)

当我运行这两个程序时,我得到以下输出:

[ INFO] [1517215527.481856914]: Subscriber 1 callback: I heard hello world
[ INFO] [1517215528.482005146]: Subscriber 2 callback: I heard hello world
[ INFO] [1517215529.482204798]: Subscriber 3 callback: I heard hello world
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,回调每秒运行一次,并且没有其他回调被并行调用.我知道全局回调队列正在实现,因为如果我停止发布者,订阅者将继续弹出队列中累积的消息.

我知道我不应该阻止一个回调但是在上面的定义中注意到这不会停止它被调用的线程,我猜它们都不是由微调器创建的.我是否因为阻止回调而阻止下一次回调?有什么我误解了吗?我有点困惑,无法证明回调并行运行.也许你有另一个例子?

Fru*_*erg 7

简短回答:

默认情况下,ROS回调是线程安全的.这意味着注册的回调只能由一个线程处理,并且禁用并发调用.第二个线程无法同时访问同一个回调.

如果你注册了第二个回调,你会看到微调器像预期的那样工作,多个线程同时调用你的回调.

ros::Subscriber sub1 = nh.subscribe("chatter", 1000, chatterCallback);
ros::Subscriber sub2 = nh.subscribe("chatter", 1000, chatterCallback);
Run Code Online (Sandbox Code Playgroud)

扩展答案:

异步微调试图调用回调队列快可作为回调的速度允许.如果回调已经在进程中(由另一个线程),CallResult则为TryAgain.这意味着稍后将开始新的尝试.

此锁实现使用变量allow_concurrent_callbacks_,这意味着此行为是可选的.

解:

通过设置正确的SubscribeOptions.allow_concurrent_callbacks可以允许并发调用,默认情况下为false.因此,您需要定义自己的SubscribeOptions.以下是您需要订阅并允许并发回调调用的代码:

ros::SubscribeOptions ops;
ops.template init<std_msgs::String>("chatter", 1000, chatterCallback);
ops.transport_hints = ros::TransportHints();
ops.allow_concurrent_callbacks = true;
ros::Subscriber sub = nh.subscribe(ops);
Run Code Online (Sandbox Code Playgroud)

  • 只是注意:*线程安全*具有略微不同的含义.如果回调不访问共享数据或者使用适当的同步机制来访问此类数据,则回调只是线程安全的.确实,默认情况下,没有两个线程可以同时运行相同的回调. (2认同)