Skip to main content
🤝

이 가이드는 인류와 AI가 함께 만드는 지식입니다.

이 콘텐츠는 Human + AI Partnership 철학 아래 모든 사람이 로봇·AI를 배울 수 있도록 무료로 제공됩니다. 당신의 질문과 기여가 다음 학생의 미래를 바꿉니다.

ROS 2 군집 로봇 가이드 2026

ROS 2로 분산 제어 군집 로봇 개발. 100~1000개 에이전트 조율, 협력 알고리즘, 통신 토폴로지, 장애 복원력.

1. 군집 로봇 아키텍처

분산 멀티-에이전트 시스템:

2. 분산 제어 알고리즘

개별 에이전트 제어 규칙:

class SwarmRobot : public rclcpp::Node {
 public:
  struct AgentState {
    int id;
    geometry_msgs::msg::Point position;
    geometry_msgs::msg::Twist velocity;
    std::vector<int> neighbors;  // 통신 범위 내 이웃
  };

  SwarmRobot(int robot_id) : Node("robot_" + std::to_string(robot_id)),
                              agent_id_(robot_id) {
    // 이웃 감지 (근거리 무선)
    neighbor_sub_ = create_subscription<std_msgs::msg::String>(
      "broadcast/neighbor_list", 10,
      [this](const std_msgs::msg::String::SharedPtr msg) {
        parse_neighbor_list(*msg);
      });

    // 속도 제어 명령
    vel_pub_ = create_publisher<geometry_msgs::msg::Twist>(
      "cmd_vel", 10);

    // 이웃과 통신 (메시지 전파)
    msg_pub_ = create_publisher<std_msgs::msg::String>(
      "broadcast/messages", 10);

    // 제어 루프
    control_timer_ = create_wall_timer(
      std::chrono::milliseconds(100),
      [this]() { execute_control_rule(); });
  }

 private:
  void execute_control_rule() {
    // 세 가지 기본 규칙 (Boid model)
    auto separation = compute_separation_force();  // 충돌 회피
    auto alignment = compute_alignment_force();    // 방향 정렬
    auto cohesion = compute_cohesion_force();      // 군집 유지

    // 가중치 조합
    double w_sep = 1.5, w_ali = 1.0, w_coh = 1.0;
    geometry_msgs::msg::Twist cmd;

    cmd.linear.x = (separation.x + alignment.x + cohesion.x) / 3.0;
    cmd.linear.y = (separation.y + alignment.y + cohesion.y) / 3.0;

    // 최대 속도 제한
    double max_speed = 0.5;  // m/s
    double speed = std::sqrt(cmd.linear.x*cmd.linear.x +
                             cmd.linear.y*cmd.linear.y);
    if (speed > max_speed) {
      cmd.linear.x *= max_speed / speed;
      cmd.linear.y *= max_speed / speed;
    }

    vel_pub_->publish(cmd);
  }

  geometry_msgs::msg::Vector3 compute_separation_force() {
    // 이웃과의 거리 > threshold면 반발력
    geometry_msgs::msg::Vector3 force;
    double separation_distance = 0.5;  // 50cm

    for (int neighbor_id : state_.neighbors) {
      auto neighbor_pos = get_neighbor_position(neighbor_id);
      double dx = state_.position.x - neighbor_pos.x;
      double dy = state_.position.y - neighbor_pos.y;
      double dist = std::sqrt(dx*dx + dy*dy);

      if (dist < separation_distance && dist > 0.01) {
        force.x += dx / dist;
        force.y += dy / dist;
      }
    }
    return force;
  }

  geometry_msgs::msg::Vector3 compute_alignment_force() {
    // 이웃의 평균 방향으로 정렬
    geometry_msgs::msg::Vector3 force;
    if (state_.neighbors.empty()) return force;

    for (int neighbor_id : state_.neighbors) {
      auto neighbor_vel = get_neighbor_velocity(neighbor_id);
      force.x += neighbor_vel.linear.x;
      force.y += neighbor_vel.linear.y;
    }

    force.x /= state_.neighbors.size();
    force.y /= state_.neighbors.size();
    return force;
  }

  geometry_msgs::msg::Vector3 compute_cohesion_force() {
    // 이웃의 중심으로 이동
    geometry_msgs::msg::Vector3 force;
    if (state_.neighbors.empty()) return force;

    double cx = 0, cy = 0;
    for (int neighbor_id : state_.neighbors) {
      auto neighbor_pos = get_neighbor_position(neighbor_id);
      cx += neighbor_pos.x;
      cy += neighbor_pos.y;
    }

    cx /= state_.neighbors.size();
    cy /= state_.neighbors.size();

    force.x = cx - state_.position.x;
    force.y = cy - state_.position.y;
    return force;
  }

