ROS 2 Action Server Guide 2026Server · Client · Feedback · Cancel
Complete 2026 guide to ROS 2 Actions — the right choice for long-running robot tasks like navigation, manipulation, and computation. Covers defining a custom .action file, writing an async Python action server with goal acceptance, feedback streaming, and cancel handling, writing the action client, and using CLI tools.
Topic vs Service vs Action
| Feature | Topic | Service | Action |
|---|---|---|---|
| Use case | One-way publish, fire-and-forget | Request-response, brief call | Long-running task with feedback and cancellation |
| Feedback | No | No | Yes — streamed while executing |
| Cancellation | No | No | Yes — client can cancel any time |
| Result | No | Synchronous response | Async result on completion |
| Blocking | No | Yes (synchronous) | No (fully async) |
| Examples | /cmd_vel, /scan, /tf | /set_parameters, /clear_costmap | /navigate_to_pose, /compute_path_to_pose, /gripper_command |
Define a Custom Action
Define a Custom Action (.action file)
Action files have three sections separated by ---: Goal (sent by client), Result (sent when done), Feedback (sent periodically).
# action/Navigate.action
#
# Goal — what the client asks for
geometry_msgs/PoseStamped target_pose
float32 speed # m/s
---
# Result — returned once the action finishes (success or abort)
bool success
float32 total_distance_traveled # metres
string message
---
# Feedback — streamed while the action is executing
float32 distance_remaining # metres
float32 current_speed # m/s
string current_state # e.g. "PLANNING", "NAVIGATING", "RECOVERY"⚡ After creating the .action file, add it to CMakeLists.txt via rosidl_generate_interfaces() and rebuild.
CMakeLists.txt — Register the Action
Add the action to your package so it gets built into a usable interface.
cmake_minimum_required(VERSION 3.8)
project(my_robot_interfaces)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(geometry_msgs REQUIRED)
rosidl_generate_interfaces(
${PROJECT_NAME}
"action/Navigate.action"
DEPENDENCIES geometry_msgs
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()Action Server
Action Server (Python)
The server accepts goals, runs the long task in a thread, sends periodic feedback, and returns a result.
#!/usr/bin/env python3
"""Navigation action server — demonstrates goal, feedback, result, and preemption."""
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.callback_groups import ReentrantCallbackGroup
from my_robot_interfaces.action import Navigate
class NavigationActionServer(Node):
def __init__(self):
super().__init__("navigation_action_server")
self._action_server = ActionServer(
self,
Navigate,
"navigate",
execute_callback=self._execute_callback,
goal_callback=self._goal_callback,
cancel_callback=self._cancel_callback,
callback_group=ReentrantCallbackGroup(),
)
self.get_logger().info("Navigation action server ready.")
def _goal_callback(self, goal_request):
"""Accept or reject a goal before execution starts."""
self.get_logger().info(f"Received goal: {goal_request.target_pose}")
# Reject if speed is unrealistic
if goal_request.speed > 2.0:
self.get_logger().warn("Rejecting goal: speed > 2.0 m/s")
return GoalResponse.REJECT
return GoalResponse.ACCEPT
def _cancel_callback(self, goal_handle):
"""Accept or reject a cancel request."""
self.get_logger().info("Cancel request received.")
return CancelResponse.ACCEPT
async def _execute_callback(self, goal_handle):
"""Main execution — runs in a worker thread via ReentrantCallbackGroup."""
self.get_logger().info("Executing navigation goal...")
feedback_msg = Navigate.Feedback()
total_distance = 10.0 # pretend 10 m to travel
distance_traveled = 0.0
speed = goal_handle.request.speed
while distance_traveled < total_distance:
# Check if the client requested cancellation
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result = Navigate.Result()
result.success = False
result.total_distance_traveled = distance_traveled
result.message = "Navigation cancelled by client."
self.get_logger().info("Goal cancelled.")
return result
# Simulate travel
step = speed * 0.1 # 100 ms timestep
distance_traveled = min(distance_traveled + step, total_distance)
# Publish feedback
feedback_msg.distance_remaining = total_distance - distance_traveled
feedback_msg.current_speed = speed
feedback_msg.current_state = "NAVIGATING"
goal_handle.publish_feedback(feedback_msg)
time.sleep(0.1)
# Success
goal_handle.succeed()
result = Navigate.Result()
result.success = True
result.total_distance_traveled = distance_traveled
result.message = "Navigation complete."
self.get_logger().info("Goal succeeded.")
return result
def main():
rclpy.init()
node = NavigationActionServer()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()⚠️ Use ReentrantCallbackGroup so the server can handle new goals while one is executing. Without it, the execute callback blocks all other callbacks.
Action Client
Action Client (Python)
The client sends a goal, subscribes to feedback, and waits for the result — with optional cancellation.
#!/usr/bin/env python3
"""Navigation action client — sends a goal, prints feedback, handles result."""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.task import Future
from geometry_msgs.msg import PoseStamped
from my_robot_interfaces.action import Navigate
class NavigationActionClient(Node):
def __init__(self):
super().__init__("navigation_action_client")
self._action_client = ActionClient(self, Navigate, "navigate")
def send_goal(self, x: float, y: float, speed: float = 0.5):
"""Send a navigation goal and wait for server availability."""
self.get_logger().info("Waiting for action server...")
self._action_client.wait_for_server()
goal_msg = Navigate.Goal()
goal_msg.target_pose = PoseStamped()
goal_msg.target_pose.header.frame_id = "map"
goal_msg.target_pose.header.stamp = self.get_clock().now().to_msg()
goal_msg.target_pose.pose.position.x = x
goal_msg.target_pose.pose.position.y = y
goal_msg.target_pose.pose.orientation.w = 1.0
goal_msg.speed = speed
self.get_logger().info(f"Sending goal: ({x}, {y}) at {speed} m/s")
# send_goal_async returns a future for the GoalHandle
send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self._feedback_callback,
)
send_goal_future.add_done_callback(self._goal_response_callback)
def _goal_response_callback(self, future: Future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error("Goal rejected!")
return
self.get_logger().info("Goal accepted.")
# Get the result asynchronously
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self._result_callback)
def _feedback_callback(self, feedback_msg):
fb = feedback_msg.feedback
self.get_logger().info(
f"[{fb.current_state}] remaining: {fb.distance_remaining:.2f} m "
f"@ {fb.current_speed:.1f} m/s"
)
def _result_callback(self, future: Future):
result = future.result().result
status = future.result().status
if result.success:
self.get_logger().info(
f"SUCCESS: travelled {result.total_distance_traveled:.2f} m. "
f"{result.message}"
)
else:
self.get_logger().warn(f"FAILED: {result.message}")
rclpy.shutdown()
def main():
rclpy.init()
client = NavigationActionClient()
client.send_goal(x=5.0, y=3.0, speed=0.8)
rclpy.spin(client)
if __name__ == "__main__":
main()Cancel a Goal Mid-Execution
Cancel a Goal Mid-Execution
Send a cancel request from the client after the goal has been accepted.
import asyncio
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from my_robot_interfaces.action import Navigate
from geometry_msgs.msg import PoseStamped
class CancellableClient(Node):
def __init__(self):
super().__init__("cancellable_client")
self._client = ActionClient(self, Navigate, "navigate")
self._goal_handle = None
async def navigate_and_cancel(self):
self._client.wait_for_server()
goal = Navigate.Goal()
goal.target_pose = PoseStamped()
goal.target_pose.header.frame_id = "map"
goal.target_pose.pose.orientation.w = 1.0
goal.speed = 0.3
# Send goal
future = self._client.send_goal_async(goal)
self._goal_handle = await future
self.get_logger().info("Goal sent, cancelling in 2 seconds...")
await asyncio.sleep(2.0)
# Send cancel request
cancel_future = self._goal_handle.cancel_goal_async()
await cancel_future
self.get_logger().info("Cancel sent.")
def main():
rclpy.init()
node = CancellableClient()
asyncio.run(node.navigate_and_cancel())
rclpy.shutdown()CLI Tools
Action CLI Tools
Send goals, list action servers, and inspect action types from the terminal.
# List all action servers
ros2 action list
# Show action type
ros2 action type /navigate
# Show action server info
ros2 action info /navigate --count
# Send a goal from CLI (one-liner)
ros2 action send_goal /navigate my_robot_interfaces/action/Navigate \
"target_pose: {header: {frame_id: map}, pose: {position: {x: 5.0, y: 3.0}, orientation: {w: 1.0}}}
speed: 0.5" \
--feedback
# Check action interface definition
ros2 interface show my_robot_interfaces/action/Navigate
# List built-in action types (Nav2 uses these)
ros2 interface list --only-actions | grep nav2