10.1.3 action 通信实现 B python

优质
小牛编辑
125浏览
2023-12-01

3.1.3 action通信自定义action文件调用(Python)

需求:

创建两个ROS 节点,服务器和客户端,客户端可以向服务器发送目标数据N(一个整型数据)服务器会计算 1 到 N 之间所有整数的和,这是一个循环累加的过程,返回给客户端,这是基于请求响应模式的,又已知服务器从接收到请求到产生响应是一个耗时操作,每累加一次耗时0.1s,为了良好的用户体验,需要服务器在计算过程中,每累加一次,就给客户端响应一次百分比格式的执行进度,使用 action实现。

流程:

  1. 编写action服务端实现;
  2. 编写action客户端实现;
  3. 编辑CMakeLists.txt;
  4. 编译并执行。

0.vscode配置

需要像之前自定义 msg 实现一样配置settings.json 文件,如果以前已经配置且没有变更工作空间,可以忽略,如果需要配置,配置方式与之前相同:

{
    "python.autoComplete.extraPaths": [
        "/opt/ros/noetic/lib/python3/dist-packages",
        "/xxx/yyy工作空间/devel/lib/python3/dist-packages"
    ]
}

1.服务端

#! /usr/bin/env python
import rospy
import actionlib
from demo01_action.msg import *
"""
    需求:
        创建两个ROS 节点,服务器和客户端,
        客户端可以向服务器发送目标数据N(一个整型数据)服务器会计算 1 到 N 之间所有整数的和,
        这是一个循环累加的过程,返回给客户端,这是基于请求响应模式的,
        又已知服务器从接收到请求到产生响应是一个耗时操作,每累加一次耗时0.1s,
        为了良好的用户体验,需要服务器在计算过程中,
        每累加一次,就给客户端响应一次百分比格式的执行进度,使用 action实现。
    流程:
        1.导包
        2.初始化 ROS 节点
        3.使用类封装,然后创建对象
        4.创建服务器对象
        5.处理请求数据产生响应结果,中间还要连续反馈
        6.spin
"""

class MyActionServer:
    def __init__(self):
        #SimpleActionServer(name, ActionSpec, execute_cb=None, auto_start=True)
        self.server = actionlib.SimpleActionServer("addInts",AddIntsAction,self.cb,False)
        self.server.start()
        rospy.loginfo("服务端启动")


    def cb(self,goal):
        rospy.loginfo("服务端处理请求:")
        #1.解析目标值
        num = goal.num
        #2.循环累加,连续反馈
        rate = rospy.Rate(10)
        sum = 0
        for i in range(1,num + 1):
            # 累加
            sum = sum + i
            # 计算进度并连续反馈
            feedBack = i / num
            rospy.loginfo("当前进度:%.2f",feedBack)

            feedBack_obj = AddIntsFeedback()
            feedBack_obj.progress_bar = feedBack
            self.server.publish_feedback(feedBack_obj)
            rate.sleep()
        #3.响应最终结果
        result = AddIntsResult()
        result.result = sum        
        self.server.set_succeeded(result)
        rospy.loginfo("响应结果:%d",sum)
if __name__ == "__main__":
    rospy.init_node("action_server_p")
    server = MyActionServer()
    rospy.spin()

PS:

可以先配置CMakeLists.tx文件并启动上述action服务端,然后通过 rostopic 查看话题,向action相关话题发送消息,或订阅action相关话题的消息。

2.客户端

#! /usr/bin/env python

import rospy
import actionlib
from demo01_action.msg import *

"""
    需求:
        创建两个ROS 节点,服务器和客户端,
        客户端可以向服务器发送目标数据N(一个整型数据)服务器会计算 1 到 N 之间所有整数的和,
        这是一个循环累加的过程,返回给客户端,这是基于请求响应模式的,
        又已知服务器从接收到请求到产生响应是一个耗时操作,每累加一次耗时0.1s,
        为了良好的用户体验,需要服务器在计算过程中,
        每累加一次,就给客户端响应一次百分比格式的执行进度,使用 action实现。
    流程:
        1.导包
        2.初始化 ROS 节点
        3.创建 action Client 对象
        4.等待服务
        5.组织目标对象并发送
        6.编写回调, 激活、连续反馈、最终响应
        7.spin
"""

def done_cb(state,result):
    if state == actionlib.GoalStatus.SUCCEEDED:
        rospy.loginfo("响应结果:%d",result.result)

def active_cb():
    rospy.loginfo("服务被激活....")


def fb_cb(fb):
    rospy.loginfo("当前进度:%.2f",fb.progress_bar)

if __name__ == "__main__":
    # 2.初始化 ROS 节点
    rospy.init_node("action_client_p")
    # 3.创建 action Client 对象
    client = actionlib.SimpleActionClient("addInts",AddIntsAction)
    # 4.等待服务
    client.wait_for_server()
    # 5.组织目标对象并发送
    goal_obj = AddIntsGoal()
    goal_obj.num = 10
    client.send_goal(goal_obj,done_cb,active_cb,fb_cb)
    # 6.编写回调, 激活、连续反馈、最终响应
    # 7.spin
    rospy.spin()

3.编辑配置文件

先为 Python 文件添加可执行权限:chmod +x *.py

catkin_install_python(PROGRAMS
  scripts/action01_server_p.py
  scripts/action02_client_p.py
  DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)

4.执行

首先启动 roscore,然后分别启动action服务端与action客户端,最终运行结果与案例类似。