话题通信的意义

为了获取周围的信息(传感器、相关数据),ROS2引入了专门的订阅发布机制,是ROS中最常用的通信方式之一。

在命令行里发布话题

此处以小海龟模拟器为例

1
2
3
4
ros2 node info /turtlesim #用于查看节点的详细信息 
#Subscribers:其订阅的所有话题;
#Publishers:节点发布的所有话题

1
2
ros2 topic echo /turtle1/pose#用于输出话题数据的命令

1
ros2 topic info /turtle1/cmd_vel -v#用于查看某个话题的具体信息
1
ros2 interface show geometry_msgs/msg/Twist#用于输出接口的定义内容
1
2
ros2 topic pub /turtle1/cmd_vel geometry_msgs/msg/Twist "{linear: {x: 1.0}}"
#以线速度1.0m/s向右直行

使用Python发布与订阅话题

使用Python发布话题

新建一个文件夹,vscode打开,然后该文件夹下创建topic_ws,在下面创建src目录,打开终端,创建demo_python_topic功能包

1
2
ros2 pkg create demo_python_topic --build-type ament_python --dependencies rclpy example_interfaces --license Apache-2.0
#创建demo_python_topic功能包

创建novel_pub_node.py文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import rclpy
from rclpy.node import Node
import requests
from example_interfaces.msg import String
from queue import Queue

class NovelPubNode(Node):
def __init__(self,node_name):
super().__init__(node_name)
self.novels_queue_=Queue()#创建队列,存放小说
self.novel_publisher_=self.create_publisher(String,'novel',10)
#创建话题发布者,发布小说
self.timer_=self.create_timer(5,self.timer_callback)#创建定时器

def download_novel(self,url):
response =requests.get(url)
response.encoding = 'utf-8'
self.get_logger().info(f'下载完成:{url}')
for line in response.text.splitlines():
self.novels_queue_.put(line)

def timer_callback(self):
if self.novels_queue_.qsize() >0 :
msg = String()#实例化一个信息p[p]
msg.data = self.novels_queue_.get()#
self.novel_publisher_.publish(msg)
self.get_logger().info(f'发布了一行小说:{msg.data}')
def main():
rclpy.init()
node =NovelPubNode('novel_pub')
node.download_novel('http://localhost:8000/novel1.txt')
rclpy.spin(node)
rclpy.shutdown()


在setup.py中添加启动语句

1
2
3
4
5
6
#...
entry_points={
'console_scripts': [
'novel_pub_node = demo_python_topic.novel_pub_node:main',
],
#在这里添加构建语句

新建一个终端(相当于一个是发布者,一个是订阅者)

1
python3 -m http.server 8000#打开http

编译,运行

1
2
3
colcon build#编译
source install/setup.bash
ros2 run demo_python_topic novel_pub_node#运行程序

然后就可以下载src目录下构建的novel1.txt文件里的内容

1
2
3
4
5
6
[INFO] [1756389267.525765835] [novel_pub]: 下载完成:http://localhost:8000/novel1.txt
[INFO] [1756389272.173843085] [novel_pub]: 发布了一行小说:这是一个美丽的故事开始。
[INFO] [1756389277.174012442] [novel_pub]: 发布了一行小说:主人公踏上了冒险的旅程。
[INFO] [1756389282.181176548] [novel_pub]: 发布了一行小说:他遇到了许多有趣的人物。
8[INFO] [1756389287.174946628] [novel_pub]: 发布了一行小说:每个章节都充满了惊喜和挑战。
[INFO] [1756389292.173441667] [novel_pub]: 发布了一行小说:故事逐渐展开,情节越来越精彩。

使用Python订阅话题

创建novel_pub_node.py文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import espeakng
import rclpy
from rclpy.node import Node
import threading
from example_interfaces.msg import String
from queue import Queue
import time

class NovelSubNode(Node):
def __init__(self,node_name):
super().__init__(node_name)
self.novels_queue_=Queue()#创建队列,存放小说
self.novel_subscriber_=self.create_subscription(String,'novel',self.novel_callback,10)
self.speech_thread_ =threading.Thread(target=self.speak_thread)
self.speech_thread_.start()

def novel_callback(self,msg):
self.novels_queue_.put(msg.data)
def speak_thread(self):
speaker = espeakng.Speaker()
speaker.voice = 'zh'
while rclpy.ok():
if self.novels_queue_.qsize()>0:
text = self.novels_queue_.get()
self.get_logger().info(f'正在朗读:{text}')
speaker.say(text)
speaker.wait()
else:
time.sleep(1)


def main(args=None):
rclpy.init(args=args)
node =NovelSubNode('novel_read')

rclpy.spin(node)
rclpy.shutdown()


在setup.py中添加启动语句

1
2
3
4
5
6
7
#...
entry_points={
'console_scripts': [
'novel_pub_node = demo_python_topic.novel_pub_node:main',\
'novel_sub_node = demo_python_topic.novel_sub_node:main',
],
#在这里添加构建语句

查看发布者和接收者的命令

1
2
ros2 topic list -v

可以看到

1
2
3
4
5
6
7
Published topics:
* /novel [example_interfaces/msg/String] 1 publisher

...

Subscribed topics:
* /novel [example_interfaces/msg/String] 1 subscriber

至此,已经完成python的所有的发布与订阅流程。

可参考其他文章:

(20 封私信 / 80 条消息) ROS2之第2讲–话题(topic)程序的编写 - 知乎