ROS 2 Service & Client Guide 2026
Define custom .srv files, write Python service servers and async clients with call_async, implement C++ servers, use wait_for_service, and test with CLI tools.
How ROS 2 Services Work
A service is a synchronous request/reply channel. A server node registers a service name and waits for requests. A client node sends a request and waits for the response. Unlike topics, services are one-to-one: one client talks to one server per call. Under the hood they use DDS request/reply patterns with guaranteed delivery.
Define a Custom Service (.srv file)
A .srv file has a Request section and a Response section separated by ---. Place it in a srv/ subdirectory of your package.
# srv/FindObject.srv
# ─── Request ───────────────────────────────────────────────
string object_name # name of the object to search
float32 search_radius # search radius in meters
bool use_cache # check cache before scanning
---
# ─── Response ──────────────────────────────────────────────
bool found
float32 x
float32 y
float32 z
string frame_id # coordinate frame where object was found⚡ Types available: bool, int8/16/32/64, uint8/16/32/64, float32/64, string, byte, char, and any built-in message type (geometry_msgs/Point, etc.)
Register the .srv in CMakeLists.txt
For C++ packages, use rosidl_generate_interfaces to compile the .srv into Python and C++ bindings. For Python-only packages, you still need ament_cmake.
find_package(rosidl_default_generators REQUIRED)
find_package(geometry_msgs REQUIRED) # if your .srv uses geometry_msgs types
rosidl_generate_interfaces(
${PROJECT_NAME}
"srv/FindObject.srv"
DEPENDENCIES geometry_msgs # list all packages whose types you import
)
ament_export_dependencies(rosidl_default_runtime)⚡ Add rosidl_default_generators to <build_depend> and rosidl_default_runtime to <exec_depend> in package.xml.
Python Service Server
Subclass Node, call create_service, and implement the callback. The callback receives (request, response) and must return the response.
import rclpy
from rclpy.node import Node
from my_interfaces.srv import FindObject # your generated service type
class FindObjectServer(Node):
def __init__(self):
super().__init__("find_object_server")
# create_service(ServiceType, service_name, callback)
self._srv = self.create_service(
FindObject,
"find_object",
self._handle_request,
)
self.get_logger().info("FindObject service ready")
def _handle_request(
self,
request: FindObject.Request,
response: FindObject.Response,
) -> FindObject.Response:
self.get_logger().info(
f"Searching for '{request.object_name}' "
f"within {request.search_radius:.1f}m"
)
# --- your real search logic here ---
if request.object_name == "cup":
response.found = True
response.x = 1.2
response.y = 0.3
response.z = 0.8
response.frame_id = "map"
else:
response.found = False
return response # MUST return the response object
def main(args=None):
rclpy.init(args=args)
node = FindObjectServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()Python Async Service Client (call_async)
The recommended pattern: call_async() returns a Future. Add a done callback or use rclpy.spin_until_future_complete() to wait for the result.
import sys
import rclpy
from rclpy.node import Node
from my_interfaces.srv import FindObject
class FindObjectClient(Node):
def __init__(self):
super().__init__("find_object_client")
# Create a client for the FindObject service
self._client = self.create_client(FindObject, "find_object")
# Block until the service is available (with timeout)
if not self._client.wait_for_service(timeout_sec=5.0):
self.get_logger().error("Service 'find_object' not available — is the server running?")
raise RuntimeError("service unavailable")
def send_request(self, object_name: str, radius: float) -> None:
req = FindObject.Request()
req.object_name = object_name
req.search_radius = radius
req.use_cache = True
# Non-blocking call — returns a Future immediately
future = self._client.call_async(req)
# Register callback to be called when the future completes
future.add_done_callback(self._response_callback)
def _response_callback(self, future) -> None:
try:
result = future.result()
if result.found:
self.get_logger().info(
f"Found at ({result.x:.2f}, {result.y:.2f}, {result.z:.2f}) "
f"in '{result.frame_id}'"
)
else:
self.get_logger().info("Object not found")
except Exception as e:
self.get_logger().error(f"Service call failed: {e}")
def main(args=None):
rclpy.init(args=args)
client = FindObjectClient()
client.send_request("cup", 2.0)
rclpy.spin(client) # processes the callback when future completes
client.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()⚡ For one-shot scripts (not spinning nodes), use rclpy.spin_until_future_complete(node, future) instead of add_done_callback + spin.
Synchronous-style Call with spin_until_future_complete
For scripts that need a blocking call without a persistent executor, spin_until_future_complete blocks until the Future is done.
def call_service_blocking(object_name: str, radius: float):
rclpy.init()
node = rclpy.create_node("temp_client")
client = node.create_client(FindObject, "find_object")
# Wait up to 3 seconds for the server
if not client.wait_for_service(timeout_sec=3.0):
node.get_logger().error("Service not available")
node.destroy_node()
rclpy.shutdown()
return None
req = FindObject.Request()
req.object_name = object_name
req.search_radius = radius
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=5.0)
node.destroy_node()
rclpy.shutdown()
if future.done():
return future.result() # FindObject.Response
else:
print("Service call timed out")
return None
result = call_service_blocking("cup", 2.0)
if result and result.found:
print(f"Found at {result.x}, {result.y}, {result.z}")C++ Service Server (brief)
The C++ API mirrors Python: create_service with a lambda or member function, fill in response fields, return void.
#include "rclcpp/rclcpp.hpp"
#include "my_interfaces/srv/find_object.hpp"
using FindObject = my_interfaces::srv::FindObject;
class FindObjectServer : public rclcpp::Node {
public:
FindObjectServer() : Node("find_object_server_cpp") {
srv_ = this->create_service<FindObject>(
"find_object",
[this](const FindObject::Request::SharedPtr req,
FindObject::Response::SharedPtr res) {
RCLCPP_INFO(this->get_logger(),
"Searching for '%s'", req->object_name.c_str());
if (req->object_name == "cup") {
res->found = true;
res->x = 1.2f;
res->y = 0.3f;
res->z = 0.8f;
res->frame_id = "map";
} else {
res->found = false;
}
// No return needed — response is passed by shared_ptr
});
RCLCPP_INFO(this->get_logger(), "FindObject C++ service ready");
}
private:
rclcpp::Service<FindObject>::SharedPtr srv_;
};
int main(int argc, char **argv) {
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<FindObjectServer>());
rclcpp::shutdown();
return 0;
}⚡ Link with: ament_target_dependencies(find_object_server rclcpp my_interfaces)
CLI Tools for Services
ROS 2 CLI lets you inspect services, see their types, and call them manually — essential for debugging without writing a client.
# List all active services
ros2 service list
# List with types shown
ros2 service list -t
# Get the type of a specific service
ros2 service type /find_object
# → my_interfaces/srv/FindObject
# Show the .srv definition (request/response layout)
ros2 interface show my_interfaces/srv/FindObject
# Call a service from the CLI (YAML syntax for request)
ros2 service call /find_object my_interfaces/srv/FindObject \
"{object_name: cup, search_radius: 2.0, use_cache: true}"
# Find which nodes provide/use a service
ros2 service info /find_object
# Check if a service is available (exits 0 if yes)
ros2 service list | grep find_objectwait_for_service and Retry Pattern
Always call wait_for_service() before the first call_async(). Loop with a timeout for robust startup sequencing.
import time
def robust_wait(client, node, total_secs: float = 30.0, poll_secs: float = 1.0) -> bool:
"""Wait for service up to total_secs, logging every poll_secs."""
deadline = time.monotonic() + total_secs
while time.monotonic() < deadline:
if client.wait_for_service(timeout_sec=poll_secs):
return True
node.get_logger().warn(
f"Service '{client.srv_name}' not yet available — retrying…"
)
node.get_logger().error(
f"Service '{client.srv_name}' unavailable after {total_secs:.0f}s"
)
return False
# Usage
client = node.create_client(FindObject, "find_object")
if robust_wait(client, node, total_secs=30.0):
future = client.call_async(req)
else:
raise RuntimeError("service never came up")⚡ In launch files, use DelayAction or RegisterEventHandler to wait for server nodes to finish configuring before launching clients.
Service vs Topic — When to Use Each
Services are for synchronous request/reply. Topics are for continuous data streams. Using the wrong primitive causes reliability or performance issues.
Use a SERVICE when:
✅ Caller needs a result / confirmation (query, command with ACK)
✅ One-time configuration changes (set PID gains, change mode)
✅ Trigger an action and wait for completion
✅ Small number of callers, infrequent calls
Use a TOPIC when:
✅ Sensor data (LiDAR, camera, IMU) — high frequency, many subscribers
✅ Odometry, transforms — continuous state broadcast
✅ Visualisation data — fire-and-forget
✅ Publisher doesn't care if anyone is listening
Use an ACTION when:
✅ Long-running task (navigation, arm motion) + feedback needed
✅ Caller wants to cancel mid-flight
✅ Progress reporting during execution
Anti-patterns:
❌ Service for 100 Hz sensor data — blocks the executor
❌ Topic for commands that need ACK — no delivery guarantee
❌ Topic + global variable for request/reply — race conditionCLI & API Cheatsheet
| Command / API | Description |
|---|---|
| ros2 service list | List all active services |
| ros2 service list -t | List with type annotations |
| ros2 service type /name | Get service interface type |
| ros2 interface show pkg/srv/Type | Show .srv definition |
| ros2 service call /name pkg/srv/Type '{...}' | Call service from CLI (YAML request body) |
| ros2 service info /name | Show providers and clients |
| client.wait_for_service(timeout_sec=N) | Block until service available |
| client.call_async(req) | Non-blocking async call → Future |
| rclpy.spin_until_future_complete(node, future) | Block until Future completes |
| future.add_done_callback(fn) | Register result callback |