话题通信
话题通信是ROS中使用频率最高的一种通信模式,话题通信是基于发布订阅模式的,也即:一个节点发布消息,另一个节点订阅该消息。话题通信的应用场景也极其广泛,比如下面一个常见场景:
机器人在执行导航功能,使用的传感器是激光雷达,机器人会采集激光雷达感知到的信息并计算,然后生成运动控制信息驱动机器人底盘运动。
在上述场景中,就不止一次使用到了话题通信。
- 以激光雷达信息的采集处理为例,在 ROS 中有一个节点需要时时的发布当前雷达采集到的数据,导航模块中也有节点会订阅并解析雷达数据。
- 再以运动消息的发布为例,导航模块会根据传感器采集的数据时时的计算出运动控制信息并发布给底盘,底盘也可以有一个节点订阅运动信息并最终转换成控制电机的脉冲信号。
以此类推,像雷达、摄像头、GPS.... 等等一些传感器数据的采集,也都是使用了话题通信,换言之,话题通信适用于不断更新的数据传输相关的应用场景。
理论模型
话题通信实现模型是比较复杂的,该模型如下图所示,该模型中涉及到三个角色:
- ROS Master (管理者)
- Talker (发布者)
- Listener (订阅者)
ROS Master 负责保管 Talker 和 Listener 注册的信息,并匹配话题相同的 Talker 与 Listener,帮助 Talker 与 Listener 建立连接,连接建立后,Talker 可以发布消息,且发布的消息会被 Listener 订阅。

整个流程由以下步骤实现:
0.Talker注册
Talker启动后,会通过RPC在 ROS Master 中注册自身信息,其中包含所发布消息的话题名称。ROS Master 会将节点的注册信息加入到注册表中。
1.Listener注册
Listener启动后,也会通过RPC在 ROS Master 中注册自身信息,包含需要订阅消息的话题名。ROS Master 会将节点的注册信息加入到注册表中。
2.ROS Master实现信息匹配
ROS Master 会根据注册表中的信息匹配Talker 和 Listener,并通过 RPC 向 Listener 发送 Talker 的 RPC 地址信息。
3.Listener向Talker发送请求
Listener 根据接收到的 RPC 地址,通过 RPC 向 Talker 发送连接请求,传输订阅的话题名称、消息类型以及通信协议(TCP/UDP)。
4.Talker确认请求
Talker 接收到 Listener 的请求后,也是通过 RPC 向 Listener 确认连接信息,并发送自身的 TCP 地址信息。
5.Listener与Talker件里连接
Listener 根据步骤4 返回的消息使用 TCP 与 Talker 建立网络连接。
6.Talker向Listener发送消息
连接建立后,Talker 开始向 Listener 发布消息。
注意1:上述实现流程中,前五步使用的 RPC协议,最后两步使用的是 TCP 协议
注意2: Talker 与 Listener 的启动无先后顺序要求
注意3: Talker 与 Listener 都可以有多个
注意4: Talker 与 Listener 连接建立后,不再需要 ROS Master。也即,即便关闭ROS Master,Talker 与 Listern 照常通信。
实现流程
在模型实现中,ROS master 不需要实现,而连接的建立也已经被封装了,需要关注的关键点有三个:
- 发布方
- 接收方
- 数据(此处为普通文本)
流程:
- 编写发布方实现;
- 编写订阅方实现;
- 为python文件添加可执行权限;
- 执行
需求:编写发布订阅实现,要求发布方以10HZ(每秒10次)的频率发布文本消息,订阅方订阅消息并将消息内容打印输出。
首先先创建工作空间并初始化:
| mkdir -p test_ros/src
cd test_ros
catkin_make
cd src
code .
|
新建功能包:
| cd src
catkin_create_pkg topic_ros roscpp rospy std_msgs
code .
|
在功能包里新建scripts
目录
发布方
编写发布订阅实现,要求发布方以10HZ(每秒10次)的频率发布文本消息,订阅方订阅消息并将消息内容打印输出。
在scripts
目录下新建topic_pub.py
:
| #! /usr/bin/env python
#encoding=utf-8
import rospy
from std_msgs import String
def talker():
rospy.init_node('chatter')
pub = rospy.Publisher('talker_node',String,queue_size=10)
rate = rospy.Rate(10)
while not rospy.is_shutdown():
msg = "Hello ROS"
rospy.loginfo(f"Publishing:{msg}")
pub.publish(msg)
rate.sleep()
if __name__ == "__main__":
talker()
|
解释:
| #!/usr/bin/env python
# 使用环境变量中的解释器运行该脚本(通常是 Python)
import rospy # 导入 ROS Python 客户端库
from std_msgs.msg import String # 导入标准字符串消息类型
def talker():
# 创建一个 Publisher(发布者)对象
# 话题名为 'chatter',消息类型为 String,消息队列长度为 10
pub = rospy.Publisher('chatter', String, queue_size=10)
# 初始化一个节点,节点名为 'talker_node'
# anonymous=True 表示如果已有同名节点,自动加上后缀以避免冲突
rospy.init_node('talker_node', anonymous=True)
# 创建一个 Rate 对象,频率为 10Hz(每秒10次)
rate = rospy.Rate(10)
count = 0 # 用于记录发布了多少条消息
while not rospy.is_shutdown():
# 构造要发送的消息
msg = f"Hello ROS {count}"
# 打印日志,方便调试
rospy.loginfo(f"Publishing: {msg}")
# 发布消息到 'chatter' 话题
pub.publish(msg)
# 睡眠一段时间,保证发布频率是 10Hz
rate.sleep()
count += 1 # 计数器增加
# 程序入口
if __name__ == '__main__':
try:
talker() # 启动发布者
except rospy.ROSInterruptException:
pass # 如果按 Ctrl+C 结束节点,不报错退出
|
订阅方
编写发布订阅实现,要求发布方以10HZ(每秒10次)的频率发布文本消息,订阅方订阅消息并将消息内容打印输出。
在scripts
目录下新建topic_sub.py
:
| #! /usr/bin/env python
#encoding=utf-8
import rospy
from std_msgs import String
def callback(msg):
rospy.loginfo(f"Received:{msg.data}")
def listener():
rospy.init_node('listener_node')
rospy.Subscriber('chatter',String,callback)
rospy.spin()
if __name__ == "__main__":
listener()
|
解释:
| #!/usr/bin/env python
# 使用环境变量中的解释器运行该脚本
import rospy # 导入 ROS Python 客户端库
from std_msgs.msg import String # 导入标准字符串消息类型
# 定义回调函数:每收到一条消息,就会调用该函数
def callback(msg):
# msg.data 是接收到的字符串内容
rospy.loginfo(f"Received: {msg.data}")
def listener():
# 初始化订阅者节点,名称为 'listener_node'
rospy.init_node('listener_node', anonymous=True)
# 创建 Subscriber(订阅者),订阅 'chatter' 话题,消息类型为 String
# 收到新消息时,自动调用 callback() 函数处理
rospy.Subscriber('chatter', String, callback)
# 保持节点运行,等待回调函数执行
rospy.spin()
# 程序入口
if __name__ == '__main__':
listener() # 启动订阅者
|
添加可执行权限
在scripts
目录中打开终端,输入:
运行
终端1:
终端2:
| rosrun topic_ros topic_pub.py
|
终端3:
| rosrun topic_ros topic_sub.py
|
rqt_graph
