AI Agents-7 | Muti-Agent的架構(gòu)解析 原創(chuàng)
在當(dāng)今快速發(fā)展的AI領(lǐng)域,多智能體架構(gòu)正逐漸成為解決復(fù)雜任務(wù)的強(qiáng)大工具。從簡(jiǎn)單的單智能體系統(tǒng)到復(fù)雜的多智能體協(xié)同,我們看到了AI在任務(wù)管理、資源分配和決策效率上的巨大潛力。今天,就讓我們深入探討多智能體架構(gòu)的魅力,看看它是如何通過(guò)不同的模式和策略,為我們的生活和工作帶來(lái)變革的。
一、單智能體與多智能體架構(gòu):選擇適合的路徑
在AI的世界里,單智能體架構(gòu)就像是一個(gè)“全能型選手”,它試圖用一個(gè)智能體來(lái)完成所有任務(wù),無(wú)論是瀏覽網(wǎng)頁(yè)還是處理文件操作。這種架構(gòu)在任務(wù)簡(jiǎn)單且明確時(shí)非常高效,但隨著任務(wù)復(fù)雜度的增加和工具數(shù)量的增多,單智能體系統(tǒng)往往會(huì)陷入困境。比如,當(dāng)智能體需要處理過(guò)多的工具或面對(duì)過(guò)于復(fù)雜的上下文時(shí),它可能會(huì)開(kāi)始犯錯(cuò),甚至產(chǎn)生次優(yōu)或錯(cuò)誤的結(jié)果。
而多智能體架構(gòu)則像是一個(gè)“團(tuán)隊(duì)作戰(zhàn)”的模式,每個(gè)智能體專注于自己的領(lǐng)域和工具集。這種架構(gòu)在面對(duì)復(fù)雜、動(dòng)態(tài)的用例時(shí)表現(xiàn)得尤為出色,尤其是在需要專業(yè)知識(shí)和協(xié)作的場(chǎng)景中。例如,一個(gè)軟件開(kāi)發(fā)項(xiàng)目中,可以有規(guī)劃者、研究者、數(shù)學(xué)專家等多個(gè)智能體協(xié)同工作,每個(gè)智能體各司其職,共同完成任務(wù)。
二、多智能體架構(gòu)的模式:解鎖協(xié)同的力量
多智能體架構(gòu)的魅力在于其多樣性和靈活性。不同的任務(wù)和場(chǎng)景需要不同的協(xié)同模式,下面我們來(lái)看看幾種常見(jiàn)的模式。
(一)并行模式:多任務(wù)處理的高效方式
并行模式是多智能體架構(gòu)中最直觀的一種。在這種模式下,多個(gè)智能體同時(shí)處理任務(wù)的不同部分。比如,我們需要對(duì)一段文本進(jìn)行總結(jié)、翻譯和情感分析,三個(gè)智能體可以同時(shí)開(kāi)始工作,分別完成各自的任務(wù)。這種方式大大提高了任務(wù)處理的效率,因?yàn)樗梢猿浞掷枚嗪颂幚砥鞯牟⑿杏?jì)算能力。
代碼示例:
from typing import Dict, Any, TypedDict
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
from textblob import TextBlob
import re
import time
# 定義狀態(tài)
class AgentState(TypedDict):
text: str
summary: str
translation: str
sentiment: str
summary_time: float
translation_time: float
sentiment_time: float
# 總結(jié)智能體
def summarize_agent(state: AgentState) -> Dict[str, Any]:
print("總結(jié)智能體:正在運(yùn)行")
start_time = time.time()
try:
text = state["text"]
ifnot text.strip():
return {
"summary": "未提供文本進(jìn)行總結(jié)。",
"summary_time": 0.0
}
time.sleep(2)
sentences = re.split(r'(?<=[.!?]) +', text.strip())
scored_sentences = [(s, len(s.split())) for s in sentences if s]
top_sentences = [s for s, _ in sorted(scored_sentences, key=lambda x: x[1], reverse=True)[:2]]
summary = " ".join(top_sentences) if top_sentences else"文本太短,無(wú)法總結(jié)。"
processing_time = time.time() - start_time
print(f"總結(jié)智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"summary": summary,
"summary_time": processing_time
}
except Exception as e:
return {
"summary": f"總結(jié)錯(cuò)誤:{str(e)}",
"summary_time": 0.0
}
# 翻譯智能體
def translate_agent(state: AgentState) -> Dict[str, Any]:
print("翻譯智能體:正在運(yùn)行")
start_time = time.time()
try:
text = state["text"]
ifnot text.strip():
return {
"translation": "未提供文本進(jìn)行翻譯。",
"translation_time": 0.0
}
time.sleep(3)
translation = (
"El nuevo parque en la ciudad es una maravillosa adición. "
"Las familias disfrutan de los espacios abiertos, y a los ni?os les encanta el parque infantil. "
"Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado peque?a."
)
processing_time = time.time() - start_time
print(f"翻譯智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"translation": translation,
"translation_time": processing_time
}
except Exception as e:
return {
"translation": f"翻譯錯(cuò)誤:{str(e)}",
"translation_time": 0.0
}
# 情感分析智能體
def sentiment_agent(state: AgentState) -> Dict[str, Any]:
print("情感分析智能體:正在運(yùn)行")
start_time = time.time()
try:
text = state["text"]
ifnot text.strip():
return {
"sentiment": "未提供文本進(jìn)行情感分析。",
"sentiment_time": 0.0
}
time.sleep(1.5)
blob = TextBlob(text)
polarity = blob.sentiment.polarity
subjectivity = blob.sentiment.subjectivity
sentiment = "Positive"if polarity > 0else"Negative"if polarity < 0else"Neutral"
result = f"{sentiment} (Polarity: {polarity:.2f}, Subjectivity: {subjectivity:.2f})"
processing_time = time.time() - start_time
print(f"情感分析智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"sentiment": result,
"sentiment_time": processing_time
}
except Exception as e:
return {
"sentiment": f"情感分析錯(cuò)誤:{str(e)}",
"sentiment_time": 0.0
}
# 合并節(jié)點(diǎn)
def join_parallel_results(state: AgentState) -> AgentState:
return state
# 構(gòu)建圖
def build_parallel_graph() -> StateGraph:
workflow = StateGraph(AgentState)
parallel_branches = {
"summarize_node": summarize_agent,
"translate_node": translate_agent,
"sentiment_node": sentiment_agent
}
for name, agent in parallel_branches.items():
workflow.add_node(name, agent)
workflow.add_node("branch", lambda state: state)
workflow.add_node("join", join_parallel_results)
workflow.set_entry_point("branch")
for name in parallel_branches:
workflow.add_edge("branch", name)
workflow.add_edge(name, "join")
workflow.add_edge("join", END)
return workflow.compile()
# 主函數(shù)
def main():
text = (
"The new park in the city is a wonderful addition. Families are enjoying the open spaces, "
"and children love the playground. However, some people think the parking area is too small."
)
initial_state: AgentState = {
"text": text,
"summary": "",
"translation": "",
"sentiment": "",
"summary_time": 0.0,
"translation_time": 0.0,
"sentiment_time": 0.0
}
print("\n構(gòu)建新圖...")
app = build_parallel_graph()
print("\n開(kāi)始并行處理...")
start_time = time.time()
config = RunnableConfig(parallel=True)
result = app.invoke(initial_state, cnotallow=config)
total_time = time.time() - start_time
print("\n=== 并行任務(wù)結(jié)果 ===")
print(f"輸入文本:\n{text}\n")
print(f"總結(jié):\n{result['summary']}\n")
print(f"翻譯(西班牙語(yǔ)):\n{result['translation']}\n")
print(f"情感分析:\n{result['sentiment']}\n")
print("\n=== 處理時(shí)間 ===")
processing_times = {
"summary": result["summary_time"],
"translation": result["translation_time"],
"sentiment": result["sentiment_time"]
}
for agent, time_taken in processing_times.items():
print(f"{agent.capitalize()}: {time_taken:.2f} 秒")
print(f"\n總耗時(shí): {total_time:.2f} 秒")
print(f"各任務(wù)總耗時(shí): {sum(processing_times.values()):.2f} 秒")
print(f"并行節(jié)省時(shí)間: {sum(processing_times.values()) - total_time:.2f} 秒")
if __name__ == "__main__":
main()
(二)順序模式:按部就班的協(xié)同
順序模式則是一種“接力棒”式的協(xié)同方式。在這種模式下,任務(wù)按照一定的順序依次傳遞給不同的智能體,每個(gè)智能體的輸出成為下一個(gè)智能體的輸入。比如,在一個(gè)多步驟的審批流程中,團(tuán)隊(duì)負(fù)責(zé)人先審批,然后是部門(mén)經(jīng)理,最后是財(cái)務(wù)總監(jiān)。這種方式適合那些需要嚴(yán)格順序執(zhí)行的任務(wù)。
代碼示例:
from typing import Dict
from langgraph.graph import StateGraph, MessagesState, END
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import HumanMessage, AIMessage
import json
# 團(tuán)隊(duì)負(fù)責(zé)人
def team_lead_agent(state: MessagesState, config: RunnableConfig) -> Dict:
print("團(tuán)隊(duì)負(fù)責(zé)人:開(kāi)始審批")
messages = state["messages"]
proposal = json.loads(messages[0].content)
title = proposal.get("title", "")
amount = proposal.get("amount", 0.0)
ifnot title or amount <= 0:
status = "Rejected"
comment = "團(tuán)隊(duì)負(fù)責(zé)人:提案因缺少標(biāo)題或金額無(wú)效而被拒絕。"
goto = END
else:
status = "Approved by Team Lead"
comment = "團(tuán)隊(duì)負(fù)責(zé)人:提案完整且已批準(zhǔn)。"
goto = "dept_manager"
print(f"團(tuán)隊(duì)負(fù)責(zé)人:審批完成 - {status}")
messages.append(AIMessage(
cnotallow=json.dumps({"status": status, "comment": comment}),
additional_kwargs={"agent": "team_lead", "goto": goto}
))
return {"messages": messages}
# 部門(mén)經(jīng)理
def dept_manager_agent(state: MessagesState, config: RunnableConfig) -> Dict:
print("部門(mén)經(jīng)理:開(kāi)始審批")
messages = state["messages"]
team_lead_msg = next((m for m in messages if m.additional_kwargs.get("agent") == "team_lead"), None)
proposal = json.loads(messages[0].content)
amount = proposal.get("amount", 0.0)
if json.loads(team_lead_msg.content)["status"] != "Approved by Team Lead":
status = "Rejected"
comment = "部門(mén)經(jīng)理:因團(tuán)隊(duì)負(fù)責(zé)人拒絕而跳過(guò)。"
goto = END
elif amount > 100000:
status = "Rejected"
comment = "部門(mén)經(jīng)理:預(yù)算超出限制。"
goto = END
else:
status = "Approved by Department Manager"
comment = "部門(mén)經(jīng)理:預(yù)算在限制范圍內(nèi)。"
goto = "finance_director"
print(f"部門(mén)經(jīng)理:審批完成 - {status}")
messages.append(AIMessage(
cnotallow=json.dumps({"status": status, "comment": comment}),
additional_kwargs={"agent": "dept_manager", "goto": goto}
))
return {"messages": messages}
# 財(cái)務(wù)總監(jiān)
def finance_director_agent(state: MessagesState, config: RunnableConfig) -> Dict:
print("財(cái)務(wù)總監(jiān):開(kāi)始審批")
messages = state["messages"]
dept_msg = next((m for m in messages if m.additional_kwargs.get("agent") == "dept_manager"), None)
proposal = json.loads(messages[0].content)
amount = proposal.get("amount", 0.0)
if json.loads(dept_msg.content)["status"] != "Approved by Department Manager":
status = "Rejected"
comment = "財(cái)務(wù)總監(jiān):因部門(mén)經(jīng)理拒絕而跳過(guò)。"
elif amount > 50000:
status = "Rejected"
comment = "財(cái)務(wù)總監(jiān):預(yù)算不足。"
else:
status = "Approved"
comment = "財(cái)務(wù)總監(jiān):批準(zhǔn)且可行。"
print(f"財(cái)務(wù)總監(jiān):審批完成 - {status}")
messages.append(AIMessage(
cnotallow=json.dumps({"status": status, "comment": comment}),
additional_kwargs={"agent": "finance_director", "goto": END}
))
return {"messages": messages}
# 路由函數(shù)
def route_step(state: MessagesState) -> str:
for msg in reversed(state["messages"]):
goto = msg.additional_kwargs.get("goto")
if goto:
print(f"路由:智能體 {msg.additional_kwargs.get('agent')} 設(shè)置跳轉(zhuǎn)到 {goto}")
return goto
return END
# 構(gòu)建 LangGraph
builder = StateGraph(MessagesState)
builder.add_node("team_lead", team_lead_agent)
builder.add_node("dept_manager", dept_manager_agent)
builder.add_node("finance_director", finance_director_agent)
builder.set_entry_point("team_lead")
builder.add_conditional_edges("team_lead", route_step, {
"dept_manager": "dept_manager",
END: END
})
builder.add_conditional_edges("dept_manager", route_step, {
"finance_director": "finance_director",
END: END
})
builder.add_conditional_edges("finance_director", route_step, {
END: END
})
workflow = builder.compile()
# 主運(yùn)行器
def main():
initial_state = {
"messages": [
HumanMessage(
cnotallow=json.dumps({
"title": "New Equipment Purchase",
"amount": 40000.0,
"department": "Engineering"
})
)
]
}
result = workflow.invoke(initial_state)
messages = result["messages"]
proposal = json.loads(messages[0].content)
print("\n=== 審批結(jié)果 ===")
print(f"提案標(biāo)題:{proposal['title']}")
final_status = "Unknown"
comments = []
for msg in messages[1:]:
if isinstance(msg, AIMessage):
try:
data = json.loads(msg.content)
if"status"in data:
final_status = data["status"]
if"comment"in data:
comments.append(data["comment"])
except Exception:
continue
print(f"最終狀態(tài):{final_status}")
print("評(píng)論:")
for comment in comments:
print(f" - {comment}")
if __name__ == "__main__":
main()
(三)循環(huán)模式:持續(xù)改進(jìn)的協(xié)同
循環(huán)模式則像是一個(gè)“精益求精”的過(guò)程。在這種模式下,智能體們會(huì)不斷地迭代,根據(jù)其他智能體的反饋來(lái)改進(jìn)自己的輸出。比如,在代碼編寫(xiě)和測(cè)試的場(chǎng)景中,代碼編寫(xiě)智能體會(huì)根據(jù)測(cè)試智能體的反饋不斷優(yōu)化代碼,直到代碼通過(guò)所有測(cè)試為止。
代碼示例:
from typing import Dict, Any, List
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
import textwrap
# 定義狀態(tài)
class EvaluationState(Dict[str, Any]):
code: str = ""
feedback: str = ""
passed: bool = False
iteration: int = 0
max_iterations: int = 3
history: List[Dict] = []
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setdefault("code", "")
self.setdefault("feedback", "")
self.setdefault("passed", False)
self.setdefault("iteration", 0)
self.setdefault("max_iterations", 3)
self.setdefault("history", [])
# 代碼編寫(xiě)智能體
def code_writer_agent(state: EvaluationState, config: RunnableConfig) -> Dict[str, Any]:
print(f"迭代 {state['iteration'] + 1} - 代碼編寫(xiě)智能體:生成代碼")
print(f"迭代 {state['iteration'] + 1} - 代碼編寫(xiě)智能體:收到反饋:{state['feedback']}")
iteration = state["iteration"] + 1
feedback = state["feedback"]
if iteration == 1:
code = textwrap.dedent("""
def factorial(n):
result = 1
for i in range(1, n + 1):
result *= i
return result
""")
writer_feedback = "初始代碼已生成。"
elif"factorial(0)"in feedback.lower():
code = textwrap.dedent("""
def factorial(n):
if n == 0:
return 1
result = 1
for i in range(1, n + 1):
result *= i
return result
""")
writer_feedback = "已修復(fù) n=0 的處理。"
elif"factorial(-1)"in feedback.lower() or"negative"in feedback.lower():
code = textwrap.dedent("""
def factorial(n):
if n < 0:
raise ValueError("Factorial not defined for negative numbers")
if n == 0:
return 1
result = 1
for i in range(1, n + 1):
result *= i
return result
""")
writer_feedback = "已添加對(duì)負(fù)數(shù)輸入的錯(cuò)誤處理。"
else:
code = state["code"]
writer_feedback = "未發(fā)現(xiàn)進(jìn)一步改進(jìn)的內(nèi)容。"
print(f"迭代 {iteration} - 代碼編寫(xiě)智能體:代碼生成完成")
return {
"code": code,
"feedback": writer_feedback,
"iteration": iteration
}
# 代碼測(cè)試智能體
def code_tester_agent(state: EvaluationState, config: RunnableConfig) -> Dict[str, Any]:
print(f"迭代 {state['iteration']} - 代碼測(cè)試智能體:測(cè)試代碼")
code = state["code"]
try:
test_cases = [
(0, 1),
(1, 1),
(5, 120),
(-1, None)
]
namespace = {}
exec(code, namespace)
factorial = namespace.get('factorial')
ifnot callable(factorial):
return {"passed": False, "feedback": "未找到階乘函數(shù)。"}
feedback_parts = []
passed = True
for input_val, expected in test_cases:
try:
result = factorial(input_val)
if expected isNone:
passed = False
feedback_parts.append(f"測(cè)試失敗:factorial({input_val}) 應(yīng)該拋出錯(cuò)誤。")
elif result != expected:
passed = False
feedback_parts.append(f"測(cè)試失敗:factorial({input_val}) 返回 {result},預(yù)期為 {expected}。")
except ValueError as ve:
if expected isnotNone:
passed = False
feedback_parts.append(f"測(cè)試失敗:factorial({input_val}) 意外拋出 ValueError:{str(ve)}")
except Exception as e:
passed = False
feedback_parts.append(f"測(cè)試失敗:factorial({input_val}) 拋出錯(cuò)誤:{str(e)}")
feedback = "所有測(cè)試通過(guò)!"if passed else"\n".join(feedback_parts)
print(f"迭代 {state['iteration']} - 代碼測(cè)試智能體:測(cè)試完成 - {'通過(guò)' if passed else '失敗'}")
history = state["history"]
history.append({
"iteration": state["iteration"],
"code": code,
"feedback": feedback,
"passed": passed
})
return {
"passed": passed,
"feedback": feedback,
"history": history
}
except Exception as e:
print(f"迭代 {state['iteration']} - 代碼測(cè)試智能體:測(cè)試失敗")
return {"passed": False, "feedback": f"測(cè)試錯(cuò)誤:{str(e)}"}
# 條件邊:決定是否繼續(xù)循環(huán)
def should_continue(state: EvaluationState) -> str:
if state["passed"] or state["iteration"] >= state["max_iterations"]:
print(f"迭代 {state['iteration']} - {'循環(huán)停止:測(cè)試通過(guò)' if state['passed'] else '循環(huán)停止:達(dá)到最大迭代次數(shù)'}")
return"end"
print(f"迭代 {state['iteration']} - 循環(huán)繼續(xù):測(cè)試失敗")
return"code_writer"
# 構(gòu)建 LangGraph 工作流
workflow = StateGraph(EvaluationState)
workflow.add_node("code_writer", code_writer_agent)
workflow.add_node("code_tester", code_tester_agent)
workflow.set_entry_point("code_writer")
workflow.add_edge("code_writer", "code_tester")
workflow.add_conditional_edges(
"code_tester",
should_continue,
{
"code_writer": "code_writer",
"end": END
}
)
app = workflow.compile()
# 運(yùn)行工作流
def main():
initial_state = EvaluationState()
result = app.invoke(initial_state)
print("\n=== 評(píng)估結(jié)果 ===")
print(f"最終狀態(tài):{'通過(guò)' if result['passed'] else '失敗'},經(jīng)過(guò) {result['iteration']} 次迭代")
print(f"最終代碼:\n{result['code']}")
print(f"最終反饋:\n{result['feedback']}")
print("\n迭代歷史:")
for attempt in result["history"]:
print(f"迭代 {attempt['iteration']}:")
print(f" 代碼:\n{attempt['code']}")
print(f" 反饋:{attempt['feedback']}")
print(f" 是否通過(guò):{attempt['passed']}\n")
if __name__ == "__main__":
main()
(四)路由器模式:智能分配任務(wù)
路由器模式則像是一個(gè)“任務(wù)分發(fā)中心”。在這種模式下,一個(gè)中央路由器智能體根據(jù)任務(wù)或輸入的內(nèi)容,決定調(diào)用哪些智能體。比如,在一個(gè)客戶支持系統(tǒng)中,路由器可以根據(jù)客戶的問(wèn)題內(nèi)容,將問(wèn)題分配給不同的支持團(tuán)隊(duì),如賬單團(tuán)隊(duì)、技術(shù)支持團(tuán)隊(duì)或普通咨詢團(tuán)隊(duì)。
代碼示例:
from typing import Dict, Any, TypedDict, Literal
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
import re
import time
# 定義狀態(tài)
class TicketState(TypedDict):
ticket_text: str
category: str
resolution: str
processing_time: float
# 路由器智能體
def router_agent(state: TicketState) -> Dict[str, Any]:
print("路由器智能體:分析工單...")
start_time = time.time()
ticket_text = state["ticket_text"].lower()
if any(keyword in ticket_text for keyword in ["billing", "payment", "invoice", "charge"]):
category = "Billing"
elif any(keyword in ticket_text for keyword in ["technical", "bug", "error", "crash"]):
category = "Technical"
elif any(keyword in ticket_text for keyword in ["general", "question", "inquiry", "info"]):
category = "General"
else:
category = "Unknown"
processing_time = time.time() - start_time
print(f"路由器智能體:分類為 '{category}',耗時(shí) {processing_time:.2f} 秒")
return {
"category": category,
"processing_time": processing_time
}
# 賬單團(tuán)隊(duì)
def billing_team_agent(state: TicketState) -> Dict[str, Any]:
print("賬單團(tuán)隊(duì):處理工單...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"賬單團(tuán)隊(duì):已審核工單 '{ticket_text}'。請(qǐng)檢查您的賬單詳情或聯(lián)系我們的賬單部門(mén)以獲取進(jìn)一步幫助。"
processing_time = time.time() - start_time
time.sleep(1)
print(f"賬單團(tuán)隊(duì):完成,耗時(shí) {processing_time:.2f} 秒")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# 技術(shù)支持團(tuán)隊(duì)
def technical_team_agent(state: TicketState) -> Dict[str, Any]:
print("技術(shù)支持團(tuán)隊(duì):處理工單...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"技術(shù)支持團(tuán)隊(duì):已審核工單 '{ticket_text}'。請(qǐng)嘗試重啟您的設(shè)備或提交詳細(xì)的錯(cuò)誤日志以供進(jìn)一步調(diào)查。"
processing_time = time.time() - start_time
time.sleep(1.5)
print(f"技術(shù)支持團(tuán)隊(duì):完成,耗時(shí) {processing_time:.2f} 秒")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# 普通咨詢團(tuán)隊(duì)
def general_team_agent(state: TicketState) -> Dict[str, Any]:
print("普通咨詢團(tuán)隊(duì):處理工單...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"普通咨詢團(tuán)隊(duì):已審核工單 '{ticket_text}'。更多信息,請(qǐng)參考我們的常見(jiàn)問(wèn)題解答或通過(guò)電子郵件聯(lián)系我們。"
processing_time = time.time() - start_time
time.sleep(0.8)
print(f"普通咨詢團(tuán)隊(duì):完成,耗時(shí) {processing_time:.2f} 秒")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# 手動(dòng)審核團(tuán)隊(duì)
def manual_review_agent(state: TicketState) -> Dict[str, Any]:
print("手動(dòng)審核團(tuán)隊(duì):處理工單...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"手動(dòng)審核:工單 '{ticket_text}' 無(wú)法分類。標(biāo)記為人工審核。請(qǐng)手動(dòng)分配到適當(dāng)?shù)膱F(tuán)隊(duì)。"
processing_time = time.time() - start_time
time.sleep(0.5)
print(f"手動(dòng)審核團(tuán)隊(duì):完成,耗時(shí) {processing_time:.2f} 秒")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# 路由函數(shù)
def route_ticket(state: TicketState) -> Literal["billing_team", "technical_team", "general_team", "manual_review"]:
category = state["category"]
print(f"路由:工單分類為 '{category}'")
if category == "Billing":
return"billing_team"
elif category == "Technical":
return"technical_team"
elif category == "General":
return"general_team"
else:
return"manual_review"
# 構(gòu)建路由器模式的圖
def build_router_graph() -> StateGraph:
workflow = StateGraph(TicketState)
workflow.add_node("router", router_agent)
workflow.add_node("billing_team", billing_team_agent)
workflow.add_node("technical_team", technical_team_agent)
workflow.add_node("general_team", general_team_agent)
workflow.add_node("manual_review", manual_review_agent)
workflow.set_entry_point("router")
workflow.add_conditional_edges(
"router",
route_ticket,
{
"billing_team": "billing_team",
"technical_team": "technical_team",
"general_team": "general_team",
"manual_review": "manual_review"
}
)
workflow.add_edge("billing_team", END)
workflow.add_edge("technical_team", END)
workflow.add_edge("general_team", END)
workflow.add_edge("manual_review", END)
return workflow.compile()
# 運(yùn)行工作流
def main():
test_tickets = [
"I have a billing issue with my last invoice. It seems I was overcharged.",
"My app keeps crashing with a technical error. Please help!",
"I have a general question about your services. Can you provide more info?",
"I need assistance with something unrelated to billing or technical issues."
]
for ticket_text in test_tickets:
initial_state: TicketState = {
"ticket_text": ticket_text,
"category": "",
"resolution": "",
"processing_time": 0.0
}
print(f"\n=== 處理工單:'{ticket_text}' ===")
app = build_router_graph()
start_time = time.time()
result = app.invoke(initial_state, cnotallow=RunnableConfig())
total_time = time.time() - start_time
print("\n=== 工單結(jié)果 ===")
print(f"分類:{result['category']}")
print(f"解決方案:{result['resolution']}")
print(f"總處理時(shí)間:{result['processing_time']:.2f} 秒")
print(f"總耗時(shí):{total_time:.2f} 秒")
print("-" * 50)
if __name__ == "__main__":
main()
(五)聚合模式:整合輸出
聚合模式則像是一個(gè)“信息匯總中心”。在這種模式下,多個(gè)智能體分別完成自己的任務(wù)后,將輸出傳遞給一個(gè)聚合智能體,由聚合智能體將所有結(jié)果整合成一個(gè)最終結(jié)果。比如,在社交媒體情感分析中,可以有多個(gè)智能體分別收集不同平臺(tái)的帖子并進(jìn)行情感分析,最后由聚合智能體生成一份綜合報(bào)告。
代碼示例:
from typing import Dict, Any, TypedDict, List
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
from textblob import TextBlob
import time
from typing_extensions import Annotated
from operator import add
# 定義狀態(tài)
class SocialMediaState(TypedDict):
twitter_posts: List[str]
instagram_posts: List[str]
reddit_posts: List[str]
twitter_sentiment: Dict[str, float]
instagram_sentiment: Dict[str, float]
reddit_sentiment: Dict[str, float]
final_report: str
processing_time: Annotated[float, add]
# 收集推特帖子
def collect_twitter_posts(state: SocialMediaState) -> Dict[str, Any]:
print("推特智能體:收集帖子...")
start_time = time.time()
posts = [
"Loving the new product from this brand! Amazing quality.",
"Terrible customer service from this brand. Very disappointed."
]
time.sleep(1)
processing_time = time.time() - start_time
print(f"推特智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"twitter_posts": posts,
"processing_time": processing_time
}
# 收集 Instagram 帖子
def collect_instagram_posts(state: SocialMediaState) -> Dict[str, Any]:
print("Instagram 智能體:收集帖子...")
start_time = time.time()
posts = [
"Beautiful design by this brand! #loveit",
"Not impressed with the latest release. Expected better."
]
time.sleep(1.2)
processing_time = time.time() - start_time
print(f"Instagram 智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"instagram_posts": posts,
"processing_time": processing_time
}
# 收集 Reddit 帖子
def collect_reddit_posts(state: SocialMediaState) -> Dict[str, Any]:
print("Reddit 智能體:收集帖子...")
start_time = time.time()
posts = [
"This brand is awesome! Great value for money.",
"Had a bad experience with their support team. Not happy."
]
time.sleep(0.8)
processing_time = time.time() - start_time
print(f"Reddit 智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"reddit_posts": posts,
"processing_time": processing_time
}
# 分析推特情感
def analyze_twitter_sentiment(state: SocialMediaState) -> Dict[str, Any]:
print("推特情感智能體:分析情感...")
start_time = time.time()
posts = state["twitter_posts"]
polarities = [TextBlob(post).sentiment.polarity for post in posts]
avg_polarity = sum(polarities) / len(polarities) if polarities else0.0
time.sleep(0.5)
processing_time = time.time() - start_time
print(f"推特情感智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"twitter_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},
"processing_time": processing_time
}
# 分析 Instagram 情感
def analyze_instagram_sentiment(state: SocialMediaState) -> Dict[str, Any]:
print("Instagram 情感智能體:分析情感...")
start_time = time.time()
posts = state["instagram_posts"]
polarities = [TextBlob(post).sentiment.polarity for post in posts]
avg_polarity = sum(polarities) / len(polarities) if polarities else0.0
time.sleep(0.6)
processing_time = time.time() - start_time
print(f"Instagram 情感智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"instagram_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},
"processing_time": processing_time
}
# 分析 Reddit 情感
def analyze_reddit_sentiment(state: SocialMediaState) -> Dict[str, Any]:
print("Reddit 情感智能體:分析情感...")
start_time = time.time()
posts = state["reddit_posts"]
polarities = [TextBlob(post).sentiment.polarity for post in posts]
avg_polarity = sum(polarities) / len(polarities) if polarities else0.0
time.sleep(0.4)
processing_time = time.time() - start_time
print(f"Reddit 情感智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"reddit_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},
"processing_time": processing_time
}
# 聚合結(jié)果
def aggregate_results(state: SocialMediaState) -> Dict[str, Any]:
print("聚合智能體:生成最終報(bào)告...")
start_time = time.time()
twitter_sentiment = state["twitter_sentiment"]
instagram_sentiment = state["instagram_sentiment"]
reddit_sentiment = state["reddit_sentiment"]
total_posts = (twitter_sentiment["num_posts"] +
instagram_sentiment["num_posts"] +
reddit_sentiment["num_posts"])
weighted_polarity = (
twitter_sentiment["average_polarity"] * twitter_sentiment["num_posts"] +
instagram_sentiment["average_polarity"] * instagram_sentiment["num_posts"] +
reddit_sentiment["average_polarity"] * reddit_sentiment["num_posts"]
) / total_posts if total_posts > 0else0.0
overall_sentiment = ("Positive"if weighted_polarity > 0else
"Negative"if weighted_polarity < 0else"Neutral")
report = (
f"總體情感:{overall_sentiment} (平均極性:{weighted_polarity:.2f})\n"
f"推特情感:{twitter_sentiment['average_polarity']:.2f} (帖子數(shù):{twitter_sentiment['num_posts']})\n"
f"Instagram 情感:{instagram_sentiment['average_polarity']:.2f} (帖子數(shù):{instagram_sentiment['num_posts']})\n"
f"Reddit 情感:{reddit_sentiment['average_polarity']:.2f} (帖子數(shù):{reddit_sentiment['num_posts']})"
)
time.sleep(0.3)
processing_time = time.time() - start_time
print(f"聚合智能體:完成,耗時(shí) {processing_time:.2f} 秒")
return {
"final_report": report,
"processing_time": processing_time
}
# 構(gòu)建聚合模式的圖
def build_aggregator_graph() -> StateGraph:
workflow = StateGraph(SocialMediaState)
workflow.add_node("collect_twitter", collect_twitter_posts)
workflow.add_node("collect_instagram", collect_instagram_posts)
workflow.add_node("collect_reddit", collect_reddit_posts)
workflow.add_node("analyze_twitter", analyze_twitter_sentiment)
workflow.add_node("analyze_instagram", analyze_instagram_sentiment)
workflow.add_node("analyze_reddit", analyze_reddit_sentiment)
workflow.add_node("aggregate", aggregate_results)
workflow.add_node("branch", lambda state: state)
workflow.set_entry_point("branch")
workflow.add_edge("branch", "collect_twitter")
workflow.add_edge("branch", "collect_instagram")
workflow.add_edge("branch", "collect_reddit")
workflow.add_edge("collect_twitter", "analyze_twitter")
workflow.add_edge("collect_instagram", "analyze_instagram")
workflow.add_edge("collect_reddit", "analyze_reddit")
workflow.add_edge("analyze_twitter", "aggregate")
workflow.add_edge("analyze_instagram", "aggregate")
workflow.add_edge("analyze_reddit", "aggregate")
workflow.add_edge("aggregate", END)
return workflow.compile()
# 運(yùn)行工作流
def main():
initial_state: SocialMediaState = {
"twitter_posts": [],
"instagram_posts": [],
"reddit_posts": [],
"twitter_sentiment": {"average_polarity": 0.0, "num_posts": 0},
"instagram_sentiment": {"average_polarity": 0.0, "num_posts": 0},
"reddit_sentiment": {"average_polarity": 0.0, "num_posts": 0},
"final_report": "",
"processing_time": 0.0
}
print("\n開(kāi)始社交媒體情感分析...")
app = build_aggregator_graph()
start_time = time.time()
config = RunnableConfig(parallel=True)
result = app.invoke(initial_state, cnotallow=config)
total_time = time.time() - start_time
print("\n=== 情感分析結(jié)果 ===")
print(result["final_report"])
print(f"\n總處理時(shí)間:{result['processing_time']:.2f} 秒")
print(f"總耗時(shí):{total_time:.2f} 秒")
if __name__ == "__main__":
main()
(六)網(wǎng)絡(luò)模式(Network or Horizontal)
在多智能體系統(tǒng)中,網(wǎng)絡(luò)模式是一種去中心化的架構(gòu),智能體之間以多對(duì)多的方式直接通信,形成一個(gè)分散的網(wǎng)絡(luò)。這種架構(gòu)非常適合那些沒(méi)有明確智能體層級(jí)或調(diào)用順序的任務(wù)。
優(yōu)點(diǎn)與挑戰(zhàn)
優(yōu)點(diǎn):
- 分布式協(xié)作:每個(gè)智能體都可以獨(dú)立運(yùn)行,即使部分智能體失敗,系統(tǒng)仍然可以正常工作。
- 群體決策:智能體之間可以相互協(xié)作,共同做出決策。
挑戰(zhàn):
- 通信管理:智能體之間的通信可能會(huì)變得復(fù)雜,導(dǎo)致效率低下。
- 重復(fù)工作:如果沒(méi)有良好的協(xié)調(diào)機(jī)制,智能體可能會(huì)重復(fù)執(zhí)行相同的工作。
代碼示例:
from typing import Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()
# 定義智能體 1
def agent_1(state: MessagesState) -> Command[Literal["agent_2", "agent_3", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(
goto=next_agent,
update={"messages": [response["content"]]}
)
# 定義智能體 2
def agent_2(state: MessagesState) -> Command[Literal["agent_1", "agent_3", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(
goto=next_agent,
update={"messages": [response["content"]]}
)
# 定義智能體 3
def agent_3(state: MessagesState) -> Command[Literal["agent_1", "agent_2", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(
goto=next_agent,
update={"messages": [response["content"]]}
)
# 構(gòu)建圖
builder = StateGraph(MessagesState)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_node(agent_3)
builder.add_edge(START, "agent_1")
network = builder.compile()
(七)交接模式(Handoffs)
在多智能體架構(gòu)中,交接模式是一種常見(jiàn)的交互方式,其中一個(gè)智能體在完成自己的任務(wù)后,將控制權(quán)交給另一個(gè)智能體。這種模式允許智能體在執(zhí)行過(guò)程中靈活地將任務(wù)傳遞給其他智能體,甚至可以返回到自己。
代碼示例:
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
# 定義一個(gè)智能體
def agent(state: MessagesState) -> Command[Literal["another_agent"]]:
# 假設(shè) get_next_agent 是一個(gè)函數(shù),用于決定下一個(gè)智能體
next_agent = get_next_agent(state)
return Command(
goto=next_agent,
update={"messages": ["Message from current agent"]}
)
(八)監(jiān)督者模式(Supervisor)
監(jiān)督者模式是一種集中式的架構(gòu),其中有一個(gè)中央監(jiān)督者(Supervisor)智能體,負(fù)責(zé)決定下一個(gè)調(diào)用的智能體。這種模式適合需要集中管理和協(xié)調(diào)的場(chǎng)景。
代碼示例:
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()
# 定義監(jiān)督者
def supervisor(state: MessagesState) -> Command[Literal["agent_1", "agent_2", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(goto=next_agent)
# 定義智能體 1
def agent_1(state: MessagesState) -> Command[Literal["supervisor"]]:
response = model.invoke(state["messages"])
return Command(goto="supervisor", update={"messages": [response]})
# 定義智能體 2
def agent_2(state: MessagesState) -> Command[Literal["supervisor"]]:
response = model.invoke(state["messages"])
return Command(goto="supervisor", update={"messages": [response]})
# 構(gòu)建圖
builder = StateGraph(MessagesState)
builder.add_node(supervisor)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_edge(START, "supervisor")
supervisor_graph = builder.compile()
(九)層次化模式(Hierarchical)
層次化模式是一種樹(shù)狀結(jié)構(gòu),其中高級(jí)智能體(監(jiān)督者)管理低級(jí)智能體。這種架構(gòu)適合大型系統(tǒng),可以清晰地劃分角色和職責(zé)。
代碼示例:
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()
# 定義團(tuán)隊(duì) 1 的監(jiān)督者
def team_1_supervisor(state: MessagesState) -> Command[Literal["team_1_agent_1", "team_1_agent_2", END]]:
response = model.invoke(state["messages"])
next_agent = response.get("next_agent", END)
return Command(goto=next_agent)
# 定義團(tuán)隊(duì) 1 的智能體 1
def team_1_agent_1(state: MessagesState) -> Command[Literal["team_1_supervisor"]]:
response = model.invoke(state["messages"])
return Command(goto="team_1_supervisor", update={"messages": [response]})
# 定義團(tuán)隊(duì) 1 的智能體 2
def team_1_agent_2(state: MessagesState) -> Command[Literal["team_1_supervisor"]]:
response = model.invoke(state["messages"])
return Command(goto="team_1_supervisor", update={"messages": [response]})
# 構(gòu)建團(tuán)隊(duì) 1 的圖
team_1_builder = StateGraph(MessagesState)
team_1_builder.add_node(team_1_supervisor)
team_1_builder.add_node(team_1_agent_1)
team_1_builder.add_node(team_1_agent_2)
team_1_builder.add_edge(START, "team_1_supervisor")
team_1_graph = team_1_builder.compile()
# 定義頂層監(jiān)督者
def top_level_supervisor(state: MessagesState) -> Command[Literal["team_1_graph", END]]:
response = model.invoke(state["messages"])
next_team = response.get("next_team", END)
return Command(goto=next_team)
# 構(gòu)建頂層圖
top_builder = StateGraph(MessagesState)
top_builder.add_node(top_level_supervisor)
top_builder.add_node("team_1_graph", team_1_graph)
top_builder.add_edge(START, "top_level_supervisor")
top_builder.add_edge("team_1_graph", "top_level_supervisor")
top_graph = top_builder.compile()
(十)自定義工作流(Custom Multi-Agent Workflow)
自定義工作流允許開(kāi)發(fā)者根據(jù)具體需求定義智能體之間的調(diào)用順序。這種模式結(jié)合了顯式控制流和動(dòng)態(tài)控制流的優(yōu)點(diǎn),提供了更高的靈活性。
代碼示例:
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START
model = ChatOpenAI()
# 定義智能體 1
def agent_1(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# 定義智能體 2
def agent_2(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# 構(gòu)建圖
builder = StateGraph(MessagesState)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_edge(START, "agent_1")
builder.add_edge("agent_1", "agent_2")
custom_workflow = builder.compile()
三、智能體之間的通信:協(xié)同的關(guān)鍵
在多智能體系統(tǒng)中,智能體之間的通信是協(xié)同工作的核心。我們需要考慮以下幾個(gè)問(wèn)題:
- 通過(guò)圖狀態(tài)還是工具調(diào)用通信?在大多數(shù)架構(gòu)中,智能體通過(guò)圖狀態(tài)通信。而在工具調(diào)用的架構(gòu)中,通信的內(nèi)容則是工具調(diào)用的參數(shù)。
- 不同狀態(tài)模式的智能體如何通信?如果一個(gè)智能體需要與其他智能體有不同的狀態(tài)模式,可以通過(guò)定義子圖智能體或具有私有輸入狀態(tài)模式的智能體節(jié)點(diǎn)來(lái)實(shí)現(xiàn)。
- 共享消息列表如何通信?最常見(jiàn)的通信方式是通過(guò)共享狀態(tài)通道(通常是消息列表)。智能體可以選擇共享完整的思考過(guò)程(“草稿”),也可以只共享最終結(jié)果。
四、多智能體架構(gòu)的優(yōu)勢(shì)與挑戰(zhàn)
多智能體架構(gòu)為我們解決復(fù)雜任務(wù)提供了強(qiáng)大的工具,但也帶來(lái)了新的挑戰(zhàn)。它通過(guò)并行、順序、路由器和聚合等多種工作流模式,實(shí)現(xiàn)了高效的協(xié)同工作。然而,隨著智能體數(shù)量的增加和任務(wù)復(fù)雜度的提升,如何管理智能體之間的通信、避免重復(fù)工作以及確保系統(tǒng)的可擴(kuò)展性,成為了我們需要面對(duì)的問(wèn)題。
五、結(jié)論
多智能體架構(gòu)為我們提供了一個(gè)全新的視角來(lái)解決復(fù)雜任務(wù)。通過(guò)不同的協(xié)同模式和通信機(jī)制,我們可以構(gòu)建出高效、靈活且可擴(kuò)展的系統(tǒng)。無(wú)論是并行處理、順序執(zhí)行、智能路由還是結(jié)果聚合,多智能體架構(gòu)都能根據(jù)具體需求提供合適的解決方案。在未來(lái),隨著AI技術(shù)的不斷發(fā)展,多智能體架構(gòu)必將在更多領(lǐng)域發(fā)揮更大的作用,為我們的生活和工作帶來(lái)更多的便利和效率。
本文轉(zhuǎn)載自??Halo咯咯?? 作者:基咯咯
