服务通信简介

service服务通讯机制是一种双向同步数据传输模式。基于客户端/服务器模型,两部分通信数据类型:一个用于请求,一个用于应答,类似web服务器。ROS中只允许一个节点提供指定命名的服务。

用python实现人脸检测

自定义服务接口

创建chapt4/chapt4_ws/src文件夹,在下面创造chapt4_interfaces

1
ros2 pkg create chapt4_interfaces --build-type ament_cmake --dependencies rosidl_default_generators sensor_msgs --license Apache-2.0

在chapt4_interfaces下面创造srv目录,接着创造FaceDetector.srv,即服务接口

1
2
3
4
5
6
7
8
sensor_msgs/Image image #原始图像
---
int16 number #人脸数量
float32 use_time #识别耗时
int32[] top #位置
int32[] right
int32[] bottom
int32[] left

在CMakeList.txt中注册

1
2
3
4
5
6
7
8
...

rosidl_generate_interfaces(${PROJECT_NAME}
"srv/FaceDetector.srv"
DEPENDENCIES sensor_msgs
)

...

在package.xml添加声明

1
2
3
4
...
<member_of_group>rosidl_interface_packages</member_of_group>

...

编译运行

1
2
3
colcon build#编译
source install/setup.bash
ros2 interface show chapt4_interfaces/srv/FaceDetector

完成服务接口的定义

人脸检测

创建ament_python的功能包

1
ros2 pkg create demo_python_service --build-type ament_python --dependencies rclpy chapt4_interfaces --license Apache-2.0

添加图片的路径

1
2
3
...
('share/' + package_name+"/resource", ['resource/default.jpeg']),
...

在demo_python_service下创建learn_face_detect.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import face_recognition
import cv2
from ament_index_python.packages import get_package_share_directory#获取功能包绝对路径

def main():
default_image_path = get_package_share_directory('demo_python_service')+'/resource/default.jpeg'
image =cv2.imread(default_image_path)
face_locations =face_recognition.face_locations(image,number_of_times_to_upsample=1,model='hog')
for top,right,bottom,left in face_locations:
cv2.rectangle(image,(left,top),(right,bottom),(255,0,0),4)

cv2.imshow('Face Detection',image)
cv2.waitKey(0)

setup.py里添加

1
2
3
4
console_scripts': [ 
'learn_face_detect = demo_python_service.learn_face_detect:main',

],

编译运行

1
2
3
4
colcon build#编译
source install/setup.bash
ros2 run demo_python_service learn_face_detect

结果如下:

定义人脸识别服务

在demo_python_service下创建face_detect_node.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import rclpy
from rclpy.node import Node
from chapt4_interfaces.srv import FaceDetector
from ament_index_python.packages import get_package_share_directory
from cv_bridge import CvBridge # 用于转换格式
import cv2
import face_recognition
import time
from rcl_interfaces.msg import SetParametersResult

class FaceDetectorionNode(Node):
def __init__(self):
super().__init__('face_detection_node')
self.bridge = CvBridge()
self.service = self.create_service(FaceDetector, '/face_detect', self.detect_face_callback)
self.defaut_image_path = get_package_share_directory('demo_python_service')+'/resource/default.jpeg'
self.upsample_times = 1
self.model = "hog"
# 声明和获取参数
self.declare_parameter('face_locations_upsample_times', 1)
self.declare_parameter('face_locations_model', "hog")
self.model = self.get_parameter("face_locations_model").value
self.upsample_times = self.get_parameter("face_locations_upsample_times").value
self.set_parameters([rclpy.Parameter('face_locations_model', rclpy.Parameter.Type.STRING, 'cnn')])
self.add_on_set_parameters_callback(self.parameter_callback)

def parameter_callback(self, parameters):
for parameter in parameters:
self.get_logger().info(
f'参数 {parameter.name} 设置为:{parameter.value}')
if parameter.name == 'face_locations_upsample_times':
self.upsample_times = parameter.value
if parameter.name == 'face_locations_model':
self.mode = parameter.value
return SetParametersResult(successful=True)


