抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Multi-Agent(多智能体)系统是AI Agent发展的高级形态,通过多个专业Agent的协作来完成复杂任务。相比单一Agent,Multi-Agent系统具有更强的任务分解能力、容错性和可扩展性。本文详细介绍Multi-Agent系统的架构设计、协作模式和实践方法。

为什么需要Multi-Agent

单Agent的局限性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
┌─────────────────────────────────────────────────────────────────┐
│ 单Agent的挑战 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 能力边界 │
│ ├── 单个LLM的上下文窗口有限 │
│ ├── 难以同时精通所有领域 │
│ └── 复杂任务超出单Agent处理能力 │
│ │
│ 2. 可靠性问题 │
│ ├── 单点故障风险 │
│ ├── 长任务容易中断 │
│ └── 错误难以自我纠正 │
│ │
│ 3. 扩展性限制 │
│ ├── 无法并行处理 │
│ ├── 难以动态增加能力 │
│ └── 性能瓶颈明显 │
│ │
│ 举例: │
│ 任务:"开发一个完整的Web应用" │
│ │
│ 单Agent需要: │
│ - 理解需求 → 设计架构 → 写前端 → 写后端 → │
│ - 写数据库 → 测试 → 部署 → 文档 │
│ │
│ 问题:上下文爆炸、专业度不够、效率低下 │
│ │
└─────────────────────────────────────────────────────────────────┘

Multi-Agent的优势

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
┌─────────────────────────────────────────────────────────────────┐
│ Multi-Agent优势 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 同样的任务:"开发一个完整的Web应用" │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 协调Agent │ │
│ │ 任务分解 → 分配 → 整合 │ │
│ └────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌──────────────────┼───────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 前端 │ │ 后端 │ │ 测试 │ │
│ │ Agent │ │ Agent │ │ Agent │ │
│ │ │ │ │ │ │ │
│ │ React │ │ Python │ │ 自动化 │ │
│ │ Vue │ │ Node.js │ │ 测试 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 优势: │
│ ✓ 专业分工:每个Agent专注擅长领域 │
│ ✓ 并行处理:多个Agent同时工作 │
│ ✓ 容错性强:单个失败不影响整体 │
│ ✓ 易于扩展:添加新Agent即可增加能力 │
│ │
└─────────────────────────────────────────────────────────────────┘

Multi-Agent架构模式

