本篇博客主要介绍单节点的多Topic同时收发。
0.前言
之前说过Topic是一种多对多的关系:可以同时有多个Node向同一个Topic发送消息,它同时也可以被多个Node订阅。但这是站在Topic角度而言的“多对多”,如果我们换一个角度,在Node角度,是否也可以实现多对多呢?一个Node是否可以同时接收、发送多个Topic?答案是肯定的。
本篇博客主要来探讨“进阶版”的Topic用法——一个节点(Node
)的多Topic收发在C++及Python中的实现。其实对于这种节点同时收发Topic的需求是非常常见的,如一个Node从一个或多个Topic接收了传感器发来的数据,并对数据进行了处理(如多传感器数据融合),并将处理好的数据发送到另一个Topic上。在整个Node/Topic的关系图中,除非这个节点是起始点或终点,只需要考虑发送或者接收一个或多个Topic,其余其它所有节点都必须要面临同时接收发送的问题。例如如下Node/Topic图(rqt_graph
绘图,圆形表示Node,方形表示Topic)中所示:
节点/mobild_base_nodelet_manager
接收了/mobile_base_nodelet_manager/bond
,同时又发送/mobile_base_nodelet_manager/bond
和/cmd_vel
两个Topic。对于/gazebo
节点而言,同时接收/cmd_vel
和发送/xbot/joint_states
Topic。
所以这在实际应用中是非常现实的需求,学好这个非常重要。但在ROS官网以及相关书籍、博客上找了一下,介绍多Topic收发的相对较少,大部分还是停留在一个节点只做一件事(收或发)的水平,和之前这篇博客一样。所以本篇博客希望可以更加深入学习Topic。
1.Topic查看
在之前也说过如何查看Topic,这里再说一下,ROS中有rostopic
命令,介绍如下:
rostopic is a command-line tool for printing information about ROS Topics.
Commands:
rostopic bw display bandwidth used by topic
rostopic delay display delay of topic from timestamp in header
rostopic echo print messages to screen
rostopic find find topics by type
rostopic hz display publishing rate of topic
rostopic info print information about active topic
rostopic list list active topics
rostopic pub publish data to topic
rostopic type print topic or field type
Type rostopic <command> -h for more detailed usage, e.g. 'rostopic echo -h'
此外如果想要绘制出开头的Node/Topic图可以使用rqt
工具,终端中输入rqt_graph
即可打开。有一些选项可以控制图的显示内容,绘制的图像也可以保存成多种类型。
2.深入讨论std_msgs的类型
在之前这篇博客里也有介绍过ROS中的一些基本数据类型,这里介绍的是ROS中标准消息std_msgs
中的消息类型。一定要注意区分std_msgs
里的String
、Float
等消息类型与C++里或ROS里的基本类型不是一回事,这里介绍的是一个Message
类!所以当新建了一个Message类以后,不能直接对这个Message对象赋值,而是应该对它的成员变量data
赋值,切记。std_msgs官方文档可以看这里。这其中有个相对复杂的MultiArray
类型,它的使用可以看这个网页。
3.Topic多发
Topic的多发其实是非常简单的,之前建立了一个Publisher
对象,那么现在建立多个对象发布就可以了。例如一个节点向自定义的topic_str
发布字符串,向自定义的topic_flt
发布浮点数据,向自定义的topic_arr
发布数组。由于没有特殊的需求与功能,直接依赖roscpp
和std_msgs
就可以了。由于不是基础笔记,所以创建包、添加依赖这种步骤就不说了,如果还不会看这个博客。
另外提醒一下的是下面代码有用到了C++11的语法,ROS默认是C++98,要支持11,需要在CMakeLists.txt
文件里说明,把add_compile_options(-std=c++11)
这一行取消注释即可。下面直接看代码:
# include<ros/ros.h>
# include<std_msgs/String.h>
# include<std_msgs/Float32.h>
# include<std_msgs/Float32MultiArray.h>
int main(int argc, char *argv[])
{
ros::init(argc, argv, "multi_publish");
ros::NodeHandle nh;
ros::Publisher pub1 = nh.advertise<std_msgs::String>("topic_str",1);
ros::Publisher pub2 = nh.advertise<std_msgs::Float32>("topic_flt",1);
ros::Publisher pub3 = nh.advertise<std_msgs::Float32MultiArray>("topic_arr",1);
// 一定要注意,这里发布的String、Float等类型的消息本质是Message,而不是C++里的类型
// 所以新建了一个Message的类型后,不能直接对Message赋值,而是应该对它里面的成员变量data赋值!
std_msgs::String str_msg;
str_msg.data = "this is a string from node";
std_msgs::Float32 flt_msg;
flt_msg.data = 2.3;
std_msgs::Float32MultiArray arr_msg;
std::vector<float> data{1.2,6.8,7.4};
arr_msg.data = data;
ros::Rate rate(1);
while (nh.ok())
{
pub1.publish(str_msg);
pub2.publish(flt_msg);
pub3.publish(arr_msg);
std::cout<<"published str:"<<str_msg.data<<std::endl;
std::cout<<"published flt:"<<flt_msg.data<<std::endl;
std::cout<<"published arr:";
for (size_t i = 0; i < arr_msg.data.size(); i++)
{
std::cout<<arr_msg.data[i];
}
std::cout<<std::endl;
rate.sleep();
}
// 注意spin和spinOnce的区别
// spin让ros一直调用回调函数,ros::spin以下的代码无效
// spinOnce让ros调用回调函数的同时还可以继续执行以下的代码
// 由于这里后面没有代码了,所以两个没有区别
ros::spinOnce();
return 0;
}
4.Topic多收
多收其实也很简单,与多发类似,同时构造多个Subscriber
就可以了。直接上代码如下:
# include<ros/ros.h>
# include<std_msgs/String.h>
# include<std_msgs/Float32.h>
# include<std_msgs/Float32MultiArray.h>
void callback1(const std_msgs::StringConstPtr& str){
std::cout<<"received str:"<<str->data<<std::endl;
}
void callback2(const std_msgs::Float32ConstPtr& flt){
std::cout<<"received float:"<<flt->data<<std::endl;
}
void callback3(const std_msgs::Float32MultiArrayConstPtr& arr){
std::cout<<"received array:";
for (size_t i = 0; i < arr->data.size(); i++)
{
std::cout<<arr->data[i];
}
std::cout<<std::endl;
}
int main(int argc, char *argv[])
{
ros::init(argc, argv, "multi_subscribe");
ros::NodeHandle nh;
ros::Subscriber sub1 = nh.subscribe("topic_str",1,callback1);
ros::Subscriber sub2 = nh.subscribe("topic_flt",1,callback2);
ros::Subscriber sub3 = nh.subscribe("topic_arr",1,callback3);
ros::spin();
return 0;
}
5.Topic同时收发
相比于单纯的Topic多收或多发,同时收发会复杂一些。首先,根据前面知识知道Topic接收是通过NodeHandle
的成员函数subscribe()
和自定义的回调函数实现的,同时回调函数有严格的定义规定:参数只能有一个且必须以const
修饰、参数类型为xxxConstPtr
、参数为引用传递、函数没有返回值。这就意味着单纯的回调函数几乎无法同外界做任何直接的数据交换,数据只能在它内部处理,除了保存到文件以外,其它没有办法输出数据。
解决这个问题的核心就是数据或变量在不同函数之间的共享问题。在C++中对于这种情况有两种办法:一种是采用全局变量,二是类。
第一种办法通过将某个变量定义为全局,就可以实现该变量在多个函数之间的共享,这样就可以在回调函数中使用或修改它,当这个变量再被其它函数调用时,就已经是修改过的数据了,满足了数据共享的要求。
这里再简单介绍一下动态全局变量和全局静态变量的异同。简单来说,定义在所有函数外面(包括main函数)的变量称为全局变量,定义之后所有函数都可以使用,属于静态存储。不仅如此,不同文件间也可以调用,在外部文件中可以通过extern
关键字(对,你没有看错,就是很常见的无法解析的外部符号错误)来声明另一个文件中存在的全局变量,进而调用。而如果在全局变量前面加了static
关键字,就变成了静态全局变量。它的特点是作用于仅限于当前文件,无法在外部文件中调用,相当于限制了动态全局变量的作用域。如果对变量作用域感兴趣可以看这个网页。
第二种办法相比于第一种,以另一种和之前不同的编程思路去解决问题。将数据和回调函数以及其它需要调用数据的函数都写成类的成员,这样也可以做到回调函数修改数据后,类的其它成员函数再次调用时数据已经更新。
在解决了数据共享之后,同时收发这个问题就变得简单了:将publish()
函数放到Subscriber
的回调函数里,这样当节点接收到了数据,首先会调用回调函数,在其中对数据进行处理后,在回调函数中调用publish()
函数,就可以直接将数据发送走了。实现同时收发的关键就在于将publish函数放在回调函数里。
第一种全局变量方法具体代码如下:
# include<ros/ros.h>
# include<std_msgs/String.h>
# include<std_msgs/Float32.h>
// 定义为全局变量
static ros::Subscriber sub1;
static ros::Subscriber sub2;
static ros::Publisher pub1;
static ros::Publisher pub2;
// 回调函数1
void callback1(const std_msgs::Float32ConstPtr& flt){
std_msgs::Float32 pub_flt;
pub_flt.data = flt->data+0.4;
pub1.publish(pub_flt);
std::cout<<"receive flt:"<<flt->data<<std::endl;
std::cout<<"publish flt:"<<pub_flt.data<<std::endl;
};
// 回调函数2
void callback2(const std_msgs::StringConstPtr& str){
std_msgs::String str_msg;
str_msg.data = str->data+" hahaha";
pub2.publish(str_msg);
std::cout<<"receive str:"<<str->data<<std::endl;
std::cout<<"publish str:"<<str_msg.data<<std::endl;
}
int main(int argc, char *argv[])
{
// 初始化ROS并指定节点名称
ros::init(argc, argv, "subscribe_publish2");
// 创建节点句柄
ros::NodeHandle nh;
// 利用节点句柄对sub和pub初始化
sub1 = nh.subscribe("topic_flt",1,callback1);
sub2 = nh.subscribe("topic_str",1,callback2);
pub1 = nh.advertise<std_msgs::Float32>("processed_flt", 1);
pub2 = nh.advertise<std_msgs::String>("processed_str",1);
// 循环执行
ros::spin();
return 0;
}
第二种类方法具体代码如下:
# include<ros/ros.h>
# include<std_msgs/String.h>
# include<std_msgs/Float32.h>
class Processer
{
// 为了保持C++类数据封装的特性,将成员变量写成私有
// 当然如果是public也是完全没问题的,只是有点破坏数据的封装性
private:
ros::NodeHandle nh;
ros::Subscriber sub1;
ros::Subscriber sub2;
ros::Publisher pub1;
ros::Publisher pub2;
public:
// 构造函数中对sub和pub进行初始化
Processer(){
// 注意订阅函数的写法,第三个参数要写成&+类名+callback形式,以及最后增加this
sub1 = nh.subscribe("topic_flt",1,&Processer::callback1,this);
sub2 = nh.subscribe("topic_str",1,&Processer::callback2,this);
pub1 = nh.advertise<std_msgs::Float32>("processed_flt", 1);
pub2 = nh.advertise<std_msgs::String>("processed_str",1);
};
// 回调函数1,作为类的成员函数
void callback1(const std_msgs::Float32ConstPtr& flt){
std_msgs::Float32 pub_flt;
pub_flt.data = flt->data+0.4;
pub1.publish(pub_flt);
std::cout<<"receive flt:"<<flt->data<<std::endl;
std::cout<<"publish flt:"<<pub_flt.data<<std::endl;
};
// 回调函数2,作为类的成员函数
void callback2(const std_msgs::StringConstPtr& str){
std_msgs::String str_msg;
str_msg.data = str->data+" hahaha";
pub2.publish(str_msg);
std::cout<<"receive str:"<<str->data<<std::endl;
std::cout<<"publish str:"<<str_msg.data<<std::endl;
}
};
int main(int argc, char *argv[])
{
// 初始化ROS并指定节点名称
ros::init(argc, argv, "subscribe_publish1");
// // 新建一个处理类对象,程序会自动调用构造函数初始化sub、pub
Processer p;
// 循环执行
ros::spin();
return 0;
}
运行发送与接收节点,利用rqt
画图如下:
在这里multi_publish
节点同时发布了topic_arr
、topic_str
、topic_flt
三个Topic,同时subscribe_publish2
节点同时接收了topic_str
、topic_flt
两个Topic,并经过处理,将处理后的数据又发送到processed_flt
和processed_str
两个Topic上,实现了一个节点的多个Topic同时收发。运行终端如下:
以上所有功能也用Python实现了一遍,代码这里就不贴了,基本大同小异,需要注意的地方都加了注释,全部代码放到了Github,点击查看。
6.参考资料
- [1]https://answers.ros.org/question/59725/publishing-to-a-topic-via-subscriber-callback-function/
- [2]http://wiki.ros.org/cv_bridge/Tutorials/UsingCvBridgeToConvertBetweenROSImagesAndOpenCVImages
- [3]http://wiki.ros.org/cv_bridge/Tutorials/ConvertingBetweenROSImagesAndOpenCVImagesPython
本文作者原创,未经许可不得转载,谢谢配合