Skip to main content
🔗 ServicesROS 2 · June 2026

ROS 2 Service & Client Guide 2026

Define custom .srv files, write Python service servers and async clients with call_async, implement C++ servers, use wait_for_service, and test with CLI tools.

How ROS 2 Services Work

A service is a synchronous request/reply channel. A server node registers a service name and waits for requests. A client node sends a request and waits for the response. Unlike topics, services are one-to-one: one client talks to one server per call. Under the hood they use DDS request/reply patterns with guaranteed delivery.

1

Define a Custom Service (.srv file)

A .srv file has a Request section and a Response section separated by ---. Place it in a srv/ subdirectory of your package.

text
# srv/FindObject.srv
# ─── Request ───────────────────────────────────────────────
string object_name          # name of the object to search
float32 search_radius       # search radius in meters
bool    use_cache            # check cache before scanning

---

# ─── Response ──────────────────────────────────────────────
bool    found
float32 x
float32 y
float32 z
string  frame_id            # coordinate frame where object was found

Types available: bool, int8/16/32/64, uint8/16/32/64, float32/64, string, byte, char, and any built-in message type (geometry_msgs/Point, etc.)

2

Register the .srv in CMakeLists.txt

For C++ packages, use rosidl_generate_interfaces to compile the .srv into Python and C++ bindings. For Python-only packages, you still need ament_cmake.

cmake
find_package(rosidl_default_generators REQUIRED)
find_package(geometry_msgs REQUIRED)   # if your .srv uses geometry_msgs types

rosidl_generate_interfaces(
  ${PROJECT_NAME}
  "srv/FindObject.srv"
  DEPENDENCIES geometry_msgs           # list all packages whose types you import
)

ament_export_dependencies(rosidl_default_runtime)

Add rosidl_default_generators to <build_depend> and rosidl_default_runtime to <exec_depend> in package.xml.

3

Python Service Server

Subclass Node, call create_service, and implement the callback. The callback receives (request, response) and must return the response.

python
import rclpy
from rclpy.node import Node
from my_interfaces.srv import FindObject   # your generated service type


class FindObjectServer(Node):
    def __init__(self):
        super().__init__("find_object_server")

        # create_service(ServiceType, service_name, callback)
        self._srv = self.create_service(
            FindObject,
            "find_object",
            self._handle_request,
        )
        self.get_logger().info("FindObject service ready")

    def _handle_request(
        self,
        request: FindObject.Request,
        response: FindObject.Response,
    ) -> FindObject.Response:
        self.get_logger().info(
            f"Searching for '{request.object_name}' "
            f"within {request.search_radius:.1f}m"
        )

        # --- your real search logic here ---
        if request.object_name == "cup":
            response.found   = True
            response.x       = 1.2
            response.y       = 0.3
            response.z       = 0.8
            response.frame_id = "map"
        else:
            response.found = False

        return response          # MUST return the response object


def main(args=None):
    rclpy.init(args=args)
    node = FindObjectServer()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()
4

Python Async Service Client (call_async)

The recommended pattern: call_async() returns a Future. Add a done callback or use rclpy.spin_until_future_complete() to wait for the result.

python
import sys
import rclpy
from rclpy.node import Node
from my_interfaces.srv import FindObject


class FindObjectClient(Node):
    def __init__(self):
        super().__init__("find_object_client")

        # Create a client for the FindObject service
        self._client = self.create_client(FindObject, "find_object")

        # Block until the service is available (with timeout)
        if not self._client.wait_for_service(timeout_sec=5.0):
            self.get_logger().error("Service 'find_object' not available — is the server running?")
            raise RuntimeError("service unavailable")

    def send_request(self, object_name: str, radius: float) -> None:
        req = FindObject.Request()
        req.object_name   = object_name
        req.search_radius = radius
        req.use_cache     = True

        # Non-blocking call — returns a Future immediately
        future = self._client.call_async(req)
        # Register callback to be called when the future completes
        future.add_done_callback(self._response_callback)

    def _response_callback(self, future) -> None:
        try:
            result = future.result()
            if result.found:
                self.get_logger().info(
                    f"Found at ({result.x:.2f}, {result.y:.2f}, {result.z:.2f}) "
                    f"in '{result.frame_id}'"
                )
            else:
                self.get_logger().info("Object not found")
        except Exception as e:
            self.get_logger().error(f"Service call failed: {e}")


def main(args=None):
    rclpy.init(args=args)
    client = FindObjectClient()
    client.send_request("cup", 2.0)
    rclpy.spin(client)          # processes the callback when future completes
    client.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()

For one-shot scripts (not spinning nodes), use rclpy.spin_until_future_complete(node, future) instead of add_done_callback + spin.

5

Synchronous-style Call with spin_until_future_complete

For scripts that need a blocking call without a persistent executor, spin_until_future_complete blocks until the Future is done.

python
def call_service_blocking(object_name: str, radius: float):
    rclpy.init()
    node = rclpy.create_node("temp_client")
    client = node.create_client(FindObject, "find_object")

    # Wait up to 3 seconds for the server
    if not client.wait_for_service(timeout_sec=3.0):
        node.get_logger().error("Service not available")
        node.destroy_node()
        rclpy.shutdown()
        return None

    req = FindObject.Request()
    req.object_name   = object_name
    req.search_radius = radius

    future = client.call_async(req)
    rclpy.spin_until_future_complete(node, future, timeout_sec=5.0)

    node.destroy_node()
    rclpy.shutdown()

    if future.done():
        return future.result()    # FindObject.Response
    else:
        print("Service call timed out")
        return None