  geometry_msgs::msg::Point get_neighbor_position(int neighbor_id) {
    // 메시지 버퍼에서 이웃 위치 조회
    return geometry_msgs::msg::Point();
  }

  geometry_msgs::msg::Twist get_neighbor_velocity(int neighbor_id) {
    return geometry_msgs::msg::Twist();
  }

  void parse_neighbor_list(const std_msgs::msg::String& msg) {
    // 근거리 무선에서 수신한 이웃 목록 파싱
    // 형식: "neighbors: 2,5,8"
  }

  int agent_id_;
  AgentState state_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr neighbor_sub_;
  rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr vel_pub_;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr msg_pub_;
  rclcpp::TimerBase::SharedPtr control_timer_;
};

3. 통신 토폴로지

군집 통신 네트워크:

class SwarmCommunicationNetwork : public rclcpp::Node {
 public:
  enum TopologyType {
    BROADCAST,     // 1-to-N (중앙 집중)
    MESH,          // 모든 쌍 연결 (작은 규모용)
    GOSSIP,        // 무작위 이웃 통신 (스케일링)
    HIERARCHICAL   // 계층 구조 (100+ 로봇)
  };

  void setup_gossip_protocol(int swarm_size) {
    // Gossip 알고리즘: O(log N) 수렴
    // 각 로봇이 무작위 이웃과 정보 교환

    // 1. 이웃 선택
    int neighbor_count = std::max(3, (int)std::log2(swarm_size));

    timer_ = create_wall_timer(
      std::chrono::milliseconds(500),
      [this, neighbor_count]() {
        // 무작위로 neighbor_count개 로봇 선택
        auto random_neighbors = select_random_neighbors(neighbor_count);

        // 상태 정보 전송
        for (int neighbor : random_neighbors) {
          send_state_to_neighbor(neighbor);
        }

        // 수신된 상태 병합
        merge_received_states();
      });
  }

  void setup_hierarchical_topology(int swarm_size) {
    // 계층 구조: 1000+ 로봇용
    // 레벨 1: 개별 로봇 (100개 그룹)
    // 레벨 2: 그룹 리더 (10개)
    // 레벨 3: 스웜 리더 (1개)

    int level1_size = 100;
    int level2_size = swarm_size / level1_size;
    int level3_size = 1;

    RCLCPP_INFO(get_logger(),
      "Hierarchical topology: L1=%d, L2=%d, L3=%d",
      level1_size, level2_size, level3_size);

    // 각 레벨에서 집계 및 결정
    // 하향식 명령 전파
  }

 private:
  std::vector<int> select_random_neighbors(int count) {
    std::vector<int> neighbors;
    // 무작위 이웃 선택
    return neighbors;
  }

  void send_state_to_neighbor(int neighbor_id) {
    // 상태 메시지 전송
  }

  void merge_received_states() {
    // 수신한 상태와 병합
  }

  rclcpp::TimerBase::SharedPtr timer_;
};

4. 장애 복원력 (Fault Tolerance)

로봇 장애 대응:

class FaultTolerantSwarm : public rclcpp::Node {
 public:
  void detect_and_recover_failures() {
    // 1. 이웃이 응답 없음 감지 (heartbeat timeout)
    // 2. 동적으로 이웃 목록 재구성
    // 3. 작업 재분배

    heartbeat_timer_ = create_wall_timer(
      std::chrono::milliseconds(500),
      [this]() {
        check_neighbor_health();
      });
  }

 private:
  void check_neighbor_health() {
    auto now = this->now();

    for (auto it = neighbor_timestamps_.begin();
         it != neighbor_timestamps_.end(); ) {
      int neighbor_id = it->first;
      rclcpp::Time last_seen = it->second;

      if ((now - last_seen).seconds() > 2.0) {  // 2초 타임아웃
        RCLCPP_WARN(get_logger(), "Neighbor %d failed", neighbor_id);

        // 이웃 제거 및 작업 재분배
        remove_neighbor(neighbor_id);
        redistribute_tasks();

        it = neighbor_timestamps_.erase(it);
      } else {
        ++it;
      }
    }
  }

  void remove_neighbor(int neighbor_id) {
    // 네트워크에서 제거
  }

  void redistribute_tasks() {
    // 손실된 로봇의 작업을 다른 로봇에 할당
  }

  std::map<int, rclcpp::Time> neighbor_timestamps_;
  rclcpp::TimerBase::SharedPtr heartbeat_timer_;
};

5. 군집 작업 시나리오

실제 응용:

6. 성능 지표 (KPI)

7. 실제 배포 체크리스트

8. 국제 표준

핵심 정리

ROS 2 기반 군집 로봇은 분산 제어, 에머전스, 장애 복원력을 통해 100~1000개 에이전트가 중앙 제어 없이 자동으로 협력하는 새로운 패러다임을 실현합니다.