Skip to main content
🤝

이 가이드는 인류와 AI가 함께 만드는 지식입니다.

이 콘텐츠는 Human + AI Partnership 철학 아래 모든 사람이 로봇·AI를 배울 수 있도록 무료로 제공됩니다. 당신의 질문과 기여가 다음 학생의 미래를 바꿉니다.

ROS 2 Multi-Robot Coordination Guide 2026

Coordinate fleets of robots with ROS 2 using centralized control, distributed consensus, and hierarchical architectures. Learn swarm algorithms, synchronized motion, task allocation, and communication patterns for collaborative robotics.

1. Multi-Robot Architecture Patterns

Choose coordination architecture based on system requirements:

2. Centralized Fleet Control

Master node coordinates all robot tasks:

// Fleet controller (C++)
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include <geometry_msgs/msg/pose.hpp>

class FleetController : public rclcpp::Node {
  std::vector<std::string> robot_names_ = {"robot1", "robot2", "robot3"};
  std::map<std::string, geometry_msgs::msg::Pose> robot_poses_;

 public:
  FleetController() : Node("fleet_controller") {
    // Subscribe to all robot poses
    for (const auto& robot : robot_names_) {
      auto sub = create_subscription<geometry_msgs::msg::Pose>(
        "/" + robot + "/pose", 10,
        [this, robot](const geometry_msgs::msg::Pose::SharedPtr msg) {
          robot_poses_[robot] = *msg;
        });
      pose_subs_.push_back(sub);
    }

    // Create command publishers for each robot
    for (const auto& robot : robot_names_) {
      auto pub = create_publisher<geometry_msgs::msg::Twist>(
        "/" + robot + "/cmd_vel", 10);
      cmd_pubs_[robot] = pub;
    }

    // Timer: coordinate robots
    timer_ = create_wall_timer(
      std::chrono::milliseconds(100),
      [this]() { coordinate_robots(); });
  }

 private:
  void coordinate_robots() {
    // Centralized decision-making
    for (const auto& robot : robot_names_) {
      // Compute optimal command based on all robot states
      auto cmd = compute_formation_command(robot, robot_poses_);
      cmd_pubs_[robot]->publish(cmd);
    }
  }

  geometry_msgs::msg::Twist compute_formation_command(
    const std::string& robot,
    const std::map<std::string, geometry_msgs::msg::Pose>& poses) {
    // Formation control logic (e.g., maintain spacing)
    geometry_msgs::msg::Twist cmd;
    cmd.linear.x = 0.5;  // Placeholder
    return cmd;
  }

  std::map<std::string, rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr> cmd_pubs_;
  std::vector<rclcpp::Subscription<geometry_msgs::msg::Pose>::SharedPtr> pose_subs_;
  rclcpp::TimerBase::SharedPtr timer_;
};

3. Decentralized Decision-Making

Each robot broadcasts state and makes independent decisions:

// Decentralized robot controller (C++)
#include <rclcpp/rclcpp.hpp>
#include <custom_msgs/msg/robot_state.hpp>

class AutonomousRobotController : public rclcpp::Node {
  custom_msgs::msg::RobotState local_state_;
  std::map<std::string, custom_msgs::msg::RobotState> neighbor_states_;

 public:
  AutonomousRobotController(const std::string& robot_id)
    : Node("robot_" + robot_id), robot_id_(robot_id) {

    // Broadcast own state
    state_pub_ = create_publisher<custom_msgs::msg::RobotState>(
      "robot_states", 10);

    // Subscribe to neighbor states
    state_sub_ = create_subscription<custom_msgs::msg::RobotState>(
      "robot_states", 10,
      [this](const custom_msgs::msg::RobotState::SharedPtr msg) {
        neighbor_states_[msg->robot_id] = *msg;
      });

    // Local decision-making loop
    timer_ = create_wall_timer(
      std::chrono::milliseconds(50),
      [this]() { make_decision(); });
  }

 private:
  void make_decision() {
    // Compute collision avoidance based on neighbor positions
    auto cmd = collision_avoidance(local_state_, neighbor_states_);
    local_state_.velocity = cmd;

    // Broadcast updated state
    state_pub_->publish(local_state_);
  }

