深度學(xué)習(xí)!構(gòu)建基于LangGraph的RAG多智能體研究工具 原創(chuàng)
在當(dāng)今信息爆炸的時(shí)代,快速準(zhǔn)確地獲取知識(shí)變得尤為重要。傳統(tǒng)的問(wèn)答系統(tǒng)雖然能夠處理一些簡(jiǎn)單問(wèn)題,但在面對(duì)復(fù)雜問(wèn)題時(shí)往往顯得力不從心。為了解決這一痛點(diǎn),我們開(kāi)發(fā)了一款基于 LangGraph 的 RAG 多智能體工具,它能夠高效地處理復(fù)雜問(wèn)題,整合多源信息,并通過(guò)迭代步驟得出精準(zhǔn)答案。今天,就讓我們深入了解一下這個(gè)強(qiáng)大的工具。
一、引言:從簡(jiǎn)單的 RAG 到智能的多智能體 RAG
在項(xiàng)目開(kāi)發(fā)初期,我們發(fā)現(xiàn)傳統(tǒng)的“簡(jiǎn)單 RAG”方法存在諸多不足。簡(jiǎn)單 RAG 無(wú)法拆解復(fù)雜問(wèn)題,只能在單一層面處理查詢,無(wú)法深入分析每個(gè)步驟并得出統(tǒng)一結(jié)論;它缺乏對(duì)幻覺(jué)(即模型生成錯(cuò)誤信息)或錯(cuò)誤處理的能力,無(wú)法通過(guò)驗(yàn)證步驟糾正錯(cuò)誤;此外,簡(jiǎn)單 RAG 系統(tǒng)也無(wú)法根據(jù)工作流條件動(dòng)態(tài)使用工具、調(diào)用外部 API 或與數(shù)據(jù)庫(kù)交互。
為了解決這些問(wèn)題,我們引入了多智能體 RAG 研究系統(tǒng)。基于智能體的框架能夠?qū)崿F(xiàn)以下功能:
- 路由和工具使用:路由智能體可以對(duì)用戶的查詢進(jìn)行分類,并將流程引導(dǎo)到合適的節(jié)點(diǎn)或工具。例如,它可以判斷文檔是否需要完整總結(jié)、是否需要更詳細(xì)的信息,或者問(wèn)題是否超出范圍。
- 規(guī)劃子步驟:復(fù)雜查詢通常需要被拆解成更小、更易管理的步驟。從一個(gè)查詢出發(fā),系統(tǒng)可以生成一系列執(zhí)行步驟,以探索查詢的不同方面并得出結(jié)論。比如,如果查詢需要比較文檔的兩個(gè)不同部分,基于智能體的方法可以識(shí)別這種比較需求,分別檢索兩個(gè)來(lái)源,并在最終回應(yīng)中將它們合并為比較分析。
- 反思和錯(cuò)誤糾正:除了簡(jiǎn)單的回應(yīng)生成,智能體方法還可以增加一個(gè)驗(yàn)證步驟,以應(yīng)對(duì)潛在的幻覺(jué)、錯(cuò)誤或未能準(zhǔn)確回答用戶查詢的回應(yīng)。此外,它還能夠整合人工參與的自我糾正機(jī)制,將人類輸入融入自動(dòng)化流程。這種功能使基于智能體的 RAG 系統(tǒng)成為企業(yè)應(yīng)用中更穩(wěn)健、更可靠的解決方案,因?yàn)樵谄髽I(yè)場(chǎng)景中,可靠性至關(guān)重要。
- 共享全局狀態(tài):智能體工作流共享一個(gè)全局狀態(tài),簡(jiǎn)化了跨多步驟的狀態(tài)管理。這個(gè)共享狀態(tài)對(duì)于維持多智能體過(guò)程不同階段的一致性至關(guān)重要。
二、項(xiàng)目概覽:構(gòu)建智能問(wèn)答的“大腦”
(一)系統(tǒng)架構(gòu)圖
我們的系統(tǒng)包含兩個(gè)核心部分:研究者子圖和主圖。研究者子圖負(fù)責(zé)生成用于檢索和重排向量數(shù)據(jù)庫(kù)中 top-k 文檔的不同查詢;主圖則包含主要工作流程,例如分析用戶查詢、生成完成任務(wù)所需的步驟、生成回應(yīng),并通過(guò)人工參與機(jī)制檢查幻覺(jué)。
(二)文檔處理與向量數(shù)據(jù)庫(kù)構(gòu)建
1. 文檔解析
對(duì)于結(jié)構(gòu)復(fù)雜的 PDF 文檔,尤其是包含復(fù)雜布局的表格,選擇合適的解析工具至關(guān)重要。許多庫(kù)在處理復(fù)雜頁(yè)面布局或表格結(jié)構(gòu)的 PDF 時(shí)精度不足。為此,我們采用了 Docling 這一開(kāi)源庫(kù),它能夠高效地解析文檔,并將內(nèi)容導(dǎo)出為所需格式。Docling 支持從 PDF、DOCX、PPTX、XLSX、圖片、HTML、AsciiDoc 和 Markdown 等多種常用文檔格式讀取和導(dǎo)出 Markdown 和 JSON 格式。它對(duì) PDF 文檔有全面的理解,包括表格結(jié)構(gòu)、閱讀順序和頁(yè)面布局,還支持對(duì)掃描 PDF 的 OCR 功能。
以下是使用 Docling 將 PDF 轉(zhuǎn)換為 Markdown 格式的代碼示例:
from docling.document_converter import DocumentConverter
logger.info("Starting document processing.")
converter = DocumentConverter()
markdown_document = converter.convert(source).document.export_to_markdown()
2. 向量數(shù)據(jù)庫(kù)構(gòu)建
我們使用 Chroma 構(gòu)建向量數(shù)據(jù)庫(kù),將句子存儲(chǔ)為向量嵌入,并在數(shù)據(jù)庫(kù)中進(jìn)行搜索。我們將持久化數(shù)據(jù)庫(kù)存儲(chǔ)在本地目錄 “db_vector” 中。通過(guò) OpenAI 的嵌入模型,我們將文檔列表轉(zhuǎn)換為向量,并存儲(chǔ)在 Chroma 中。
以下是構(gòu)建向量數(shù)據(jù)庫(kù)的代碼:
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embd = OpenAIEmbeddings()
vectorstore_from_documents = Chroma.from_documents(
documents=docs_list,
collection_name="rag-chroma-google-v1",
embedding=embd,
persist_directory='db_vector'
)
(三)主圖構(gòu)建與狀態(tài)管理
LangGraph 的核心概念之一是狀態(tài)。每個(gè)圖執(zhí)行都會(huì)創(chuàng)建一個(gè)狀態(tài),該狀態(tài)在圖的節(jié)點(diǎn)執(zhí)行時(shí)傳遞,并在每個(gè)節(jié)點(diǎn)執(zhí)行后更新內(nèi)部狀態(tài)。
我們定義了兩個(gè)類:Router 和 GradeHallucinations,分別用于存儲(chǔ)用戶查詢的分類結(jié)果和回應(yīng)中幻覺(jué)的存在與否。基于這些狀態(tài),我們構(gòu)建了輸入狀態(tài)(InputState)和智能體狀態(tài)(AgentState),其中 AgentState 包含用戶查詢的分類、研究計(jì)劃的步驟列表、智能體可以引用的檢索文檔列表,以及幻覺(jué)的二進(jìn)制評(píng)分。
以下是狀態(tài)類的定義代碼:
from pydantic import BaseModel, Field
from typing import Literal, TypedDict
class Router(TypedDict):
"""Classify user query."""
logic: str
type: Literal["more-info", "environmental", "general"]
class GradeHallucinations(BaseModel):
"""Binary score for hallucination present in generation answer."""
binary_score: str = Field(descriptinotallow="Answer is grounded in the facts, '1' or '0'")
(四)工作流程詳解
1. 第一步:分析和路由查詢
這一步會(huì)更新智能體狀態(tài)中的 Router 對(duì)象,其類型變量包含 “more-info”、“environmental” 或 “general” 中的一個(gè)值。根據(jù)這個(gè)信息,工作流將被路由到合適的節(jié)點(diǎn),例如 “create_research_plan”、“ask_for_more_info” 或 “respond_to_general_query”。
以下是實(shí)現(xiàn)代碼:
async def analyze_and_route_query(
state: AgentState, *, config: RunnableConfig
) -> dict[str, Router]:
"""Analyze the user's query and determine the appropriate routing."""
model = ChatOpenAI(model=GPT_4o, temperature=TEMPERATURE, streaming=True)
messages = [
{"role": "system", "content": ROUTER_SYSTEM_PROMPT}
] + state.messages
logging.info("---ANALYZE AND ROUTE QUERY---")
response = cast(
Router, await model.with_structured_output(Router).ainvoke(messages)
)
return {"router": response}
2. 第二步:創(chuàng)建研究計(jì)劃
如果查詢分類返回 “environmental”,用戶的請(qǐng)求與文檔相關(guān),工作流將到達(dá) “create_research_plan” 節(jié)點(diǎn),該節(jié)點(diǎn)的功能是為回答與環(huán)境相關(guān)的查詢創(chuàng)建逐步研究計(jì)劃。
以下是實(shí)現(xiàn)代碼:
async def create_research_plan(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[str] | str]:
"""Create a step-by-step research plan for answering an environmental-related query."""
class Plan(TypedDict):
"""Generate research plan."""
steps: list[str]
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
messages = [
{"role": "system", "content": RESEARCH_PLAN_SYSTEM_PROMPT}
] + state.messages
logging.info("---PLAN GENERATION---")
response = cast(Plan, await model.with_structured_output(Plan).ainvoke(messages))
return {"steps": response["steps"], "documents": "delete"}
3. 第三步:開(kāi)展研究
這一步會(huì)從研究計(jì)劃中取出第一個(gè)步驟,并調(diào)用研究者子圖來(lái)執(zhí)行研究。研究者子圖會(huì)返回一系列文檔片段,我們將在后續(xù)步驟中進(jìn)一步處理。
以下是實(shí)現(xiàn)代碼:
async def conduct_research(state: AgentState) -> dict[str, Any]:
"""Execute the first step of the research plan."""
result = await researcher_graph.ainvoke({"question": state.steps[0]}) # graph call directly
docs = result["documents"]
step = state.steps[0]
logging.info(f"\n{len(docs)} documents retrieved in total for the step: {step}.")
return {"documents": result["documents"], "steps": state.steps[1:]}
4. 第四步:研究者子圖構(gòu)建
研究者子圖包含查詢生成和文檔檢索兩個(gè)關(guān)鍵步驟。查詢生成步驟會(huì)根據(jù)研究計(jì)劃中的問(wèn)題生成多個(gè)搜索查詢,以幫助回答問(wèn)題。文檔檢索步驟則使用混合搜索和 Cohere 重排技術(shù),從向量數(shù)據(jù)庫(kù)中檢索相關(guān)文檔。
以下是查詢生成的代碼:
async def generate_queries(
state: ResearcherState, *, config: RunnableConfig
) -> dict[str, list[str]]:
"""Generate search queries based on the question."""
class Response(TypedDict):
queries: list[str]
logger.info("---GENERATE QUERIES---")
model = ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0)
messages = [
{"role": "system", "content": GENERATE_QUERIES_SYSTEM_PROMPT},
{"role": "human", "content": state.question},
]
response = cast(Response, await model.with_structured_output(Response).ainvoke(messages))
queries = response["queries"]
queries.append(state.question)
logger.info(f"Queries: {queries}")
return {"queries": response["queries"]}
以下是文檔檢索和重排的代碼:
def _setup_vectorstore() -> Chroma:
"""Set up and return the Chroma vector store instance."""
embeddings = OpenAIEmbeddings()
return Chroma(
collection_name=VECTORSTORE_COLLECTION,
embedding_functinotallow=embeddings,
persist_directory=VECTORSTORE_DIRECTORY
)
# Create base retrievers
retriever_bm25 = BM25Retriever.from_documents(documents, search_kwargs={"k": TOP_K})
retriever_vanilla = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": TOP_K})
retriever_mmr = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": TOP_K})
ensemble_retriever = EnsembleRetriever(
retrievers=[retriever_vanilla, retriever_mmr, retriever_bm25],
weights=ENSEMBLE_WEIGHTS,
)
# Set up Cohere re-ranking
compressor = CohereRerank(top_n=2, model="rerank-english-v3.0")
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=ensemble_retriever,
)
5. 第五步:檢查是否完成
這一步通過(guò)檢查研究計(jì)劃中是否還有剩余步驟來(lái)確定研究過(guò)程是否完成。如果還有步驟,工作流將返回 “conduct_research” 節(jié)點(diǎn)繼續(xù)執(zhí)行;如果沒(méi)有剩余步驟,則進(jìn)入 “respond” 節(jié)點(diǎn)生成最終回應(yīng)。
以下是實(shí)現(xiàn)代碼:
def check_finished(state: AgentState) -> Literal["respond", "conduct_research"]:
"""Determine if the research process is complete."""
if len(state.steps or []) > 0:
return "conduct_research"
else:
return "respond"
6. 第六步:生成回應(yīng)
這一步根據(jù)研究過(guò)程中檢索到的文檔和對(duì)話歷史,生成對(duì)用戶查詢的最終回應(yīng)。它利用語(yǔ)言模型將所有相關(guān)信息整合成一個(gè)全面的答案。
以下是實(shí)現(xiàn)代碼:
async def respond(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""Generate the final response to the user's query."""
model = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0)
context = format_docs(state.documents)
prompt = RESPONSE_SYSTEM_PROMPT.format(cnotallow=context)
messages = [{"role": "system", "content": prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
7. 第七步:檢查幻覺(jué)
這一步會(huì)分析語(yǔ)言模型生成的回應(yīng),判斷其是否得到了檢索到的文檔事實(shí)的支持,并給出一個(gè)二進(jìn)制評(píng)分結(jié)果。如果評(píng)分表明回應(yīng)可能包含幻覺(jué),工作流將被中斷,并提示用戶決定是否重新生成回應(yīng)或結(jié)束流程。
以下是實(shí)現(xiàn)代碼:
async def check_hallucinations(
state: AgentState, *, config: RunnableConfig
) -> dict[str, Any]:
"""Analyze the response for hallucinations."""
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
system_prompt = CHECK_HALLUCINATIONS.format(
documents=state.documents,
generatinotallow=state.messages[-1]
)
messages = [
{"role": "system", "content": system_prompt}
] + state.messages
logging.info("---CHECK HALLUCINATIONS---")
response = cast(GradeHallucinations, await model.with_structured_output(GradeHallucinations).ainvoke(messages))
return {"hallucination": response}
8. 第八步:人工審批(人工參與)
如果語(yǔ)言模型的回應(yīng)未得到事實(shí)支持,可能包含幻覺(jué),此時(shí)工作流將暫停,并將控制權(quán)交給用戶。用戶可以選擇僅重新執(zhí)行最后的生成步驟,而無(wú)需重新啟動(dòng)整個(gè)工作流,或者選擇結(jié)束流程。這種人工參與機(jī)制確保了用戶對(duì)整個(gè)過(guò)程的控制,避免了不必要的循環(huán)或不期望的操作。
以下是實(shí)現(xiàn)代碼:
def human_approval(state: AgentState):
_binary_score = state.hallucination.binary_score
if _binary_score == "1":
return"END"
else:
retry_generation = interrupt(
{
"question": "Is this correct?",
"llm_output": state.messages[-1]
}
)
if retry_generation == "y":
print("Continue with retry...")
return"respond"
else:
return"END"
三、實(shí)戰(zhàn)測(cè)試:多智能體 RAG 的強(qiáng)大能力
為了驗(yàn)證系統(tǒng)的性能,我們使用了一份關(guān)于谷歌環(huán)境可持續(xù)性戰(zhàn)略的年度報(bào)告進(jìn)行了測(cè)試。這份報(bào)告包含了豐富的數(shù)據(jù)和復(fù)雜的表格結(jié)構(gòu),非常適合用來(lái)測(cè)試系統(tǒng)的多步驟處理能力和文檔解析功能。
(一)復(fù)雜問(wèn)題測(cè)試
我們提出了一個(gè)復(fù)雜的問(wèn)題:“檢索新加坡第二個(gè)數(shù)據(jù)中心 2019 年和 2022 年的 PUE 效率值,以及 2023 年亞太地區(qū)的區(qū)域平均 CFE 值。”
系統(tǒng)成功地將這個(gè)問(wèn)題拆解為多個(gè)步驟,并生成了相應(yīng)的查詢:
- “查找新加坡第二個(gè)數(shù)據(jù)中心 2019 年和 2022 年的 PUE 效率值。”
- “查找 2023 年亞太地區(qū)的區(qū)域平均 CFE 值。”
通過(guò)檢索和重排文檔,系統(tǒng)最終給出了準(zhǔn)確的答案:
- 新加坡第二個(gè)數(shù)據(jù)中心 2019 年的 PUE 效率值未提供,但 2022 年的 PUE 為 1.21。
- 2023 年亞太地區(qū)的區(qū)域平均 CFE 為 12%。
(二)與 ChatGPT 的對(duì)比測(cè)試
為了進(jìn)一步驗(yàn)證系統(tǒng)的可靠性,我們將同樣的問(wèn)題提交給了 ChatGPT。結(jié)果發(fā)現(xiàn),ChatGPT 返回的值是錯(cuò)誤的,明顯出現(xiàn)了幻覺(jué)現(xiàn)象。這表明,在處理復(fù)雜問(wèn)題時(shí),簡(jiǎn)單的語(yǔ)言模型可能會(huì)生成不準(zhǔn)確的信息,而我們的多智能體 RAG 系統(tǒng)通過(guò)幻覺(jué)檢查步驟能夠有效避免這種情況。
四、技術(shù)挑戰(zhàn)與展望:多智能體 RAG 的未來(lái)之路
盡管多智能體 RAG 在性能上有顯著提升,但在實(shí)際應(yīng)用中仍面臨一些挑戰(zhàn):
- 延遲問(wèn)題:由于智能體交互的復(fù)雜性增加,響應(yīng)時(shí)間可能會(huì)變長(zhǎng)。如何在速度和準(zhǔn)確性之間取得平衡是一個(gè)關(guān)鍵挑戰(zhàn)。
- 評(píng)估與可觀測(cè)性:隨著多智能體 RAG 系統(tǒng)變得越來(lái)越復(fù)雜,持續(xù)的評(píng)估和可觀測(cè)性變得必不可少。
總的來(lái)說(shuō),多智能體 RAG 是人工智能領(lǐng)域的一項(xiàng)重大突破。它將大型語(yǔ)言模型的能力與自主推理和信息檢索相結(jié)合,引入了一種新的智能和靈活性標(biāo)準(zhǔn)。隨著人工智能的不斷發(fā)展,多智能體 RAG 將在各個(gè)行業(yè)中發(fā)揮基礎(chǔ)性作用,徹底改變我們使用技術(shù)的方式。
本文轉(zhuǎn)載自公眾號(hào)Halo咯咯 作者:基咯咯
