Building a Complex RAG System with LangChain, LangGraph, and RAGAS
Introduction
Want to build a production-ready RAG (Retrieval-Augmented Generation) system? It's not a trivial job: it takes careful design and repeated iteration, step by step. You start by cleaning up the data, then try out different chunking strategies, both logical and traditional, to find what fits your scenario. Next, you anonymize the data to reduce the cases where the model makes things up (the so-called hallucinations). To make retrieval more precise, you can use subgraphs to focus on the most relevant information and filter out the useless noise. On top of the retrieval layer sits a plan-and-execute system driven by an LLM (large language model), like dispatching an agent that learns as it works and decides what to do next. Finally, once the system generates an answer, you evaluate how well it did against a set of metrics.
This post walks you through building a complete RAG system from scratch, hands-on, with LangChain, LangGraph, and RAGAS (an evaluation framework), simulating real-world challenges and showing the practical problems and solutions developers run into when building RAG bots. All the code is available in the GitHub repository: https://github.com/FareedKhan-dev/complex-RAG-guide
Table of Contents
- Understanding the RAG Pipeline
- Environment Setup
- Data Splitting (Traditional / Logical)
- Data Cleaning
- Data Reorganization
- Data Vectorization
- Creating the Context Retriever
- Filtering Out Irrelevant Information
- Query Rewriting
- Chain-of-Thought (CoT) Reasoning
- Relevance and Fact Checking
- Testing the RAG Pipeline
- Visualizing the RAG Pipeline with LangGraph
- The Subgraph Approach and Distillation Verification
- Creating the Retrieval and Distillation Subgraphs
- Creating the Hallucination-Reduction Subgraph
- Creating and Testing the Plan Executor
- Re-planning Logic
- Creating the Task Handler
- Anonymizing / De-anonymizing the Input Question
- Compiling and Visualizing the RAG Pipeline
- Testing the Final Pipeline
- Evaluating with RAGAS
- Conclusion
Understanding the RAG Pipeline
Before writing any code, let's sketch a blueprint of the RAG pipeline so we can break down each part step by step.
First, anonymize_question is called to replace concrete names (for example "Harry Potter", "Voldemort") with placeholders (Person X, Villain Y), so the LLM's pretrained knowledge doesn't bias the planning.
Next, the planner drafts a high-level strategy. For instance, the question "How did X defeat Y?" might be planned as:
1. Identify X and Y
2. Find their final confrontation
3. Analyze X's actions
4. Draft the answer
Then de_anonymize_plan swaps the placeholders back to the original names so the plan becomes concrete. The updated plan goes to break_down_plan, which splits each high-level step into specific tasks.
task_handler then picks the right tool for each task, for example:
- chosen_tool_is_retrieve_quotes: find specific dialogue or quotes
- chosen_tool_is_retrieve_chunks: fetch general information and context
- chosen_tool_is_retrieve_summaries: summarize whole chapters
- chosen_tool_is_answer: answer directly once there is enough context
After a retrieval tool runs (retrieve_book_quotes, retrieve_chunks, or retrieve_summaries), the new information is sent to replan, which decides whether to update the plan based on progress, the goal, and the new input.
This loop (task_handler -> tool -> replan) repeats until the system decides the question can be answered directly (can_be_answered_already). Then get_final_answer synthesizes all the evidence into the final answer.
Finally, eval_using_RAGAS checks the answer for accuracy and faithfulness to its sources. If it passes, the flow ends at __end__, returning a verified, well-reasoned answer.
Environment Setup
LangChain, LangGraph, and the related modules add up to a full architecture, so we import things as we need them rather than loading everything up front; that keeps the walkthrough easier to follow.
The first step is to set the environment variables that hold API keys and other sensitive values:
import os

# Set the OpenAI API key (for OpenAI LLMs)
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')
# Set the Together API key (for Together AI models)
os.environ["TOGETHER_API_KEY"] = os.getenv('TOGETHER_API_KEY')
# Get the Groq API key (for Groq LLMs)
groq_api_key = os.getenv('GROQ_API_KEY')
We use two model providers here: Together AI serves open-source models at low cost, and Groq is used where we need structured output. If your prompt templates are good at steering the LLM toward structured results, you could skip Groq entirely and rely on Together AI or a local Hugging Face model, since the LangChain ecosystem supports all of them.
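To make the later snippets more self-contained, here is a minimal sketch of how the two clients might be instantiated. This is an assumption for illustration: the variable names llama_3_3_70b and groq_llama3_70b, and loading the keys from a .env file via python-dotenv, are not part of the original post.

import os
from dotenv import load_dotenv
from langchain_together import ChatTogether
from langchain_groq import ChatGroq

load_dotenv()  # read API keys from a local .env file (assumed setup)

together_api_key = os.getenv("TOGETHER_API_KEY")
groq_api_key = os.getenv("GROQ_API_KEY")

# Open-weights model served by Together AI, used for most generation steps
llama_3_3_70b = ChatTogether(
    temperature=0,
    model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
    api_key=together_api_key,
)

# Groq-hosted model, used where strictly structured (JSON) output is needed
groq_llama3_70b = ChatGroq(
    temperature=0,
    model_name="llama3-70b-8192",
    groq_api_key=groq_api_key,
)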
Data Splitting (Traditional / Logical)
To get started we need a dataset. RAG pipelines usually deal with large amounts of raw text such as PDF, CSV, or TXT files, and that data often needs heavy cleaning, sometimes with a different approach per file.
We'll use the Harry Potter series as our dataset because it is close to a real-world scenario and contains all sorts of messy string formatting. Once you have downloaded the book, we can start splitting the document.
Define the PDF path:
book_path = "Harry Potter - Book 1 - The Sorcerers Stone.pdf"
Before any preprocessing or cleaning, the most important step is to split the document both logically and traditionally.
For Harry Potter, splitting by chapter is the most natural logical split. First, load the PDF into a single block of text:
import re
import PyPDF2
from langchain.docstore.document import Document
with open(book_path, 'rb') as pdf_file:
    pdf_reader = PyPDF2.PdfReader(pdf_file)
    full_text = " ".join([page.extract_text() for page in pdf_reader.pages])
Then split on chapter headings with a regular expression:
chapter_sections = re.split(r'(CHAPTER\s[A-Z]+(?:\s[A-Z]+)*)', full_text)
Create a Document object for each chapter:
chapters = []
for i in range(1, len(chapter_sections), 2):
    chapter_text = chapter_sections[i] + chapter_sections[i + 1]
    doc = Document(page_content=chapter_text, metadata={"chapter": i // 2 + 1})
    chapters.append(doc)
print(f"Total number of chapters extracted: {len(chapters)}")
Output:
Total number of chapters extracted: 17
Besides chapters, quotes are another useful breakpoint because they often capture key information. (For financial documents, tables or statements might play the same role.) Let's also split by quotes:
from tqdm import tqdm

min_length = 50
quote_pattern_longer_than_min_length = re.compile(rf'"(.{{{min_length},}}?)"', re.DOTALL)
book_quotes_list = []
for doc in tqdm(chapters, desc="Extracting quotes"):
    content = doc.page_content
    found_quotes = quote_pattern_longer_than_min_length.findall(content)
    for quote in found_quotes:
        quote_doc = Document(page_content=quote)
        book_quotes_list.append(quote_doc)
print(f"Total number of quotes extracted: {len(book_quotes_list)}")
print(f"Random quote content: {book_quotes_list[5].page_content[:500]}...")
Output:
Total number of quotes extracted: 1337
Random quote content: Most mysterious. And now, over to JimMcGuffin ...
Finally, apply traditional chunking:
from langchain.text_splitter import RecursiveCharacterTextSplitter

chunk_size = 1000
chunk_overlap = 200
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len
)
document_splits = text_splitter.split_documents(chapters)
print(f"Number of documents after chunking: {len(document_splits)}")
Output:
Number of documents after chunking: 612
Now the data has been split three ways: by chapter, by quote, and by traditional chunking. Time to clean it.
Data Cleaning
Looking at the first chapter, there are stray spaces between letters (tab characters, \t), which we'll remove with a regex:
print(f"First chapter content: {chapters[0].page_content[:500]}...")
Output:
First chapter content: CHAPTER ONE
THE BOY WHO LIVED
M
r. and M r s. D u r s l e y , o f n u m b e r ...
Remove the tab characters:
tab_pattern = re.compile(r'\t')
for doc in chapters:
    doc.page_content = tab_pattern.sub(' ', doc.page_content)
print(f"First chapter content after cleaning: {chapters[0].page_content[:500]}...")
Output:
First chapter content after cleaning: CHAPTER ONE
THE BOY WHO LIVED
M
r. and Mrs. Dursley, of number f ...
There are still newlines and extra spaces to deal with:
multiple_newlines_pattern = re.compile(r'\n\s*\n')
word_split_newline_pattern = re.compile(r'(\w)\n(\w)')
multiple_spaces_pattern = re.compile(r' +')
for doc in chapters:
    page_content = multiple_newlines_pattern.sub('\n', doc.page_content)
    page_content = word_split_newline_pattern.sub(r'\1\2', page_content)
    page_content = page_content.replace('\n', ' ')
    page_content = multiple_spaces_pattern.sub(' ', page_content)
    doc.page_content = page_content
print(f"Final cleaned chapter content: {chapters[15].page_content[:500]}...")
Output:
Final cleaned chapter content:
THE BOY WHO LIVED
Mr. and Mrs. Dursley, of number f ...
Apply the same cleanup to the traditionally chunked data:
for doc in document_splits:
    doc.page_content = tab_pattern.sub(' ', doc.page_content)
    doc.page_content = multiple_newlines_pattern.sub('\n', doc.page_content)
    doc.page_content = word_split_newline_pattern.sub(r'\1\2', doc.page_content)
    doc.page_content = multiple_spaces_pattern.sub(' ', doc.page_content)
Analyze the data:
chapter_word_counts = [len(doc.page_content.split()) for doc in chapters]
max_words = max(chapter_word_counts)
min_words = min(chapter_word_counts)
average_words = sum(chapter_word_counts) / len(chapter_word_counts)
print(f"Max words in a chapter: {max_words}")
print(f"Min words in a chapter: {min_words}")
print(f"Average words per chapter: {average_words:.2f}")
Output:
Max words in a chapter: 6343
Min words in a chapter: 2915
Average words per chapter: 4402.18
Every chapter fits within the LLM's context window, so we're fine for now.
Data Reorganization
The quote data is already compact, but the chapters are large and full of dialogue we don't need. We can have an LLM summarize each chapter, keeping only the key information:
from langchain.prompts import PromptTemplate
from langchain.chains.summarize import load_summarize_chain

template = """Write an extensive summary of the following:
{text}
SUMMARY:"""
summarization_prompt = PromptTemplate(
    template=template,
    input_variables=["text"]
)

# deepseek_v3 is assumed to be an LLM client defined earlier (for example a
# ChatTogether instance pointing at a DeepSeek V3 model); it isn't shown in this post.
chain = load_summarize_chain(deepseek_v3, chain_type="stuff", prompt=summarization_prompt)

chapter_summaries = []
for chapter in chapters:
    summary = chain.invoke([chapter])
    cleaned_text = re.sub(r'\n\n', '\n', summary["output_text"])
    doc_summary = Document(page_content=cleaned_text, metadata=chapter.metadata)
    chapter_summaries.append(doc_summary)
We use the stuff chain type here because the largest chapter (about 6K words) fits within DeepSeek V3's context window. If your data exceeds the limit, you can switch to the map_reduce or refine chain types.
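As a rough sketch of that fallback (assuming the same deepseek_v3 client and summarization_prompt as above; the variable name map_reduce_chain is illustrative), switching the chain type is usually all it takes:

from langchain.chains.summarize import load_summarize_chain

# map_reduce: summarize each document separately ("map"), then merge the
# partial summaries into a single summary ("reduce").
map_reduce_chain = load_summarize_chain(
    deepseek_v3,
    chain_type="map_reduce",
    map_prompt=summarization_prompt,
    combine_prompt=summarization_prompt,
)
# Example call on a handful of chunks:
# summary = map_reduce_chain.invoke(document_splits[:20])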
Data Vectorization
We embed the data with the M2-BERT model (32k context window) and store the vectors in FAISS:
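The embedding model itself isn't instantiated in the snippet below. A plausible way to create it, assuming Together AI's M2-BERT retrieval model via the langchain_together integration (an assumption, not shown in the original post):

from langchain_together import TogetherEmbeddings

# 80M-parameter M2-BERT retrieval model with a 32k context window
m2_bert_80M_32K = TogetherEmbeddings(
    model="togethercomputer/m2-bert-80M-32k-retrieval",
)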
from langchain.vectorstores import FAISS
book_splits_vectorstore = FAISS.from_documents(document_splits, m2_bert_80M_32K)
chapter_summaries_vectorstore = FAISS.from_documents(chapter_summaries, m2_bert_80M_32K)
quotes_vectorstore = FAISS.from_documents(book_quotes_list, m2_bert_80M_32K)
quotes_vectorstore.save_local("quotes_vectorstore")
You can reload the local vector store later:
quotes_vectorstore = FAISS.load_local("quotes_vectorstore", m2_bert_80M_32K, allow_dangerous_deserialization=True)
Creating the Context Retriever
Create a retriever for each dataset (chapter summaries, quotes, and traditional chunks):
book_chunks_retriever = book_splits_vectorstore.as_retriever(search_kwargs={"k": 1})
chapter_summaries_retriever = chapter_summaries_vectorstore.as_retriever(search_kwargs={"k": 1})
book_quotes_retriever = quotes_vectorstore.as_retriever(search_kwargs={"k": 10})

def retrieve_context_per_question(state):
    question = state["question"]
    docs = book_chunks_retriever.get_relevant_documents(question)
    context = " ".join(doc.page_content for doc in docs)
    docs_summaries = chapter_summaries_retriever.get_relevant_documents(state["question"])
    context_summaries = " ".join(f"{doc.page_content} (Chapter {doc.metadata['chapter']})" for doc in docs_summaries)
    docs_book_quotes = book_quotes_retriever.get_relevant_documents(state["question"])
    book_qoutes = " ".join(doc.page_content for doc in docs_book_quotes)
    all_contexts = context + context_summaries + book_qoutes
    all_contexts = all_contexts.replace('"', '\\"').replace("'", "\\'")
    return {"context": all_contexts, "question": question}
Filtering Out Irrelevant Information
Use an LLM to filter out content that isn't relevant:
keep_only_relevant_content_prompt_template = """
You receive a query: {query} and retrieved documents: {retrieved_documents} from a vector store.
You need to filter out all the non-relevant information that does not supply important information regarding the {query}.
Your goal is to filter out the non-relevant information only.
You can remove parts of sentences that are not relevant to the query or remove whole sentences that are not relevant to the query.
DO NOT ADD ANY NEW INFORMATION THAT IS NOT IN THE RETRIEVED DOCUMENTS.
Output the filtered relevant content.
"""
from pydantic import BaseModel, Field
from langchain_together import ChatTogether

class KeepRelevantContent(BaseModel):
    relevant_content: str = Field(description="The relevant content from the retrieved documents that is relevant to the query.")

keep_only_relevant_content_prompt = PromptTemplate(
    template=keep_only_relevant_content_prompt_template,
    input_variables=["query", "retrieved_documents"],
)
keep_only_relevant_content_llm = ChatTogether(
    temperature=0,
    model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
    api_key=together_api_key,
    max_tokens=2000
)
keep_only_relevant_content_chain = (
    keep_only_relevant_content_prompt
    | keep_only_relevant_content_llm.with_structured_output(KeepRelevantContent)
)
def keep_only_relevant_content(state):
    question = state["question"]
    context = state["context"]
    input_data = {"query": question, "retrieved_documents": context}
    print("keeping only the relevant content...")
    output = keep_only_relevant_content_chain.invoke(input_data)
    relevant_content = output.relevant_content
    relevant_content = "".join(relevant_content)
    relevant_content = relevant_content.replace('"', '\\"').replace("'", "\\'")
    return {"relevant_context": relevant_content, "context": context, "question": question}
Query Rewriting
User queries are often under-specified, so we rewrite them with an LLM:
from langchain_core.output_parsers import JsonOutputParser
from langchain_groq import ChatGroq

class RewriteQuestion(BaseModel):
    rewritten_question: str = Field(description="The improved, retrieval-optimized question.")
    explanation: str = Field(description="An explanation of the rewrite.")

rewrite_question_string_parser = JsonOutputParser(pydantic_object=RewriteQuestion)
rewrite_llm = ChatGroq(
    temperature=0,
    model_name="llama3-70b-8192",
    groq_api_key=groq_api_key,
    max_tokens=4000
)
rewrite_prompt_template = """You are a question re-writer that converts an input question to a better version optimized for vectorstore retrieval.
Analyze the input question {question} and try to reason about the underlying semantic intent / meaning.
{format_instructions}
"""
rewrite_prompt = PromptTemplate(
    template=rewrite_prompt_template,
    input_variables=["question"],
    partial_variables={"format_instructions": rewrite_question_string_parser.get_format_instructions()},
)
question_rewriter = rewrite_prompt | rewrite_llm | rewrite_question_string_parser
def rewrite_question(state):
    question = state["question"]
    print("rewriting the question...")
    result = question_rewriter.invoke({"question": question})
    new_question = result["rewritten_question"]
    return {"question": new_question}
Chain-of-Thought (CoT) Reasoning
Use chain-of-thought (CoT) reasoning to improve answer quality:
class QuestionAnswerFromContext(BaseModel):
    answer_based_on_content: str = Field(description="The answer grounded in the provided context.")

question_answer_from_context_llm = ChatTogether(
    temperature=0,
    model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
    api_key=together_api_key,
    max_tokens=2000
)
question_answer_cot_prompt_template = """
Chain-of-Thought Reasoning Examples
Example 1
Context: Mary is taller than Jane. Jane is shorter than Tom. Tom is the same height as David.
Question: Who is the tallest person?
Reasoning:
Mary > Jane
Jane < Tom → Tom > Jane
Tom = David
So: Mary > Tom = David > Jane
Final Answer: Mary
...
Context: {context}
Question: {question}
"""
question_answer_from_context_cot_prompt = PromptTemplate(
    template=question_answer_cot_prompt_template,
    input_variables=["context", "question"],
)
question_answer_from_context_cot_chain = (
    question_answer_from_context_cot_prompt
    | question_answer_from_context_llm.with_structured_output(QuestionAnswerFromContext)
)
def answer_question_from_context(state):
    question = state["question"]
    context = state["aggregated_context"] if "aggregated_context" in state else state["context"]
    input_data = {"question": question, "context": context}
    print("answering the question from the retrieved context...")
    output = question_answer_from_context_cot_chain.invoke(input_data)
    answer = output.answer_based_on_content
    print(f'answer before checking hallucination: {answer}')
    return {"answer": answer, "context": context, "question": question}
Relevance and Fact Checking
Next, check whether the retrieved documents are relevant and whether the answer is grounded in them:
class Relevance(BaseModel):
    is_relevant: bool = Field(description="Whether the document is relevant to the query.")
    explanation: str = Field(description="An explanation of the relevance decision.")
is_relevant_json_parser = JsonOutputParser(pydantic_object=Relevance)
is_relevant_llm = ChatGroq(
    temperature=0,
    model_name="llama3-70b-8192",
    groq_api_key=groq_api_key,
    max_tokens=2000
)
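# Note: is_relevant_content_prompt_template is referenced below but never shown
# in the post. A minimal sketch of what it likely looks like (the exact wording
# is an assumption):
is_relevant_content_prompt_template = """You receive a query: {query} and a context: {context} retrieved from a vector store.
You need to determine if the context contains information relevant to answering the query.
{format_instructions}
"""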
is_relevant_content_prompt = PromptTemplate(
    template=is_relevant_content_prompt_template,
    input_variables=["query", "context"],
    partial_variables={"format_instructions": is_relevant_json_parser.get_format_instructions()},
)
is_relevant_content_chain = is_relevant_content_prompt | is_relevant_llm | is_relevant_json_parser
def is_relevant_content(state):
    question = state["question"]
    context = state["context"]
    input_data = {"query": question, "context": context}
    print("determining if the document is relevant...")
    output = is_relevant_content_chain.invoke(input_data)
    if output["is_relevant"]:
        print("The document is relevant.")
        return "relevant"
    else:
        print("The document is not relevant.")
        return "not relevant"
Fact checking:
class is_grounded_on_facts(BaseModel):
    grounded_on_facts: bool = Field(description="Whether the answer is grounded in the provided facts.")
is_grounded_on_facts_llm = ChatTogether(
    temperature=0,
    model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
    api_key=together_api_key,
    max_tokens=2000
)
is_grounded_on_facts_prompt_template = """You are a fact-checker that determines if the given answer {answer} is grounded in the given context {context}...
"""
is_grounded_on_facts_prompt = PromptTemplate(
    template=is_grounded_on_facts_prompt_template,
    input_variables=["context", "answer"],
)
is_grounded_on_facts_chain = (
    is_grounded_on_facts_prompt
    | is_grounded_on_facts_llm.with_structured_output(is_grounded_on_facts)
)
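# can_be_answered_chain is used in the next function but not defined in the post.
# A rough sketch under the same conventions (JSON parser plus the Groq model above);
# the prompt wording and the class name CanBeAnswered are assumptions:
class CanBeAnswered(BaseModel):
    can_be_answered: bool = Field(description="Whether the question can be fully answered from the context.")
    explanation: str = Field(description="The reasoning behind the decision.")

can_be_answered_json_parser = JsonOutputParser(pydantic_object=CanBeAnswered)
can_be_answered_prompt = PromptTemplate(
    template="""You receive a question: {question} and a context: {context}.
Determine if the question can be fully answered based only on the context.
{format_instructions}
""",
    input_variables=["question", "context"],
    partial_variables={"format_instructions": can_be_answered_json_parser.get_format_instructions()},
)
can_be_answered_chain = can_be_answered_prompt | is_relevant_llm | can_be_answered_json_parser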
def grade_generation_v_documents_and_question(state):
    context = state["context"]
    answer = state["answer"]
    question = state["question"]
    grounded = is_grounded_on_facts_chain.invoke({"context": context, "answer": answer}).grounded_on_facts
    if not grounded:
        print("The answer is hallucinated.")
        return "hallucination"
    print("The answer is grounded in the facts.")
    can_be_answered = can_be_answered_chain.invoke({"question": question, "context": context})["can_be_answered"]
    if can_be_answered:
        print("The question can be fully answered.")
        return "useful"
    else:
        print("The question cannot be fully answered.")
        return "not_useful"
Testing the RAG Pipeline
Test it with a simple question:
init_state = {"question": "who is fluffy?"}
context_state = retrieve_context_per_question(init_state)
relevant_content_state = keep_only_relevant_content(context_state)
is_relevant_content_state = is_relevant_content(relevant_content_state)
answer_state = answer_question_from_context(relevant_content_state)
final_answer = grade_generation_v_documents_and_question(answer_state)
print(answer_state["answer"])
Output:
retrieving relevant chunks...
retrieving relevant chapter summaries...
keeping only the relevant content...
determining if the document is relevant...
The document is relevant.
answering the question from the retrieved context...
answer before checking hallucination: Fluffy is a three-headed dog.
checking if the answer is grounded in the facts...
The answer is grounded in the facts.
determining if the question can be fully answered...
The question can be fully answered.
Fluffy is a three-headed dog.
Fluffy is indeed the three-headed dog in Harry Potter, and the pipeline identified it correctly, so everything is working.
Visualizing the RAG Pipeline with LangGraph
Visualize the pipeline with LangGraph:
from typing import TypedDict
from langgraph.graph import END, StateGraph
from langchain_core.runnables.graph import MermaidDrawMethod
from IPython.display import display, Image

class QualitativeRetrievalAnswerGraphState(TypedDict):
    question: str
    context: str
    answer: str

wf = StateGraph(QualitativeRetrievalAnswerGraphState)
for n, f in [("retrieve", retrieve_context_per_question),
             ("filter", keep_only_relevant_content),
             ("rewrite", rewrite_question),
             ("answer", answer_question_from_context)]:
    wf.add_node(n, f)
wf.set_entry_point("retrieve")
wf.add_edge("retrieve", "filter")
wf.add_conditional_edges("filter", is_relevant_content, {
    "relevant": "answer",
    "not relevant": "rewrite"
})
wf.add_edge("rewrite", "retrieve")
wf.add_conditional_edges("answer", grade_generation_v_documents_and_question, {
    "hallucination": "answer",
    "not_useful": "rewrite",
    "useful": END
})
display(Image(wf.compile().get_graph().draw_mermaid_png(draw_method=MermaidDrawMethod.API)))
The graph clearly shows the flow from context retrieval to filtering, query rewriting, answer generation, and fact checking.
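To run the compiled graph instead of calling each node by hand as in the earlier test, you can stream it. A small sketch (the variable name qualitative_retrieval_answer_app is illustrative):

qualitative_retrieval_answer_app = wf.compile()

# Stream node-by-node updates for a sample question
for output in qualitative_retrieval_answer_app.stream({"question": "who is fluffy?"}):
    for node_name, value in output.items():
        print(f"finished node: {node_name}")
print(f'final answer: {value["answer"]}')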
The Subgraph Approach and Distillation Verification
Complex tasks call for subgraphs that split the work into pieces such as retrieval, distillation, and verification:
is_distilled_content_grounded_on_content_prompt_template = """
You receive some distilled content: {distilled_content} and the original context: {original_context}.
You need to determine if the distilled content is grounded on the original context.
...
"""
class IsDistilledContentGroundedOnContent(BaseModel):
    grounded: bool
    explanation: str
is_distilled_content_grounded_on_content_json_parser = JsonOutputParser(
    pydantic_object=IsDistilledContentGroundedOnContent
)
is_distilled_content_grounded_on_content_prompt = PromptTemplate(
    template=is_distilled_content_grounded_on_content_prompt_template,
    input_variables=["distilled_content", "original_context"],
    partial_variables={"format_instructions": is_distilled_content_grounded_on_content_json_parser.get_format_instructions()},
)
is_distilled_content_grounded_on_content_llm = ChatGroq(
    temperature=0,
    model_name="llama3-70b-8192",
    groq_api_key=groq_api_key,
    max_tokens=4000
)
is_distilled_content_grounded_on_content_chain = (
    is_distilled_content_grounded_on_content_prompt
    | is_distilled_content_grounded_on_content_llm
    | is_distilled_content_grounded_on_content_json_parser
)
def is_distilled_content_grounded_on_content(state):
    print("determining if the distilled content is grounded on the original context...")
    distilled_content = state["relevant_context"]
    original_context = state["context"]
    input_data = {"distilled_content": distilled_content, "original_context": original_context}
    output = is_distilled_content_grounded_on_content_chain.invoke(input_data)
    grounded = output["grounded"]
    if grounded:
        print("The distilled content is grounded on the original context.")
        return "grounded on the original context"
    else:
        print("The distilled content is not grounded on the original context.")
        return "not grounded on the original context"
Creating the Retrieval and Distillation Subgraphs
Create a separate retrieval function for chapter summaries, quotes, and traditional chunks:
def retrieve_chunks_context_per_question(state):
    print("retrieving relevant chunks...")
    question = state["question"]
    docs = book_chunks_retriever.get_relevant_documents(question)
    context = " ".join(doc.page_content for doc in docs)
    context = context.replace('"', '\\"').replace("'", "\\'")
    return {"context": context, "question": question}

def retrieve_summaries_context_per_question(state):
    print("retrieving relevant chapter summaries...")
    question = state["question"]
    docs_summaries = chapter_summaries_retriever.get_relevant_documents(state["question"])
    context_summaries = " ".join(f"{doc.page_content} (Chapter {doc.metadata['chapter']})" for doc in docs_summaries)
    context_summaries = context_summaries.replace('"', '\\"').replace("'", "\\'")
    return {"context": context_summaries, "question": question}

def retrieve_book_quotes_context_per_question(state):
    print("retrieving relevant book quotes...")
    question = state["question"]
    docs_book_quotes = book_quotes_retriever.get_relevant_documents(state["question"])
    book_qoutes = " ".join(doc.page_content for doc in docs_book_quotes)
    book_qoutes_context = book_qoutes.replace('"', '\\"').replace("'", "\\'")
    return {"context": book_qoutes_context, "question": question}
class QualitativeRetrievalGraphState(TypedDict):
    question: str
    context: str
    relevant_context: str

def build_retrieval_workflow(node_name, retrieve_fn):
    graph = StateGraph(QualitativeRetrievalGraphState)
    graph.add_node(node_name, retrieve_fn)
    graph.add_node("keep_only_relevant_content", keep_only_relevant_content)
    graph.set_entry_point(node_name)
    graph.add_edge(node_name, "keep_only_relevant_content")
    graph.add_conditional_edges(
        "keep_only_relevant_content",
        is_distilled_content_grounded_on_content,
        {
            "grounded on the original context": END,
            "not grounded on the original context": "keep_only_relevant_content",
        },
    )
    app = graph.compile()
    display(Image(app.get_graph().draw_mermaid_png(draw_method=MermaidDrawMethod.API)))
    return graph
build_retrieval_workflow("retrieve_chunks_context_per_question", retrieve_chunks_context_per_question)
build_retrieval_workflow("retrieve_summaries_context_per_question", retrieve_summaries_context_per_question)
build_retrieval_workflow("retrieve_book_quotes_context_per_question", retrieve_book_quotes_context_per_question)
Creating the Hallucination-Reduction Subgraph
Now the subgraph that reduces hallucinations:
def is_answer_grounded_on_context(state):
    print("checking if the answer is grounded in the facts...")
    context = state["context"]
    answer = state["answer"]
    result = is_grounded_on_facts_chain.invoke({"context": context, "answer": answer})
    grounded_on_facts = result.grounded_on_facts
    if not grounded_on_facts:
        print("The answer is hallucinated.")
        return "hallucination"
    else:
        print("The answer is grounded in the facts.")
        return "grounded on context"
class QualitativeAnswerGraphState(TypedDict):
    question: str
    context: str
    answer: str

wf = StateGraph(QualitativeAnswerGraphState)
wf.add_node("answer", answer_question_from_context)
wf.set_entry_point("answer")
wf.add_conditional_edges("answer", is_answer_grounded_on_context, {
    "hallucination": "answer",
    "grounded on context": END
})
# compile the subgraph so it can be run in the test below
qualitative_answer_workflow_app = wf.compile()
display(Image(qualitative_answer_workflow_app.get_graph().draw_mermaid_png(draw_method=MermaidDrawMethod.API)))
Test the hallucination subgraph:
question = "who is harry?"
context = "Harry Potter is a cat."
init_state = {"question": question, "context": context}
for output in qualitative_answer_workflow_app.stream(init_state):
    for _, value in output.items():
        pass

print("--------------------")
print(f'answer: {value["answer"]}')
Output:
answering the question from the retrieved context...
answer before checking hallucination: Harry Potter is a cat.
checking if the answer is grounded in the facts...
The answer is grounded in the facts.
--------------------
answer: Harry Potter is a cat.
Even with a deliberately wrong context, the system answers based on that context, which shows it isn't inventing facts out of thin air.
Creating and Testing the Plan Executor
Define the plan executor:
from typing import List

class PlanExecute(TypedDict):
    curr_state: str
    question: str
    anonymized_question: str
    query_to_retrieve_or_answer: str
    plan: List[str]
    past_steps: List[str]
    mapping: dict
    curr_context: str
    aggregated_context: str
    tool: str
    response: str

class Plan(BaseModel):
    steps: List[str] = Field(description="The steps to follow, in order.")
planner_prompt = """For the given query {question}, come up with a simple step by step plan of how to figure out the answer. ..."""
planner_prompt = PromptTemplate(
    template=planner_prompt,
    input_variables=["question"],
)
planner_llm = ChatTogether(
    temperature=0,
    model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
    api_key=together_api_key,
    max_tokens=2000
)
planner = planner_prompt | planner_llm.with_structured_output(Plan)
break_down_plan_prompt_template = """You receive a plan {plan} which contains a series of steps to follow in order to answer a query. ..."""
break_down_plan_prompt = PromptTemplate(
    template=break_down_plan_prompt_template,
    input_variables=["plan"],
)
break_down_plan_llm = ChatTogether(
    temperature=0,
    model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
    api_key=together_api_key,
    max_tokens=2000
)
break_down_plan_chain = break_down_plan_prompt | break_down_plan_llm.with_structured_output(Plan)
Test the planner:
question = {"question": "how did the main character beat the villain?"}
my_plan = planner.invoke(question)
print(my_plan)
refined_plan = break_down_plan_chain.invoke({"plan": my_plan.steps})
print(refined_plan)
Output:
steps = [
    "Identify the hero and the villain from the vector store.",
    "Find the climax or final confrontation from the vector store.",
    "Analyze the hero's actions in that confrontation from the vector store.",
    "Determine the key action/strategy that defeated the villain from the vector store.",
    "Summarize how the hero defeated the villain using the retrieved context."
]
Re-planning Logic
Update the plan as new information comes in:
replanner_prompt_template = """
For the given objective, come up with a simple step by step plan of how to figure out the answer. ...
"""
class ActPossibleResults(BaseModel):
    plan: Plan = Field(description="The updated plan to follow going forward.")
    explanation: str = Field(description="An explanation of the action.")
act_possible_results_parser = JsonOutputParser(pydantic_object=ActPossibleResults)
replanner_prompt = PromptTemplate(
    template=replanner_prompt_template,
    input_variables=["question", "plan", "past_steps", "aggregated_context"],
    partial_variables={"format_instructions": act_possible_results_parser.get_format_instructions()},
)
replanner_llm = ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000)
replanner = replanner_prompt | replanner_llm | act_possible_results_parser
Creating the Task Handler
The task handler decides which subgraph to use:
tasks_handler_prompt_template = """
You are a task handler that receives a task: {curr_task} and must decide which tool to use to execute the task. ...
"""
class TaskHandlerOutput(BaseModel):
    query: str = Field(description="The query to retrieve with, or the question to answer from context.")
    curr_context: str = Field(description="The context to answer the query from.")
    tool: str = Field(description="The tool to use: retrieve_chunks, retrieve_summaries, retrieve_quotes, or answer_from_context.")
task_handler_prompt = PromptTemplate(
    template=tasks_handler_prompt_template,
    input_variables=["curr_task", "aggregated_context", "last_tool", "past_steps", "question"],
)
task_handler_llm = ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000)
task_handler_chain = task_handler_prompt | task_handler_llm.with_structured_output(TaskHandlerOutput)
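As a quick illustration of what the handler returns (a hypothetical invocation; the task text is made up):

example_task = {
    "curr_task": "find the chapter where the final confrontation happens",
    "aggregated_context": "",
    "last_tool": None,
    "past_steps": [],
    "question": "how did the main character beat the villain?",
}
handler_output = task_handler_chain.invoke(example_task)
print(handler_output.tool)   # e.g. "retrieve_summaries"
print(handler_output.query)  # the query the chosen tool should run with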
Anonymizing / De-anonymizing the Input Question
Anonymize the question to avoid LLM bias:
class AnonymizeQuestion(BaseModel):
    anonymized_question: str
    mapping: dict
    explanation: str

anonymize_question_chain = (
    PromptTemplate(
        input_variables=["question"],
        partial_variables={"format_instructions": JsonOutputParser(pydantic_object=AnonymizeQuestion).get_format_instructions()},
        template="""You anonymize questions by replacing named entities with variables. ...""",
    )
    | ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000)
    | JsonOutputParser(pydantic_object=AnonymizeQuestion)
)

class DeAnonymizePlan(BaseModel):
    plan: List

de_anonymize_plan_chain = (
    PromptTemplate(
        input_variables=["plan", "mapping"],
        template="Replace variables in: {plan}, using: {mapping}. Output updated list as JSON."
    )
    | ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000).with_structured_output(DeAnonymizePlan)
)
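For intuition, invoking the anonymizer on a concrete question might look like this (the output shown in the comment is illustrative, not an actual run):

anonymized = anonymize_question_chain.invoke({"question": "How did Harry Potter defeat Voldemort?"})
print(anonymized)
# e.g. {"anonymized_question": "How did X defeat Y?",
#       "mapping": {"X": "Harry Potter", "Y": "Voldemort"},
#       "explanation": "..."}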
Compiling and Visualizing the RAG Pipeline
The task-handler node pops the next task from the plan, picks a tool for it, and records progress:
def run_task_handler_chain(state):
    # task-handler node: pop the next task and decide which tool should execute it
    state["curr_state"] = "task_handler"
    curr_task = state["plan"].pop(0)
    inputs = {
        "curr_task": curr_task,
        "aggregated_context": state.get("aggregated_context", ""),
        "last_tool": state.get("tool"),
        "past_steps": state.get("past_steps", []),
        "question": state["question"]
    }
    output = task_handler_chain.invoke(inputs)
    state["past_steps"].append(curr_task)
    state["query_to_retrieve_or_answer"] = output.query
    state["tool"] = output.tool if output.tool != "answer_from_context" else "answer"
    if output.tool == "answer_from_context":
        state["curr_context"] = output.curr_context
    return state
The overall flow:
- 1. Anonymize the question
- 2. The planner creates a high-level strategy
- 3. De-anonymize the plan
- 4. Break the plan down into small tasks
- 5. The task handler picks a tool
- 6. Retrieve or answer
- 7. Re-plan based on the new information
- 8. Generate the final answer
- 9. End
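The next section calls a top-level execute_plan_and_print_steps(inputs) helper that returns (final_answer, final_state). That helper isn't shown in full here; assuming the complete plan-and-execute graph has been compiled into plan_and_execute_app (an assumption for illustration), it would look roughly like this:

def execute_plan_and_print_steps(inputs, recursion_limit=45):
    config = {"recursion_limit": recursion_limit}
    final_state = {}
    try:
        # Stream the compiled plan-and-execute graph and print each node as it runs
        for plan_output in plan_and_execute_app.stream(inputs, config=config):
            for _, final_state in plan_output.items():
                print(f'current step: {final_state.get("curr_state")}')
        final_answer = final_state.get("response", "The answer wasn't found in the data.")
    except Exception as e:
        print(f"execution stopped: {e}")
        final_answer = "The answer wasn't found in the data."
    return final_answer, final_state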
Testing the Final Pipeline
Test a question the data can't answer:
input = {"question": "What did Professor Lupin teach?"}
final_answer, final_state = execute_plan_and_print_steps(input)
Output:
...
Final answer: The answer wasn't found in the data.
Test a complex question:
input = {"question": "What class does the professor who helps the villain teach?"}
final_answer, final_state = execute_plan_and_print_steps(input)
Output:
...
Final answer: The professor who helps the villain is Professor Quirrell, who teaches Defense Against the Dark Arts.
Test a reasoning question:
input = {"question": "How did Harry defeat Quirrell?"}
final_answer, final_state = execute_plan_and_print_steps(input)
Output:
...
Final answer: Harry defeated Quirrell because his mother's protective magic burned Quirrell whenever he touched Harry.
Evaluating with RAGAS
Evaluate the pipeline with RAGAS:
questions = [
    "What is the name of the three-headed dog guarding the Sorcerer's Stone?",
    "Who gave Harry Potter his first broomstick?",
    "Which house did the Sorting Hat initially consider for Harry?"
]
ground_truth_answers = [
    "Fluffy",
    "Professor McGonagall",
    "Slytherin"
]
from datasets import Dataset

# generated_answers and retrieved_documents are assumed to have been collected
# by running the pipeline over the questions above.
data_samples = {
    'question': questions,
    'answer': generated_answers,
    'contexts': retrieved_documents,
    'ground_truth': ground_truth_answers
}
data_samples['contexts'] = [[context] if isinstance(context, str) else context for context in data_samples['contexts']]
dataset = Dataset.from_dict(data_samples)
from ragas import evaluate
from ragas.metrics import (
    answer_correctness,
    faithfulness,
    answer_relevancy,
    context_recall,
    answer_similarity
)

metrics = [
    answer_correctness,
    faithfulness,
    answer_relevancy,
    context_recall,
    answer_similarity
]
llm = ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000)
score = evaluate(dataset, metrics=metrics, llm=llm)
results_df = score.to_pandas()
The evaluation shows the pipeline performs well on this small test set, with several metrics scoring around 0.9.
Conclusion
We started from scratch: cleaning and splitting the data, then building retrievers, filters, a query rewriter, and a CoT answering pipeline. To handle complex queries we introduced the subgraph approach, building retrieval and distillation subgraphs and a hallucination-reduction component, designed a planner and task handler, and finally evaluated the whole system with RAGAS. Hopefully you learned something new!
This article is reposted from AI大模型觀察站.
