跳转至

自定义message

在 ROS 通信协议中,数据载体是一个较为重要组成部分,ROS 中通过 std_msgs 封装了一些原生的数据类型,比如:String、Int32、Int64、Char、Bool、Empty.... 但是,这些数据一般只包含一个 data 字段,结构的单一意味着功能上的局限性,当传输一些复杂的数据,比如: 激光雷达的信息... std_msgs 由于描述性较差而显得力不从心,这种场景下可以使用自定义的消息类型

msgs只是简单的文本文件,每行具有字段类型和字段名称,可以使用的字段类型有:

  • int8, int16, int32, int64 (或者无符号类型: uint*)
  • float32, float64
  • string
  • time, duration
  • other msg files
  • variable-length array[] and fixed-length array[C]

ROS中还有一种特殊类型:Header​,标头包含时间戳和ROS中常用的坐标帧信息。会经常看到msg文件的第一行具有Header标头​。


需求: 创建自定义消息,该消息包含人的信息:姓名、身高、年龄等。

流程:

  1. 按照固定格式创建 msg 文件
  2. 编辑配置文件
  3. 编译生成可以被 Python 或 C++ 调用的中间文件

创建msg文件

这里以上一篇文章的工作空间和功能包为例

进入包目录,创建msg​ 文件夹:

mkdir msg

msg​ 文件夹中创建 Person.msg​ 文件:

1
2
3
string name
float32 height
int32 age

编辑CMakeLists.txt

编辑功能包下的 CMakeLists.txt​,修改find_package​, add_message_files​, generate_messages​,告诉 CMake 如何生成 .msg​ 消息代码

## 查找依赖包,添加 message_generation 用于生成自定义消息
find_package(catkin REQUIRED COMPONENTS
  std_msgs
  message_generation
# 需要加入 message_generation,必须有 std_msgs
)

## 指定要生成的消息文件
add_message_files(
  FILES
  Person.msg # 自定义的消息定义文件
)

## 指定消息生成时依赖哪些包(比如 std_msgs)
generate_messages(
  DEPENDENCIES
  std_msgs
)

## 声明这个 catkin 包导出依赖,包括消息运行时依赖
catkin_package(
  CATKIN_DEPENDS roscpp rospy std_msgs message_runtime
)

编辑package.xml

编辑功能包下的 package.xml​,添加message_generation​, message_runtime​ 依赖,告诉 ROS 构建系统你的包需要这些功能支持

1
2
3
4
5
<!-- 生成阶段依赖:编译时需要用 message_generation 来生成 .h/.py 消息文件 -->
<build_depend>message_generation</build_depend>

<!-- 安装后运行时需要用到的依赖 -->
<exec_depend>message_runtime</exec_depend>

编译

回到工作空间根目录并编译:

catkin_make

编译成功后,Python 需要调用的中间文件(工作空间/devel/lib/python3/dist-packages/包名/msg)

image

后续在 Python中调用相关 msg 时,就是从这些中间文件调用的:

from my_package.msg import Person

发布方

topic_msg_pub.py:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import rospy
from my_package.msg import Person  # 导入自定义消息类型

def talker():
    # 创建一个发布者,发布到话题 'person_info',消息类型为 Person,队列长度为10
    pub = rospy.Publisher('person_info', Person, queue_size=10)

    # 初始化节点名为 'person_talker',anonymous=True 表示允许多个同名节点同时运行
    rospy.init_node('person_talker', anonymous=True)

    # 设置发布频率为 10Hz(每秒10次)
    rate = rospy.Rate(10)

    while not rospy.is_shutdown():
        # 创建一个 Person 消息对象并赋值
        person = Person()
        person.name = "张三"
        person.height = 1.75
        person.age = 25

        # 打印日志信息
        rospy.loginfo("发布消息: 姓名: %s, 身高: %.2f, 年龄: %d", person.name, person.height, person.age)

        # 发布消息
        pub.publish(person)

        # 等待下一个循环
        rate.sleep()

if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        pass  # 若程序中断(Ctrl+C),则退出

订阅方

topic_msg_sub.py:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import rospy
from my_package.msg import Person  # 导入自定义消息类型

# 回调函数:当收到消息时自动调用
def person_callback(msg):
    rospy.loginfo("接收到消息:")
    rospy.loginfo("姓名: %s", msg.name)
    rospy.loginfo("身高: %.2f 米", msg.height)
    rospy.loginfo("年龄: %d 岁", msg.age)

def listener():
    # 初始化节点,anonymous=True 允许多个同名节点共存
    rospy.init_node('person_listener', anonymous=True)

    # 创建一个订阅者,订阅话题 'person_info',消息类型为 Person,回调函数为 person_callback
    rospy.Subscriber('person_info', Person, person_callback)

    # 打印启动信息,spin() 保持程序运行直到关闭
    rospy.loginfo("订阅者已启动,等待消息...")
    rospy.spin()

if __name__ == '__main__':
    listener()

提权

为python文件添加可执行权限:

chmod +x *.py

运行

终端1:

roscore

终端2:

rosrun topic_ros topic_msg_pub.py

image

看到的 name: "\u5F20\u4E09"​ 是因为 ROS 的日志系统 rospy.loginfo()​ 默认是按 Unicode 编码输出的,终端没有自动把 Unicode 解码成中文显示。

手动解码:

echo -e "\u5F20\u4E09"

image

终端3:

rosrun topic_ros topic_msg_sub.py

image

rqt_graph

rqt_graph

image