이 가이드는 인류와 AI가 함께 만드는 지식입니다.
이 콘텐츠는 Human + AI Partnership 철학 아래 모든 사람이 로봇·AI를 배울 수 있도록 무료로 제공됩니다. 당신의 질문과 기여가 다음 학생의 미래를 바꿉니다.
ROS 2 Multi-Robot Coordination Guide 2026
Coordinate fleets of robots with ROS 2 using centralized control, distributed consensus, and hierarchical architectures. Learn swarm algorithms, synchronized motion, task allocation, and communication patterns for collaborative robotics.
1. Multi-Robot Architecture Patterns
Choose coordination architecture based on system requirements:
- Centralized: Master node coordinates all robots (low latency, single point of failure)
- Decentralized: Each robot makes decisions independently (robust, requires consensus)
- Hierarchical: Leader-follower with local swarms (scalable, mixed control)
- Consensus-based: Distributed algorithm agrees on state (resilient, requires communication)
2. Centralized Fleet Control
Master node coordinates all robot tasks:
// Fleet controller (C++)
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include <geometry_msgs/msg/pose.hpp>
class FleetController : public rclcpp::Node {
std::vector<std::string> robot_names_ = {"robot1", "robot2", "robot3"};
std::map<std::string, geometry_msgs::msg::Pose> robot_poses_;
public:
FleetController() : Node("fleet_controller") {
// Subscribe to all robot poses
for (const auto& robot : robot_names_) {
auto sub = create_subscription<geometry_msgs::msg::Pose>(
"/" + robot + "/pose", 10,
[this, robot](const geometry_msgs::msg::Pose::SharedPtr msg) {
robot_poses_[robot] = *msg;
});
pose_subs_.push_back(sub);
}
// Create command publishers for each robot
for (const auto& robot : robot_names_) {
auto pub = create_publisher<geometry_msgs::msg::Twist>(
"/" + robot + "/cmd_vel", 10);
cmd_pubs_[robot] = pub;
}
// Timer: coordinate robots
timer_ = create_wall_timer(
std::chrono::milliseconds(100),
[this]() { coordinate_robots(); });
}
private:
void coordinate_robots() {
// Centralized decision-making
for (const auto& robot : robot_names_) {
// Compute optimal command based on all robot states
auto cmd = compute_formation_command(robot, robot_poses_);
cmd_pubs_[robot]->publish(cmd);
}
}
geometry_msgs::msg::Twist compute_formation_command(
const std::string& robot,
const std::map<std::string, geometry_msgs::msg::Pose>& poses) {
// Formation control logic (e.g., maintain spacing)
geometry_msgs::msg::Twist cmd;
cmd.linear.x = 0.5; // Placeholder
return cmd;
}
std::map<std::string, rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr> cmd_pubs_;
std::vector<rclcpp::Subscription<geometry_msgs::msg::Pose>::SharedPtr> pose_subs_;
rclcpp::TimerBase::SharedPtr timer_;
};3. Decentralized Decision-Making
Each robot broadcasts state and makes independent decisions:
// Decentralized robot controller (C++)
#include <rclcpp/rclcpp.hpp>
#include <custom_msgs/msg/robot_state.hpp>
class AutonomousRobotController : public rclcpp::Node {
custom_msgs::msg::RobotState local_state_;
std::map<std::string, custom_msgs::msg::RobotState> neighbor_states_;
public:
AutonomousRobotController(const std::string& robot_id)
: Node("robot_" + robot_id), robot_id_(robot_id) {
// Broadcast own state
state_pub_ = create_publisher<custom_msgs::msg::RobotState>(
"robot_states", 10);
// Subscribe to neighbor states
state_sub_ = create_subscription<custom_msgs::msg::RobotState>(
"robot_states", 10,
[this](const custom_msgs::msg::RobotState::SharedPtr msg) {
neighbor_states_[msg->robot_id] = *msg;
});
// Local decision-making loop
timer_ = create_wall_timer(
std::chrono::milliseconds(50),
[this]() { make_decision(); });
}
private:
void make_decision() {
// Compute collision avoidance based on neighbor positions
auto cmd = collision_avoidance(local_state_, neighbor_states_);
local_state_.velocity = cmd;
// Broadcast updated state
state_pub_->publish(local_state_);
}
std::string robot_id_;
rclcpp::Publisher<custom_msgs::msg::RobotState>::SharedPtr state_pub_;
rclcpp::Subscription<custom_msgs::msg::RobotState>::SharedPtr state_sub_;
rclcpp::TimerBase::SharedPtr timer_;
};4. Swarm Behavior Algorithms
Implement flocking, swarming, and consensus algorithms:
// Flocking algorithm
class FlockingBehavior {
public:
geometry_msgs::msg::Twist compute(
const geometry_msgs::msg::Pose& own_pose,
const std::vector<geometry_msgs::msg::Pose>& neighbor_poses,
const geometry_msgs::msg::Twist& target_velocity) {
// Three forces: separation, alignment, cohesion
auto separation = compute_separation(own_pose, neighbor_poses);
auto alignment = compute_alignment(neighbor_poses, target_velocity);
auto cohesion = compute_cohesion(own_pose, neighbor_poses);
// Weighted combination
geometry_msgs::msg::Twist result;
result.linear.x = 0.3 * separation.linear.x +
0.3 * alignment.linear.x +
0.4 * cohesion.linear.x;
result.angular.z = 0.5 * separation.angular.z +
0.5 * alignment.angular.z;
return result;
}
private:
geometry_msgs::msg::Twist compute_separation(
const geometry_msgs::msg::Pose& own,
const std::vector<geometry_msgs::msg::Pose>& neighbors) {
geometry_msgs::msg::Twist result;
for (const auto& neighbor : neighbors) {
double dx = neighbor.position.x - own.position.x;
double dy = neighbor.position.y - own.position.y;
double dist_sq = dx*dx + dy*dy;
if (dist_sq < 1.0) { // Minimum distance
result.linear.x -= dx / (dist_sq + 0.1);
result.linear.y -= dy / (dist_sq + 0.1);
}
}
return result;
}
geometry_msgs::msg::Twist compute_alignment(
const std::vector<geometry_msgs::msg::Pose>& neighbors,
const geometry_msgs::msg::Twist& target) {
geometry_msgs::msg::Twist avg;
for (const auto& _ : neighbors) {
avg.linear.x += target.linear.x;
}
if (!neighbors.empty()) {
avg.linear.x /= neighbors.size();
}
return avg;
}
geometry_msgs::msg::Twist compute_cohesion(
const geometry_msgs::msg::Pose& own,
const std::vector<geometry_msgs::msg::Pose>& neighbors) {
geometry_msgs::msg::Twist center;
for (const auto& neighbor : neighbors) {
center.linear.x += neighbor.position.x;
center.linear.y += neighbor.position.y;
}
if (!neighbors.empty()) {
center.linear.x /= neighbors.size();
center.linear.y /= neighbors.size();
center.linear.x -= own.position.x;
center.linear.y -= own.position.y;
}
return center;
}
};5. Task Allocation & Load Balancing
Distribute tasks across multiple robots:
// Task allocator
class TaskAllocator : public rclcpp::Node {
public:
TaskAllocator() : Node("task_allocator") {
task_request_sub_ = create_subscription<custom_msgs::msg::Task>(
"task_requests", 10,
[this](const custom_msgs::msg::Task::SharedPtr task) {
allocate_task(*task);
});
robot_status_sub_ = create_subscription<custom_msgs::msg::RobotStatus>(
"robot_status", 10,
[this](const custom_msgs::msg::RobotStatus::SharedPtr status) {
robot_workload_[status->robot_id] = status->current_load;
});
task_assignment_pub_ = create_publisher<custom_msgs::msg::TaskAssignment>(
"task_assignments", 10);
}
private:
void allocate_task(const custom_msgs::msg::Task& task) {
// Find least-loaded robot
std::string best_robot = "";
double min_load = std::numeric_limits<double>::max();
for (const auto& [robot_id, load] : robot_workload_) {
if (load < min_load) {
min_load = load;
best_robot = robot_id;
}
}
if (!best_robot.empty()) {
custom_msgs::msg::TaskAssignment assignment;
assignment.task_id = task.task_id;
assignment.robot_id = best_robot;
task_assignment_pub_->publish(assignment);
}
}
std::map<std::string, double> robot_workload_;
rclcpp::Publisher<custom_msgs::msg::TaskAssignment>::SharedPtr task_assignment_pub_;
rclcpp::Subscription<custom_msgs::msg::Task>::SharedPtr task_request_sub_;
rclcpp::Subscription<custom_msgs::msg::RobotStatus>::SharedPtr robot_status_sub_;
};6. Synchronized Motion Control
Synchronize trajectories across multiple robots:
# Multi-robot motion synchronization (Python)
import rclpy
from geometry_msgs.msg import Twist
from nav_msgs.msg import Path
import numpy as np
class SynchronizedMotionController:
def __init__(self, robot_ids):
self.robot_ids = robot_ids
self.robot_paths = {rid: None for rid in robot_ids}
self.robot_progress = {rid: 0.0 for rid in robot_ids}
def compute_synchronized_velocities(self):
"""Adjust velocities to keep robots synchronized"""
avg_progress = np.mean(list(self.robot_progress.values()))
velocities = {}
for rid in self.robot_ids:
# Speed up if behind, slow down if ahead
progress_error = avg_progress - self.robot_progress[rid]
scale_factor = 1.0 + 0.1 * progress_error
velocities[rid] = self._base_velocity * scale_factor
return velocities7. Distributed State & Consensus
Use consensus algorithms for distributed decision-making:
# Consensus algorithm (Average Consensus)
class AverageConsensusNode:
def __init__(self, robot_id, initial_value, neighbors):
self.robot_id = robot_id
self.value = initial_value
self.neighbors = neighbors
def consensus_step(self, neighbor_values):
"""Single consensus iteration (gossip algorithm)"""
# Average own value with neighbors
self.value = (self.value + sum(neighbor_values)) / (len(neighbor_values) + 1)
def broadcast_state(self):
"""Send current value to neighbors"""
return self.value
# After K iterations, all nodes converge to the average of initial values
# Robust to communication delays and node failures8. Communication Patterns
Reliable inter-robot messaging:
- Broadcast: Publish to all robots (map, state, goals)
- Point-to-point: Service calls for task requests
- Gossip: Peer-to-peer state propagation (consensus)
- Leader election: Raft or Bully algorithm for coordinator selection
9. Collision Avoidance
Multi-robot collision avoidance using velocity obstacles:
# Velocity Obstacle (VO) for collision avoidance
class VelocityObstacle:
def __init__(self, robot_pos, robot_vel, robot_radius):
self.robot_pos = robot_pos
self.robot_vel = robot_vel
self.radius = robot_radius
def compute_safe_velocity(self, obstacle_pos, obstacle_vel, obstacle_radius):
"""Compute velocity to avoid collision with moving obstacle"""
# Relative position and velocity
rel_pos = obstacle_pos - self.robot_pos
rel_vel = obstacle_vel - self.robot_vel
# Compute velocity obstacle cone
theta = np.arcsin((self.radius + obstacle_radius) / np.linalg.norm(rel_pos))
# Find safe velocity outside the cone
# (simplified: choose perpendicular velocity if collision imminent)
if np.dot(rel_vel, rel_pos) < 0: # Approaching
return self._compute_avoidance_velocity(rel_pos)
return self.robot_vel10. Monitoring & Diagnostics
Monitor multi-robot system health:
# Multi-robot diagnostics
ros2 topic list | grep robot # All robot topics
ros2 topic hz /robot1/pose /robot2/pose # Synchronization check
# Network latency between robots
ros2 run tf2_tools view_frames # Visualize TF tree (all robots)
# Swarm metrics
- Average inter-robot distance
- Formation cohesion
- Task completion rate
- Message throughputKey Takeaways
Choose centralized control for tightly coordinated tasks (formation flying) or decentralized for robustness and scalability (swarm exploration). Combine techniques: hierarchical control for large fleets, consensus for robustness, swarm algorithms for emergence. Test extensively in simulation before deploying on physical robots.