服务通信简介
service服务通讯机制是一种双向同步数据传输模式。基于客户端/服务器模型,两部分通信数据类型:一个用于请求,一个用于应答,类似web服务器。ROS中只允许一个节点提供指定命名的服务。
用python实现人脸检测
自定义服务接口
创建chapt4/chapt4_ws/src文件夹,在下面创造chapt4_interfaces
1 ros2 pkg create chapt4_interfaces --build-type ament_cmake --dependencies rosidl_default_generators sensor_msgs --license Apache-2.0
在chapt4_interfaces下面创造srv目录,接着创造FaceDetector.srv,即服务接口
1 2 3 4 5 6 7 8 sensor_msgs/Image image #原始图像 --- int16 number #人脸数量 float32 use_time #识别耗时 int32[] top #位置 int32[] right int32[] bottom int32[] left
在CMakeList.txt中注册
1 2 3 4 5 6 7 8 ... rosidl_generate_interfaces(${PROJECT_NAME} "srv/FaceDetector.srv" DEPENDENCIES sensor_msgs ) ...
在package.xml添加声明
1 2 3 4 ... <member_of_group>rosidl_interface_packages</member_of_group> ...
编译运行
1 2 3 colcon build#编译 source install/setup.bash ros2 interface show chapt4_interfaces/srv/FaceDetector
完成服务接口的定义
人脸检测
创建ament_python的功能包
1 ros2 pkg create demo_python_service --build-type ament_python --dependencies rclpy chapt4_interfaces --license Apache-2.0
添加图片的路径
1 2 3 ... ('share/' + package_name+"/resource", ['resource/default.jpeg']), ...
在demo_python_service下创建learn_face_detect.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import face_recognitionimport cv2from ament_index_python.packages import get_package_share_directorydef main (): default_image_path = get_package_share_directory('demo_python_service' )+'/resource/default.jpeg' image =cv2.imread(default_image_path) face_locations =face_recognition.face_locations(image,number_of_times_to_upsample=1 ,model='hog' ) for top,right,bottom,left in face_locations: cv2.rectangle(image,(left,top),(right,bottom),(255 ,0 ,0 ),4 ) cv2.imshow('Face Detection' ,image) cv2.waitKey(0 )
setup.py里添加
1 2 3 4 console_scripts': [ ' learn_face_detect = demo_python_service.learn_face_detect:main', ],
编译运行
1 2 3 4 colcon build source install/setup.bash ros2 run demo_python_service learn_face_detect
结果如下:
定义人脸识别服务
在demo_python_service下创建face_detect_node.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 import rclpyfrom rclpy.node import Nodefrom chapt4_interfaces.srv import FaceDetector from ament_index_python.packages import get_package_share_directoryfrom cv_bridge import CvBridge import cv2import face_recognitionimport timefrom rcl_interfaces.msg import SetParametersResultclass FaceDetectorionNode (Node ): def __init__ (self ): super ().__init__('face_detection_node' ) self.bridge = CvBridge() self.service = self.create_service(FaceDetector, '/face_detect' , self.detect_face_callback) self.defaut_image_path = get_package_share_directory('demo_python_service' )+'/resource/default.jpeg' self.upsample_times = 1 self.model = "hog" self.declare_parameter('face_locations_upsample_times' , 1 ) self.declare_parameter('face_locations_model' , "hog" ) self.model = self.get_parameter("face_locations_model" ).value self.upsample_times = self.get_parameter("face_locations_upsample_times" ).value self.set_parameters([rclpy.Parameter('face_locations_model' , rclpy.Parameter.Type .STRING, 'cnn' )]) self.add_on_set_parameters_callback(self.parameter_callback) def parameter_callback (self, parameters ): for parameter in parameters: self.get_logger().info( f'参数 {parameter.name} 设置为:{parameter.value} ' ) if parameter.name == 'face_locations_upsample_times' : self.upsample_times = parameter.value if parameter.name == 'face_locations_model' : self.mode = parameter.value return SetParametersResult(successful=True ) def detect_face_callback (self, request, response ): if request.image.data: cv_image = self.bridge.imgmsg_to_cv2( request.image) else : cv_image = cv2.imread(self.defaut_image_path) start_time = time.time() self.get_logger().info('加载完图像,开始检测' ) face_locations = face_recognition.face_locations(cv_image, number_of_times_to_upsample=self.upsample_times, model=self.model) end_time = time.time() self.get_logger().info(f'检测完成,耗时{end_time-start_time} ' ) response.number = len (face_locations) response.use_time = end_time - start_time for top, right, bottom, left in face_locations: response.top.append(top) response.right.append(right) response.bottom.append(bottom) response.left.append(left) return response def main (args=None ): rclpy.init(args=args) node = FaceDetectorionNode() rclpy.spin(node) rclpy.shutdown()
在setup.py中配置
1 2 3 4 5 entry_points={ 'console_scripts' : [ ... 'face_detect_node = demo_python_service.face_detect_node:main' , ...
编译启动
人脸识别服务端
在demo_python_service下创建face_detect_client_node.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 import rclpyfrom rclpy.node import Nodefrom chapt4_interfaces.srv import FaceDetectorfrom sensor_msgs.msg import Imagefrom ament_index_python.packages import get_package_share_directoryimport cv2from cv_bridge import CvBridgefrom rcl_interfaces.srv import SetParametersfrom rcl_interfaces.msg import Parameter, ParameterValue, ParameterTypeclass FaceDetectorClient (Node ): def __init__ (self ): super ().__init__('face_detect_client' ) self.client = self.create_client(FaceDetector, '/face_detect' ) self.bridge = CvBridge() self.test1_image_path = get_package_share_directory( 'demo_python_service' )+'/resource/test1.jpg' self.image = cv2.imread(self.test1_image_path) def send_request (self ): while self.client.wait_for_service(timeout_sec=1.0 ) is False : self.get_logger().info(f'等待服务端上线....' ) request = FaceDetector.Request() request.image = self.bridge.cv2_to_imgmsg(self.image) future = self.client.call_async(request) rclpy.spin_until_future_complete(self, future) response = future.result() self.get_logger().info( f'接收到响应: 图像中共有:{response.number} 张脸,耗时{response.use_time} ' ) self.show_face_locations(response) def call_set_parameters (self, parameters ): client = self.create_client( SetParameters, '/face_detection_node/set_parameters' ) while not client.wait_for_service(timeout_sec=1.0 ): self.get_logger().info('等待参数设置服务端上线....' ) request = SetParameters.Request() request.parameters = parameters future = client.call_async(request) rclpy.spin_until_future_complete(self, future) response = future.result() return response def update_detect_model (self,model ): param = Parameter() param.name = "face_locations_model" new_model_value = ParameterValue() new_model_value.type = ParameterType.PARAMETER_STRING new_model_value.string_value = model param.value = new_model_value response = self.call_set_parameters([param]) for result in response.results: if result.successful: self.get_logger().info(f'参数 {param.name} 设置为{model} ' ) else : self.get_logger().info(f'参数设置失败,原因为:{result.reason} ' ) def show_face_locations (self, response ): for i in range (response.number): top = response.top[i] right = response.right[i] bottom = response.bottom[i] left = response.left[i] cv2.rectangle(self.image, (left, top), (right, bottom), (255 , 0 , 0 ), 2 ) cv2.imshow('Face Detection Result' , self.image) cv2.waitKey(0 ) def main (args=None ): rclpy.init(args=args) face_detect_client = FaceDetectorClient() face_detect_client.update_detect_model('hog' ) face_detect_client.send_request() face_detect_client.update_detect_model('cnn' ) face_detect_client.send_request() rclpy.spin(face_detect_client) rclpy.shutdown()
在resource下添加新的图片test1,
然后配置setup.py
1 2 3 4 5 6 7 8 9 ( ... ('share/' + package_name+"/resource" , ['resource/test1.jpg' ]), ], ... 'console_scripts' : [ ... 'face_detect_client_node = demo_python_service.face_detect_client_node:main' , ], },
编译运行