  std::string robot_id_;
  rclcpp::Publisher<custom_msgs::msg::RobotState>::SharedPtr state_pub_;
  rclcpp::Subscription<custom_msgs::msg::RobotState>::SharedPtr state_sub_;
  rclcpp::TimerBase::SharedPtr timer_;
};

4. Swarm Behavior Algorithms

Implement flocking, swarming, and consensus algorithms:

// Flocking algorithm
class FlockingBehavior {
 public:
  geometry_msgs::msg::Twist compute(
    const geometry_msgs::msg::Pose& own_pose,
    const std::vector<geometry_msgs::msg::Pose>& neighbor_poses,
    const geometry_msgs::msg::Twist& target_velocity) {

    // Three forces: separation, alignment, cohesion
    auto separation = compute_separation(own_pose, neighbor_poses);
    auto alignment = compute_alignment(neighbor_poses, target_velocity);
    auto cohesion = compute_cohesion(own_pose, neighbor_poses);

    // Weighted combination
    geometry_msgs::msg::Twist result;
    result.linear.x = 0.3 * separation.linear.x +
                      0.3 * alignment.linear.x +
                      0.4 * cohesion.linear.x;
    result.angular.z = 0.5 * separation.angular.z +
                       0.5 * alignment.angular.z;
    return result;
  }

 private:
  geometry_msgs::msg::Twist compute_separation(
    const geometry_msgs::msg::Pose& own,
    const std::vector<geometry_msgs::msg::Pose>& neighbors) {
    geometry_msgs::msg::Twist result;
    for (const auto& neighbor : neighbors) {
      double dx = neighbor.position.x - own.position.x;
      double dy = neighbor.position.y - own.position.y;
      double dist_sq = dx*dx + dy*dy;
      if (dist_sq < 1.0) {  // Minimum distance
        result.linear.x -= dx / (dist_sq + 0.1);
        result.linear.y -= dy / (dist_sq + 0.1);
      }
    }
    return result;
  }

  geometry_msgs::msg::Twist compute_alignment(
    const std::vector<geometry_msgs::msg::Pose>& neighbors,
    const geometry_msgs::msg::Twist& target) {
    geometry_msgs::msg::Twist avg;
    for (const auto& _ : neighbors) {
      avg.linear.x += target.linear.x;
    }
    if (!neighbors.empty()) {
      avg.linear.x /= neighbors.size();
    }
    return avg;
  }

  geometry_msgs::msg::Twist compute_cohesion(
    const geometry_msgs::msg::Pose& own,
    const std::vector<geometry_msgs::msg::Pose>& neighbors) {
    geometry_msgs::msg::Twist center;
    for (const auto& neighbor : neighbors) {
      center.linear.x += neighbor.position.x;
      center.linear.y += neighbor.position.y;
    }
    if (!neighbors.empty()) {
      center.linear.x /= neighbors.size();
      center.linear.y /= neighbors.size();
      center.linear.x -= own.position.x;
      center.linear.y -= own.position.y;
    }
    return center;
  }
};

5. Task Allocation & Load Balancing

Distribute tasks across multiple robots:

// Task allocator
class TaskAllocator : public rclcpp::Node {
 public:
  TaskAllocator() : Node("task_allocator") {
    task_request_sub_ = create_subscription<custom_msgs::msg::Task>(
      "task_requests", 10,
      [this](const custom_msgs::msg::Task::SharedPtr task) {
        allocate_task(*task);
      });

    robot_status_sub_ = create_subscription<custom_msgs::msg::RobotStatus>(
      "robot_status", 10,
      [this](const custom_msgs::msg::RobotStatus::SharedPtr status) {
        robot_workload_[status->robot_id] = status->current_load;
      });

    task_assignment_pub_ = create_publisher<custom_msgs::msg::TaskAssignment>(
      "task_assignments", 10);
  }