def detect_face_callback(self, request, response):
if request.image.data:
cv_image = self.bridge.imgmsg_to_cv2(
request.image)
else:
cv_image = cv2.imread(self.defaut_image_path)
start_time = time.time()
self.get_logger().info('加载完图像,开始检测')
face_locations = face_recognition.face_locations(cv_image, number_of_times_to_upsample=self.upsample_times, model=self.model)
end_time = time.time()
self.get_logger().info(f'检测完成,耗时{end_time-start_time}')
response.number = len(face_locations)
response.use_time = end_time - start_time
for top, right, bottom, left in face_locations:
response.top.append(top)
response.right.append(right)
response.bottom.append(bottom)
response.left.append(left)
return response


def main(args=None):
rclpy.init(args=args)
node = FaceDetectorionNode()
rclpy.spin(node)
rclpy.shutdown()

在setup.py中配置

1
2
3
4
5
entry_points={
'console_scripts': [
...
'face_detect_node = demo_python_service.face_detect_node:main',
...

编译启动

人脸识别服务端

在demo_python_service下创建face_detect_client_node.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import rclpy
from rclpy.node import Node
from chapt4_interfaces.srv import FaceDetector
from sensor_msgs.msg import Image
from ament_index_python.packages import get_package_share_directory
import cv2
from cv_bridge import CvBridge
from rcl_interfaces.srv import SetParameters
from rcl_interfaces.msg import Parameter, ParameterValue, ParameterType


class FaceDetectorClient(Node):
def __init__(self):
super().__init__('face_detect_client')
self.client = self.create_client(FaceDetector, '/face_detect')
self.bridge = CvBridge()
self.test1_image_path = get_package_share_directory(
'demo_python_service')+'/resource/test1.jpg'
self.image = cv2.imread(self.test1_image_path)

def send_request(self):
# 1.判断服务是否上线
while self.client.wait_for_service(timeout_sec=1.0) is False:
self.get_logger().info(f'等待服务端上线....')
# 2.构造 Request
request = FaceDetector.Request()
request.image = self.bridge.cv2_to_imgmsg(self.image)
# 3.发送并 spin 等待服务处理完成
future = self.client.call_async(request)
rclpy.spin_until_future_complete(self, future)
# 4.根据处理结果
response = future.result()
self.get_logger().info(
f'接收到响应: 图像中共有:{response.number}张脸,耗时{response.use_time}')
# 注释show_face_locations,防止显示堵塞无法多次请求
self.show_face_locations(response)

def call_set_parameters(self, parameters):
# 1. 创建一个客户端,并等待服务上线
client = self.create_client(
SetParameters, '/face_detection_node/set_parameters')
while not client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('等待参数设置服务端上线....')
# 2. 创建请求对象
request = SetParameters.Request()
request.parameters = parameters
# 3. 异步调用、等待并返回响应结果
future = client.call_async(request)
rclpy.spin_until_future_complete(self, future)
response = future.result()
return response

def update_detect_model(self,model):
# 1.创建一个参数对象
param = Parameter()
param.name = "face_locations_model"
# 2.创建参数值对象并赋值
new_model_value = ParameterValue()
new_model_value.type = ParameterType.PARAMETER_STRING
new_model_value.string_value = model
param.value = new_model_value
# 3.请求更新参数并处理
response = self.call_set_parameters([param])
for result in response.results:
if result.successful:
self.get_logger().info(f'参数 {param.name} 设置为{model}')
else:
self.get_logger().info(f'参数设置失败,原因为:{result.reason}')


def show_face_locations(self, response):
for i in range(response.number):
top = response.top[i]
right = response.right[i]
bottom = response.bottom[i]
left = response.left[i]
cv2.rectangle(self.image, (left, top),
(right, bottom), (255, 0, 0), 2)

cv2.imshow('Face Detection Result', self.image)
cv2.waitKey(0)


def main(args=None):
rclpy.init(args=args)

face_detect_client = FaceDetectorClient()
face_detect_client.update_detect_model('hog')
face_detect_client.send_request()
face_detect_client.update_detect_model('cnn')
face_detect_client.send_request()
rclpy.spin(face_detect_client)
rclpy.shutdown()

在resource下添加新的图片test1,

然后配置setup.py

1
2
3
4
5
6
7
8
9
(         ...
('share/' + package_name+"/resource", ['resource/test1.jpg']),
],
...
'console_scripts': [
...
'face_detect_client_node = demo_python_service.face_detect_client_node:main',
],
},

编译运行