1. 中心化架构(Orchestrator模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
┌─────────────────────────────────────────────────────────────────┐
│ 中心化架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ │
│ │ Orchestrator │ │
│ │ (协调者Agent) │ │
│ │ │ │
│ │ - 任务分解 │ │
│ │ - Agent调度 │ │
│ │ - 结果整合 │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Worker │ │ Worker │ │ Worker │ │
│ │ Agent A │ │ Agent B │ │ Agent C │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 特点: │
│ - 中央控制,流程清晰 │
│ - 易于监控和管理 │
│ - 协调者是瓶颈和单点 │
│ │
│ 适用:任务流程固定、需要强一致性的场景 │
│ │
└─────────────────────────────────────────────────────────────────┘

2. 去中心化架构(Peer-to-Peer模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
┌─────────────────────────────────────────────────────────────────┐
│ 去中心化架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ │
│ │ Agent A │◀───────▶│ Agent B │ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ │ ┌─────────┐ │ │
│ └───▶│ Agent C │◀───┘ │
│ └────┬────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Agent D │◀─▶│ Agent E │◀─▶│ Agent F │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 特点: │
│ - 无单点故障 │
│ - 高度灵活和自适应 │
│ - 协调复杂,可能出现冲突 │
│ │
│ 适用:动态环境、需要高可用性的场景 │
│ │
└─────────────────────────────────────────────────────────────────┘

3. 层级架构(Hierarchical模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
┌─────────────────────────────────────────────────────────────────┐
│ 层级架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ │
│ │ Manager Agent │ 战略层 │
│ │ (顶层管理者) │ - 目标设定 │
│ └────────┬─────────┘ - 资源分配 │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ 战术层 │
│ │ Supervisor│ │ Supervisor│ │ Supervisor│ - 任务规划 │
│ │ Agent A │ │ Agent B │ │ Agent C │ - 进度跟踪 │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │
│ ┌───┴───┐ ┌───┴───┐ ┌───┴───┐ │
│ ▼ ▼ ▼ ▼ ▼ ▼ │
│ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ 执行层 │
│ │ W │ │ W │ │ W │ │ W │ │ W │ │ W │ - 具体执行 │
│ └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ - 结果反馈 │
│ │
│ 特点: │
│ - 适合大规模复杂任务 │
│ - 职责清晰,易于管理 │
│ - 通信开销较大 │
│ │
│ 适用:企业级应用、大型项目管理 │
│ │
└─────────────────────────────────────────────────────────────────┘

4. 混合架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
┌─────────────────────────────────────────────────────────────────┐
│ 混合架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────────┐│
│ │ 协调层 ││
│ │ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Orchestrator│◀────────────────────▶│ Orchestrator│ ││
│ │ │ A │ A2A通信 │ B │ ││
│ │ └──────┬──────┘ └──────┬──────┘ ││
│ └─────────┼─────────────────────────────────────┼─────────────┘│
│ │ │ │
│ ┌─────────┼─────────────┐ ┌───────────────┼────────────┐ │
│ │ │ 团队A │ │ 团队B │ │ │
│ │ ┌─────┼─────┐ │ │ ┌──────┼──────┐ │ │
│ │ ▼ ▼ ▼ │ │ ▼ ▼ ▼ │ │
│ │ ┌───┐ ┌───┐ ┌───┐ │ │ ┌───┐ ┌───┐ ┌───┐ │ │
│ │ │ A1│◀▶│A2│◀▶│A3│ │ │ │ B1│◀─▶│B2│◀─▶│B3│ │ │
│ │ └───┘ └───┘ └───┘ │ │ └───┘ └───┘ └───┘ │ │
│ │ P2P协作 │ │ P2P协作 │ │
│ └───────────────────────┘ └────────────────────────────┘ │
│ │
│ 特点: │
│ - 结合多种模式的优点 │
│ - 团队内部P2P灵活协作 │
│ - 团队之间通过协调者沟通 │
│ │
└─────────────────────────────────────────────────────────────────┘

Agent间通信

消息格式设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
from datetime import datetime
import uuid

class MessageType(Enum):
TASK_REQUEST = "task_request" # 任务请求
TASK_RESPONSE = "task_response" # 任务响应
STATUS_UPDATE = "status_update" # 状态更新
QUERY = "query" # 信息查询
BROADCAST = "broadcast" # 广播消息
HEARTBEAT = "heartbeat" # 心跳检测

class Priority(Enum):
LOW = 1
NORMAL = 2
HIGH = 3
URGENT = 4

@dataclass
class AgentMessage:
"""Agent间通信消息"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
type: MessageType = MessageType.TASK_REQUEST
sender: str = "" # 发送者Agent ID
receiver: str = "" # 接收者Agent ID(空表示广播)
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
priority: Priority = Priority.NORMAL

# 消息内容
subject: str = "" # 主题
content: Dict[str, Any] = field(default_factory=dict)

# 上下文信息
conversation_id: str = "" # 会话ID,用于追踪相关消息
reply_to: Optional[str] = None # 回复的消息ID
metadata: Dict[str, Any] = field(default_factory=dict)

def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"type": self.type.value,
"sender": self.sender,
"receiver": self.receiver,
"timestamp": self.timestamp,
"priority": self.priority.value,
"subject": self.subject,
"content": self.content,
"conversation_id": self.conversation_id,
"reply_to": self.reply_to,
"metadata": self.metadata
}

消息总线实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import asyncio
from collections import defaultdict
from typing import Callable, Awaitable

class MessageBus:
"""Agent消息总线"""

def __init__(self):
self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
self._topic_subscribers: Dict[str, List[Callable]] = defaultdict(list)
self._message_queue: asyncio.Queue = asyncio.Queue()
self._running = False

async def start(self):
"""启动消息总线"""
self._running = True
asyncio.create_task(self._process_messages())

async def stop(self):
"""停止消息总线"""
self._running = False

def subscribe(self, agent_id: str, handler: Callable[[AgentMessage], Awaitable[None]]):
"""订阅特定Agent的消息"""
self._subscribers[agent_id].append(handler)

def subscribe_topic(self, topic: str, handler: Callable[[AgentMessage], Awaitable[None]]):
"""订阅特定主题的消息"""
self._topic_subscribers[topic].append(handler)

async def publish(self, message: AgentMessage):
"""发布消息"""
await self._message_queue.put(message)

async def _process_messages(self):
"""消息处理循环"""
while self._running:
try:
message = await asyncio.wait_for(
self._message_queue.get(),
timeout=1.0
)
await self._route_message(message)
except asyncio.TimeoutError:
continue

async def _route_message(self, message: AgentMessage):
"""路由消息到正确的处理者"""
handlers = []

# 点对点消息
if message.receiver:
handlers.extend(self._subscribers.get(message.receiver, []))
else:
# 广播消息
for subscriber_handlers in self._subscribers.values():
handlers.extend(subscriber_handlers)

# 主题订阅
handlers.extend(self._topic_subscribers.get(message.subject, []))

# 并行调用所有处理者
await asyncio.gather(*[h(message) for h in handlers], return_exceptions=True)

通信协议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class AgentCommunicator:
"""Agent通信器"""

def __init__(self, agent_id: str, message_bus: MessageBus):
self.agent_id = agent_id
self.message_bus = message_bus
self._pending_requests: Dict[str, asyncio.Future] = {}

# 注册消息处理
message_bus.subscribe(agent_id, self._handle_message)

async def send_request(
self,
receiver: str,
content: Dict[str, Any],
timeout: float = 30.0
) -> Dict[str, Any]:
"""发送请求并等待响应"""

message = AgentMessage(
type=MessageType.TASK_REQUEST,
sender=self.agent_id,
receiver=receiver,
content=content
)

# 创建Future等待响应
future = asyncio.Future()
self._pending_requests[message.id] = future

# 发送消息
await self.message_bus.publish(message)

try:
# 等待响应
response = await asyncio.wait_for(future, timeout=timeout)
return response
except asyncio.TimeoutError:
del self._pending_requests[message.id]
raise TimeoutError(f"Request to {receiver} timed out")

async def send_response(
self,
original_message: AgentMessage,
content: Dict[str, Any]
):
"""发送响应"""

response = AgentMessage(
type=MessageType.TASK_RESPONSE,
sender=self.agent_id,
receiver=original_message.sender,
content=content,
reply_to=original_message.id,
conversation_id=original_message.conversation_id
)

await self.message_bus.publish(response)

async def broadcast(self, subject: str, content: Dict[str, Any]):
"""广播消息"""

message = AgentMessage(
type=MessageType.BROADCAST,
sender=self.agent_id,
receiver="", # 空表示广播
subject=subject,
content=content
)

await self.message_bus.publish(message)

async def _handle_message(self, message: AgentMessage):
"""处理收到的消息"""

if message.type == MessageType.TASK_RESPONSE:
# 处理响应消息
if message.reply_to in self._pending_requests:
future = self._pending_requests.pop(message.reply_to)
future.set_result(message.content)

实现Multi-Agent系统

基础Agent类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from abc import ABC, abstractmethod

class BaseAgent(ABC):
"""Multi-Agent系统中的基础Agent"""

def __init__(
self,
agent_id: str,
name: str,
message_bus: MessageBus,
llm: LLMClient = None
):
self.agent_id = agent_id
self.name = name
self.llm = llm or LLMClient()
self.communicator = AgentCommunicator(agent_id, message_bus)
self.state = AgentState.IDLE
self._message_handlers: Dict[MessageType, Callable] = {}

self._setup_handlers()

@property
@abstractmethod
def capabilities(self) -> List[str]:
"""Agent的能力列表"""
pass

@property
@abstractmethod
def description(self) -> str:
"""Agent的描述"""
pass

def _setup_handlers(self):
"""设置消息处理器"""
self._message_handlers = {
MessageType.TASK_REQUEST: self._handle_task_request,
MessageType.QUERY: self._handle_query,
MessageType.BROADCAST: self._handle_broadcast,
}

async def start(self):
"""启动Agent"""
self.state = AgentState.RUNNING
self.communicator.message_bus.subscribe(
self.agent_id,
self._dispatch_message
)

async def stop(self):
"""停止Agent"""
self.state = AgentState.STOPPED

async def _dispatch_message(self, message: AgentMessage):
"""分发消息到对应处理器"""
handler = self._message_handlers.get(message.type)
if handler:
await handler(message)

@abstractmethod
async def _handle_task_request(self, message: AgentMessage):
"""处理任务请求"""
pass

async def _handle_query(self, message: AgentMessage):
"""处理查询请求"""
query_type = message.content.get("type")

if query_type == "capabilities":
await self.communicator.send_response(message, {
"capabilities": self.capabilities
})
elif query_type == "status":
await self.communicator.send_response(message, {
"status": self.state.value
})

async def _handle_broadcast(self, message: AgentMessage):
"""处理广播消息"""
# 子类可以覆盖实现特定逻辑
pass

async def delegate_task(
self,
target_agent: str,
task: Dict[str, Any]
) -> Dict[str, Any]:
"""委托任务给其他Agent"""
return await self.communicator.send_request(target_agent, task)

Orchestrator Agent实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class OrchestratorAgent(BaseAgent):
"""协调者Agent - 负责任务分解和Agent调度"""

def __init__(self, agent_id: str, message_bus: MessageBus, llm: LLMClient):
super().__init__(agent_id, "Orchestrator", message_bus, llm)
self.worker_agents: Dict[str, Dict] = {} # 已注册的Worker Agent

@property
def capabilities(self) -> List[str]:
return ["task_decomposition", "agent_coordination", "result_aggregation"]

@property
def description(self) -> str:
return "负责分解复杂任务、调度Worker Agent、整合执行结果"

def register_worker(self, agent_id: str, capabilities: List[str], description: str):
"""注册Worker Agent"""
self.worker_agents[agent_id] = {
"capabilities": capabilities,
"description": description,
"status": "available"
}

async def execute_task(self, task: str) -> Dict[str, Any]:
"""执行复杂任务"""

# 1. 任务分解
subtasks = await self._decompose_task(task)

# 2. 分配任务
assignments = await self._assign_tasks(subtasks)

# 3. 执行任务
results = await self._execute_assignments(assignments)

# 4. 整合结果
final_result = await self._aggregate_results(task, results)

return final_result

async def _decompose_task(self, task: str) -> List[Dict[str, Any]]:
"""使用LLM分解任务"""

workers_info = "\n".join([
f"- {aid}: {info['description']}, 能力: {info['capabilities']}"
for aid, info in self.worker_agents.items()
])

prompt = f"""
你是一个任务分解专家。请将以下任务分解为可以独立执行的子任务。

可用的Agent:
{workers_info}

任务:{task}

请以JSON格式返回子任务列表:
[
{{
"id": "subtask_1",
"description": "子任务描述",
"dependencies": [], // 依赖的子任务ID
"required_capabilities": ["capability1"]
}}
]
"""

response = await self.llm.generate(prompt)
return json.loads(response)

async def _assign_tasks(
self,
subtasks: List[Dict[str, Any]]
) -> Dict[str, List[Dict]]:
"""将子任务分配给合适的Agent"""

assignments = defaultdict(list)

for subtask in subtasks:
# 找到能力匹配的Agent
best_agent = self._find_best_agent(subtask["required_capabilities"])
if best_agent:
assignments[best_agent].append(subtask)
else:
raise ValueError(f"No agent can handle: {subtask}")

return dict(assignments)

def _find_best_agent(self, required_capabilities: List[str]) -> Optional[str]:
"""找到最适合的Agent"""
for agent_id, info in self.worker_agents.items():
if info["status"] == "available":
if all(cap in info["capabilities"] for cap in required_capabilities):
return agent_id
return None

async def _execute_assignments(
self,
assignments: Dict[str, List[Dict]]
) -> Dict[str, Any]:
"""执行任务分配"""

results = {}

# 按依赖关系排序执行
executed = set()
pending = []

for agent_id, tasks in assignments.items():
for task in tasks:
pending.append((agent_id, task))

while pending:
# 找出可以执行的任务(依赖已完成)
ready = [
(aid, t) for aid, t in pending
if all(dep in executed for dep in t.get("dependencies", []))
]

if not ready:
raise ValueError("Circular dependency detected")

# 并行执行ready的任务
tasks_to_run = []
for agent_id, task in ready:
tasks_to_run.append(self._execute_single_task(agent_id, task))
pending.remove((agent_id, task))

task_results = await asyncio.gather(*tasks_to_run)

for (agent_id, task), result in zip(ready, task_results):
results[task["id"]] = result
executed.add(task["id"])

return results

async def _execute_single_task(
self,
agent_id: str,
task: Dict[str, Any]
) -> Dict[str, Any]:
"""委托单个任务给Worker Agent"""

self.worker_agents[agent_id]["status"] = "busy"

try:
result = await self.delegate_task(agent_id, {
"type": "execute",
"task": task
})
return result
finally:
self.worker_agents[agent_id]["status"] = "available"

async def _aggregate_results(
self,
original_task: str,
results: Dict[str, Any]
) -> Dict[str, Any]:
"""整合所有子任务结果"""

prompt = f"""
请整合以下子任务的执行结果,生成最终的任务输出。

原始任务:{original_task}

子任务结果:
{json.dumps(results, ensure_ascii=False, indent=2)}

请生成结构化的最终结果。
"""

response = await self.llm.generate(prompt)
return {
"status": "completed",
"summary": response,
"details": results
}

async def _handle_task_request(self, message: AgentMessage):
"""处理任务请求"""
task = message.content.get("task")
result = await self.execute_task(task)
await self.communicator.send_response(message, result)

Worker Agent实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
class WorkerAgent(BaseAgent):
"""执行具体任务的Worker Agent"""

def __init__(
self,
agent_id: str,
name: str,
message_bus: MessageBus,
llm: LLMClient,
skills: List[BaseSkill]
):
super().__init__(agent_id, name, message_bus, llm)
self.skills = {s.definition.id: s for s in skills}

@property
def capabilities(self) -> List[str]:
return list(self.skills.keys())

@property
def description(self) -> str:
skill_names = [s.definition.name for s in self.skills.values()]
return f"专业Agent,擅长: {', '.join(skill_names)}"

async def _handle_task_request(self, message: AgentMessage):
"""处理任务请求"""
task = message.content.get("task", {})

try:
# 1. 选择合适的Skill
skill_id = await self._select_skill(task)

if not skill_id or skill_id not in self.skills:
raise ValueError(f"No suitable skill for task")

# 2. 准备参数
params = await self._prepare_params(task, skill_id)

# 3. 执行Skill
skill = self.skills[skill_id]
result = await skill.run(params)

# 4. 返回结果
await self.communicator.send_response(message, {
"status": "success",
"result": result
})

except Exception as e:
await self.communicator.send_response(message, {
"status": "error",
"error": str(e)
})

async def _select_skill(self, task: Dict[str, Any]) -> str:
"""选择执行任务的Skill"""
task_desc = task.get("description", "")
required_caps = task.get("required_capabilities", [])

# 优先匹配required_capabilities
for cap in required_caps:
if cap in self.skills:
return cap

# 使用LLM选择
skills_info = "\n".join([
f"- {sid}: {s.definition.name} - {s.definition.description}"
for sid, s in self.skills.items()
])

prompt = f"""
根据任务描述,从以下技能中选择最合适的一个:

技能列表:
{skills_info}

任务:{task_desc}

只返回技能ID,不要其他内容。
"""

return (await self.llm.generate(prompt)).strip()

async def _prepare_params(
self,
task: Dict[str, Any],
skill_id: str
) -> Dict[str, Any]:
"""准备Skill执行参数"""
skill = self.skills[skill_id]
schema = skill.definition.input_schema

# 使用LLM提取参数
prompt = f"""
从任务信息中提取参数。

任务:{json.dumps(task, ensure_ascii=False)}

参数Schema:{json.dumps(schema, ensure_ascii=False)}

以JSON格式返回参数。
"""

response = await self.llm.generate(prompt)
return json.loads(response)

协作模式

1. 任务分解协作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async def task_decomposition_example():
"""任务分解协作示例"""

# 创建消息总线
bus = MessageBus()
await bus.start()

# 创建Orchestrator
orchestrator = OrchestratorAgent("orchestrator", bus, LLMClient())

# 创建Worker Agents
coder = WorkerAgent("coder", "代码专家", bus, LLMClient(), [
CodeGenerationSkill(),
CodeReviewSkill()
])

tester = WorkerAgent("tester", "测试专家", bus, LLMClient(), [
TestGenerationSkill(),
TestExecutionSkill()
])

documenter = WorkerAgent("documenter", "文档专家", bus, LLMClient(), [
DocumentationSkill()
])

# 注册Workers
orchestrator.register_worker("coder", coder.capabilities, coder.description)
orchestrator.register_worker("tester", tester.capabilities, tester.description)
orchestrator.register_worker("documenter", documenter.capabilities, documenter.description)

# 启动所有Agent
await asyncio.gather(
orchestrator.start(),
coder.start(),
tester.start(),
documenter.start()
)

# 执行复杂任务
result = await orchestrator.execute_task(
"实现一个用户认证模块,包含登录、注册、密码重置功能,并编写测试和文档"
)

print(result)

2. 辩论协作(Debate)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class DebateOrchestrator:
"""辩论式协作 - 多个Agent讨论得出更好的答案"""

def __init__(self, agents: List[BaseAgent], llm: LLMClient):
self.agents = agents
self.llm = llm

async def debate(
self,
question: str,
max_rounds: int = 3
) -> str:
"""进行多轮辩论"""

responses = {}
history = []

# 第一轮:各Agent独立回答
for agent in self.agents:
response = await agent.think(question)
responses[agent.agent_id] = response
history.append({
"round": 1,
"agent": agent.agent_id,
"response": response
})

# 多轮辩论
for round_num in range(2, max_rounds + 1):
for agent in self.agents:
# 让Agent看到其他Agent的回答并回应
others_responses = {
aid: resp for aid, resp in responses.items()
if aid != agent.agent_id
}

prompt = f"""
问题:{question}

你之前的回答:{responses[agent.agent_id]}

其他专家的回答:
{json.dumps(others_responses, ensure_ascii=False, indent=2)}

请考虑其他专家的观点,更新你的回答。如果你认为某个观点更好,可以采纳。
如果你坚持原有观点,请说明理由。
"""

new_response = await agent.llm.generate(prompt)
responses[agent.agent_id] = new_response
history.append({
"round": round_num,
"agent": agent.agent_id,
"response": new_response
})

# 综合所有观点得出最终答案
final_answer = await self._synthesize(question, history)
return final_answer

async def _synthesize(self, question: str, history: List[Dict]) -> str:
"""综合所有讨论得出最终答案"""

prompt = f"""
问题:{question}

以下是多位专家的讨论过程:
{json.dumps(history, ensure_ascii=False, indent=2)}

请综合所有专家的观点,给出一个全面、准确的最终答案。
"""

return await self.llm.generate(prompt)

3. 投票协作(Voting)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class VotingOrchestrator:
"""投票式协作 - 多数决定"""

def __init__(self, agents: List[BaseAgent]):
self.agents = agents

async def vote(self, question: str, options: List[str] = None) -> Dict[str, Any]:
"""进行投票"""

votes = []

# 收集每个Agent的投票
for agent in self.agents:
if options:
vote = await self._vote_with_options(agent, question, options)
else:
vote = await self._vote_open(agent, question)
votes.append({
"agent": agent.agent_id,
"vote": vote["choice"],
"reasoning": vote["reasoning"]
})

# 统计结果
vote_counts = defaultdict(int)
for v in votes:
vote_counts[v["vote"]] += 1

winner = max(vote_counts.items(), key=lambda x: x[1])

return {
"winner": winner[0],
"vote_count": winner[1],
"total_votes": len(votes),
"all_votes": votes,
"breakdown": dict(vote_counts)
}

async def _vote_with_options(
self,
agent: BaseAgent,
question: str,
options: List[str]
) -> Dict[str, Any]:
"""从选项中投票"""

options_str = "\n".join([f"{i+1}. {opt}" for i, opt in enumerate(options)])

prompt = f"""
问题:{question}

选项:
{options_str}

请选择一个选项并说明理由。返回JSON格式:
{{"choice": "选项内容", "reasoning": "理由"}}
"""

response = await agent.llm.generate(prompt)
return json.loads(response)

4. 反思协作(Reflection)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class ReflectionChain:
"""反思链 - Agent A生成,Agent B审查,迭代改进"""

def __init__(
self,
generator: BaseAgent,
reviewer: BaseAgent,
max_iterations: int = 3
):
self.generator = generator
self.reviewer = reviewer
self.max_iterations = max_iterations

async def generate_with_reflection(self, task: str) -> Dict[str, Any]:
"""生成并反思改进"""

history = []
current_output = None

for iteration in range(self.max_iterations):
# 生成/改进
if current_output is None:
current_output = await self._initial_generate(task)
else:
current_output = await self._improve(task, current_output, feedback)

history.append({
"iteration": iteration + 1,
"type": "generation",
"output": current_output
})

# 审查
review_result = await self._review(task, current_output)
feedback = review_result["feedback"]

history.append({
"iteration": iteration + 1,
"type": "review",
"passed": review_result["passed"],
"feedback": feedback
})

# 如果通过审查,结束迭代
if review_result["passed"]:
break

return {
"final_output": current_output,
"iterations": len(history) // 2,
"history": history
}

async def _initial_generate(self, task: str) -> str:
"""初始生成"""
return await self.generator.llm.generate(f"请完成以下任务:\n{task}")

async def _improve(self, task: str, current: str, feedback: str) -> str:
"""根据反馈改进"""
prompt = f"""
任务:{task}

当前输出:
{current}

审查反馈:
{feedback}

请根据反馈改进输出。
"""
return await self.generator.llm.generate(prompt)

async def _review(self, task: str, output: str) -> Dict[str, Any]:
"""审查输出"""
prompt = f"""
任务:{task}

输出:
{output}

请审查这个输出:
1. 是否完全满足任务要求?
2. 有哪些可以改进的地方?

返回JSON格式:
{{"passed": true/false, "feedback": "具体反馈"}}
"""
response = await self.reviewer.llm.generate(prompt)
return json.loads(response)

实际应用案例

软件开发团队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
async def software_dev_team():
"""模拟软件开发团队"""

bus = MessageBus()
await bus.start()

# 团队成员
pm = ProjectManagerAgent("pm", bus) # 项目经理
architect = ArchitectAgent("architect", bus) # 架构师
frontend = FrontendDevAgent("frontend", bus) # 前端开发
backend = BackendDevAgent("backend", bus) # 后端开发
qa = QAAgent("qa", bus) # 测试工程师
devops = DevOpsAgent("devops", bus) # 运维工程师

# 项目经理收到需求
requirement = """
开发一个在线商城系统:
- 用户注册登录
- 商品浏览和搜索
- 购物车功能
- 订单管理
- 支付集成
"""

# PM分解任务
tasks = await pm.analyze_requirement(requirement)

# 架构师设计方案
architecture = await architect.design(tasks)

# 并行开发
frontend_task = frontend.develop(architecture["frontend"])
backend_task = backend.develop(architecture["backend"])

frontend_code, backend_code = await asyncio.gather(
frontend_task,
backend_task
)

# QA测试
test_results = await qa.test({
"frontend": frontend_code,
"backend": backend_code
})

# 如果测试失败,返回修复
while not test_results["passed"]:
if test_results["frontend_issues"]:
frontend_code = await frontend.fix(test_results["frontend_issues"])
if test_results["backend_issues"]:
backend_code = await backend.fix(test_results["backend_issues"])

test_results = await qa.test({
"frontend": frontend_code,
"backend": backend_code
})

# DevOps部署
deployment = await devops.deploy({
"frontend": frontend_code,
"backend": backend_code
})

return deployment

研究分析团队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
┌─────────────────────────────────────────────────────────────────┐
│ 研究分析Multi-Agent │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 用户问题:"分析AI对就业市场的影响" │
│ │
│ ┌──────────────┐ │
│ │ 研究协调员 │ │
│ └──────┬───────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 数据收集 │ │ 学术研究 │ │ 行业分析 │ │
│ │ Agent │ │ Agent │ │ Agent │ │
│ │ │ │ │ │ │ │
│ │ 搜索统计 │ │ 论文检索 │ │ 报告分析 │ │
│ │ 数据爬取 │ │ 引用分析 │ │ 趋势预测 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 报告撰写 │ │
│ │ Agent │ │
│ │ │ │
│ │ 综合分析 │ │
│ │ 生成报告 │ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘

最佳实践

设计原则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌─────────────────────────────────────────────────────────────────┐
│ Multi-Agent设计原则 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 单一职责 │
│ └── 每个Agent专注一个领域,避免万能Agent │
│ │
│ 2. 松耦合 │
│ └── Agent之间通过消息通信,不直接依赖 │
│ │
│ 3. 明确接口 │
│ └── 清晰定义每个Agent的输入输出格式 │
│ │
│ 4. 容错设计 │
│ └── 单个Agent失败不应导致整体崩溃 │
│ │
│ 5. 可观测性 │
│ └── 记录所有Agent交互,便于调试 │
│ │
│ 6. 渐进式复杂度 │
│ └── 从简单架构开始,按需增加Agent │
│ │
└─────────────────────────────────────────────────────────────────┘

常见陷阱

陷阱 问题 解决方案
过度分解 Agent太多,协调成本高 合理粒度,按需拆分
循环依赖 Agent互相等待 明确依赖关系
通信风暴 消息过多,性能下降 批量处理,减少通信
状态不一致 多Agent状态冲突 引入协调机制
错误传播 一个错误影响全局 隔离和重试机制

总结

Multi-Agent核心价值

1
2
3
4
5
6
7
┌─────────────────────────────────────────────────────────────────┐
│ 1. 专业分工:每个Agent专注擅长领域 │
│ 2. 并行处理:多Agent同时工作,提高效率 │
│ 3. 容错性强:单点故障不影响整体 │
│ 4. 易于扩展:添加新Agent即可增加能力 │
│ 5. 复杂任务:能够处理单Agent无法完成的任务 │
└─────────────────────────────────────────────────────────────────┘

架构选择指南

场景 推荐架构
流程固定的任务 中心化(Orchestrator)
动态灵活的环境 去中心化(P2P)
大规模复杂项目 层级架构
需要高可用 混合架构

协作模式选择

模式 适用场景
任务分解 可以明确拆分的复杂任务
辩论协作 需要多角度分析的问题
投票协作 需要做出选择的决策
反思协作 需要高质量输出的创作

Multi-Agent系统代表了AI Agent发展的重要方向,通过合理的架构设计和协作模式,可以构建出远超单Agent能力的智能系统。