 private:
  void allocate_task(const custom_msgs::msg::Task& task) {
    // Find least-loaded robot
    std::string best_robot = "";
    double min_load = std::numeric_limits<double>::max();

    for (const auto& [robot_id, load] : robot_workload_) {
      if (load < min_load) {
        min_load = load;
        best_robot = robot_id;
      }
    }

    if (!best_robot.empty()) {
      custom_msgs::msg::TaskAssignment assignment;
      assignment.task_id = task.task_id;
      assignment.robot_id = best_robot;
      task_assignment_pub_->publish(assignment);
    }
  }

  std::map<std::string, double> robot_workload_;
  rclcpp::Publisher<custom_msgs::msg::TaskAssignment>::SharedPtr task_assignment_pub_;
  rclcpp::Subscription<custom_msgs::msg::Task>::SharedPtr task_request_sub_;
  rclcpp::Subscription<custom_msgs::msg::RobotStatus>::SharedPtr robot_status_sub_;
};

6. Synchronized Motion Control

Synchronize trajectories across multiple robots:

# Multi-robot motion synchronization (Python)
import rclpy
from geometry_msgs.msg import Twist
from nav_msgs.msg import Path
import numpy as np

class SynchronizedMotionController:
    def __init__(self, robot_ids):
        self.robot_ids = robot_ids
        self.robot_paths = {rid: None for rid in robot_ids}
        self.robot_progress = {rid: 0.0 for rid in robot_ids}

    def compute_synchronized_velocities(self):
        """Adjust velocities to keep robots synchronized"""
        avg_progress = np.mean(list(self.robot_progress.values()))

        velocities = {}
        for rid in self.robot_ids:
            # Speed up if behind, slow down if ahead
            progress_error = avg_progress - self.robot_progress[rid]
            scale_factor = 1.0 + 0.1 * progress_error
            velocities[rid] = self._base_velocity * scale_factor

        return velocities

7. Distributed State & Consensus

Use consensus algorithms for distributed decision-making:

# Consensus algorithm (Average Consensus)
class AverageConsensusNode:
    def __init__(self, robot_id, initial_value, neighbors):
        self.robot_id = robot_id
        self.value = initial_value
        self.neighbors = neighbors

    def consensus_step(self, neighbor_values):
        """Single consensus iteration (gossip algorithm)"""
        # Average own value with neighbors
        self.value = (self.value + sum(neighbor_values)) / (len(neighbor_values) + 1)

    def broadcast_state(self):
        """Send current value to neighbors"""
        return self.value

# After K iterations, all nodes converge to the average of initial values
# Robust to communication delays and node failures

8. Communication Patterns

Reliable inter-robot messaging:

9. Collision Avoidance

Multi-robot collision avoidance using velocity obstacles:

# Velocity Obstacle (VO) for collision avoidance
class VelocityObstacle:
    def __init__(self, robot_pos, robot_vel, robot_radius):
        self.robot_pos = robot_pos
        self.robot_vel = robot_vel
        self.radius = robot_radius

    def compute_safe_velocity(self, obstacle_pos, obstacle_vel, obstacle_radius):
        """Compute velocity to avoid collision with moving obstacle"""
        # Relative position and velocity
        rel_pos = obstacle_pos - self.robot_pos
        rel_vel = obstacle_vel - self.robot_vel

        # Compute velocity obstacle cone
        theta = np.arcsin((self.radius + obstacle_radius) / np.linalg.norm(rel_pos))

        # Find safe velocity outside the cone
        # (simplified: choose perpendicular velocity if collision imminent)
        if np.dot(rel_vel, rel_pos) < 0:  # Approaching
            return self._compute_avoidance_velocity(rel_pos)
        return self.robot_vel

10. Monitoring & Diagnostics

Monitor multi-robot system health:

# Multi-robot diagnostics
ros2 topic list | grep robot  # All robot topics
ros2 topic hz /robot1/pose /robot2/pose  # Synchronization check

# Network latency between robots
ros2 run tf2_tools view_frames  # Visualize TF tree (all robots)

# Swarm metrics
- Average inter-robot distance
- Formation cohesion
- Task completion rate
- Message throughput

Key Takeaways

Choose centralized control for tightly coordinated tasks (formation flying) or decentralized for robustness and scalability (swarm exploration). Combine techniques: hierarchical control for large fleets, consensus for robustness, swarm algorithms for emergence. Test extensively in simulation before deploying on physical robots.