result = call_service_blocking("cup", 2.0)
if result and result.found:
    print(f"Found at {result.x}, {result.y}, {result.z}")
6

C++ Service Server (brief)

The C++ API mirrors Python: create_service with a lambda or member function, fill in response fields, return void.

cpp
#include "rclcpp/rclcpp.hpp"
#include "my_interfaces/srv/find_object.hpp"

using FindObject = my_interfaces::srv::FindObject;

class FindObjectServer : public rclcpp::Node {
public:
  FindObjectServer() : Node("find_object_server_cpp") {
    srv_ = this->create_service<FindObject>(
      "find_object",
      [this](const FindObject::Request::SharedPtr req,
             FindObject::Response::SharedPtr res) {
        RCLCPP_INFO(this->get_logger(),
                    "Searching for '%s'", req->object_name.c_str());
        if (req->object_name == "cup") {
          res->found    = true;
          res->x        = 1.2f;
          res->y        = 0.3f;
          res->z        = 0.8f;
          res->frame_id = "map";
        } else {
          res->found = false;
        }
        // No return needed — response is passed by shared_ptr
      });
    RCLCPP_INFO(this->get_logger(), "FindObject C++ service ready");
  }

private:
  rclcpp::Service<FindObject>::SharedPtr srv_;
};

int main(int argc, char **argv) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<FindObjectServer>());
  rclcpp::shutdown();
  return 0;
}

Link with: ament_target_dependencies(find_object_server rclcpp my_interfaces)

7

CLI Tools for Services

ROS 2 CLI lets you inspect services, see their types, and call them manually — essential for debugging without writing a client.

bash
# List all active services
ros2 service list

# List with types shown
ros2 service list -t

# Get the type of a specific service
ros2 service type /find_object
# → my_interfaces/srv/FindObject

# Show the .srv definition (request/response layout)
ros2 interface show my_interfaces/srv/FindObject

# Call a service from the CLI (YAML syntax for request)
ros2 service call /find_object my_interfaces/srv/FindObject \
  "{object_name: cup, search_radius: 2.0, use_cache: true}"

# Find which nodes provide/use a service
ros2 service info /find_object

# Check if a service is available (exits 0 if yes)
ros2 service list | grep find_object
8

wait_for_service and Retry Pattern

Always call wait_for_service() before the first call_async(). Loop with a timeout for robust startup sequencing.

python
import time

def robust_wait(client, node, total_secs: float = 30.0, poll_secs: float = 1.0) -> bool:
    """Wait for service up to total_secs, logging every poll_secs."""
    deadline = time.monotonic() + total_secs
    while time.monotonic() < deadline:
        if client.wait_for_service(timeout_sec=poll_secs):
            return True
        node.get_logger().warn(
            f"Service '{client.srv_name}' not yet available — retrying…"
        )
    node.get_logger().error(
        f"Service '{client.srv_name}' unavailable after {total_secs:.0f}s"
    )
    return False

# Usage
client = node.create_client(FindObject, "find_object")
if robust_wait(client, node, total_secs=30.0):
    future = client.call_async(req)
else:
    raise RuntimeError("service never came up")

In launch files, use DelayAction or RegisterEventHandler to wait for server nodes to finish configuring before launching clients.

9

Service vs Topic — When to Use Each

Services are for synchronous request/reply. Topics are for continuous data streams. Using the wrong primitive causes reliability or performance issues.

text
Use a SERVICE when:
  ✅ Caller needs a result / confirmation (query, command with ACK)
  ✅ One-time configuration changes (set PID gains, change mode)
  ✅ Trigger an action and wait for completion
  ✅ Small number of callers, infrequent calls

Use a TOPIC when:
  ✅ Sensor data (LiDAR, camera, IMU) — high frequency, many subscribers
  ✅ Odometry, transforms — continuous state broadcast
  ✅ Visualisation data — fire-and-forget
  ✅ Publisher doesn't care if anyone is listening

Use an ACTION when:
  ✅ Long-running task (navigation, arm motion) + feedback needed
  ✅ Caller wants to cancel mid-flight
  ✅ Progress reporting during execution

Anti-patterns:
  ❌ Service for 100 Hz sensor data — blocks the executor
  ❌ Topic for commands that need ACK — no delivery guarantee
  ❌ Topic + global variable for request/reply — race condition

CLI & API Cheatsheet

Command / APIDescription
ros2 service listList all active services
ros2 service list -tList with type annotations
ros2 service type /nameGet service interface type
ros2 interface show pkg/srv/TypeShow .srv definition
ros2 service call /name pkg/srv/Type '{...}'Call service from CLI (YAML request body)
ros2 service info /nameShow providers and clients
client.wait_for_service(timeout_sec=N)Block until service available
client.call_async(req)Non-blocking async call → Future
rclpy.spin_until_future_complete(node, future)Block until Future completes
future.add_done_callback(fn)Register result callback