基于Agent的金融問答系統:RAG檢索模塊初建成 原創
前言
我們在上一章《【項目實戰】基于Agent的金融問答系統之項目簡介》中簡單介紹了項目背景以及數據集情況,本章將介紹RAG模塊的實現。
功能列表
參考之前所學內容《大模型之初識RAG》,我們需要實現如下功能:
- 向量庫的基礎功能
向量庫
數據入庫
- 文件導入
PDF文件的讀取
PDF文件的切分
調用向量庫接口入庫
- 文件檢索
連接向量庫
檢索器檢索文件
開發過程
1、規劃工程文件
項目開始之后,我們如果能夠抑制住直接擼代碼的沖動,改為提前做好規劃,這會是一個好的習慣。 為此,我提前做了如下規劃:
代碼管理
- 代碼使用Git進行管理,這樣后期多人協作時方便代碼更新、代碼Merge、沖突解決等。
- 由于國內訪問Github優勢會ban,所以我們將代碼倉庫放在Gitee上。
- 為代碼倉庫起了一個響亮的名稱后,倉庫地址定為https://gitee.com/deadwalk/smart-finance-bot
項目目錄
考慮這個項目會涉及到前端、后端、數據集、模型等,所以項目目錄規劃如下:
smart-finance-bot \
|- dataset \ # 該目錄用于保存PDF以及SQLite數據
|- doc \ # 該目錄用于保存文檔類文件,例如:需求文檔、說明文檔、數據文檔
|- app \ # 該目錄用于服務端代碼
|- agent \ # 該目錄用于保存agent相關代碼
|- rag \ # 該目錄用于保存rag相關代碼
|-test \ # 該目錄用于保存測試類驅動相關代碼
|- conf \ # 該目錄用于保存配置文件
|-.qwen # 該文件保存QWen的配置文件(請自行添加對應的API KEY)
|-.ernie # 該文件保存百度千帆的配置文件(請自行添加對應的API KEY)
|- chatweb \ # 該目錄用于保存前端頁面代碼
|- scripts \ # 該目錄用于保存腳本文件,例如:啟動腳本、導入向量數據庫腳本等
|- test_result \ # 該目錄用于保存測試結果
|- docker \
|- backend \ # 該目錄對應后端python服務的Dockerfile
|- frontend \ # 該目錄對應前端python服務的Dockerfile
上述目錄中,??dataset?
? 是直接使用git的submodul命令,直接將天池大賽提供的數據集引入到本項目中,方便后續使用。
引入方法:
git submodule add https://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset.git dataset
命名規范
項目如果能夠約束統一的命名規范,這對于后續代碼的可讀性、可維護性會提供需要便利,在此我沿用了約定俗成的代碼命名規范:
- 類名:使用大駝峰命名法,例如:?
?MyClass?
? - 函數名:使用小駝峰命名法,例如:?
?my_function?
? - 變量名:使用小駝峰命名法,例如:?
?my_variable?
? - 文件夾:使用小駝峰命名法。
整體命名時,要盡量見文知意。
2、實現基本的連接大模型的util庫
代碼文件及目錄:??app/utils/util.py?
?
from dotenv import load_dotenv
import os
# 獲取當前文件的目錄
current_dir = os.path.dirname(__file__)
# 構建到 conf/.qwen 的相對路徑
conf_file_path_qwen = os.path.join(current_dir,'..','conf','.qwen')
# 加載千問環境變量
load_dotenv(dotenv_path=conf_file_path_qwen)
defget_qwen_models():
"""
加載千問系列大模型
"""
# llm 大模型
from langchain_community.llms.tongyi importTongyi
llm =Tongyi(model="qwen-max", temperature=0.1, top_p=0.7, max_tokens=1024)
# chat 大模型
from langchain_community.chat_models importChatTongyi
chat =ChatTongyi(model="qwen-max", temperature=0.01, top_p=0.2, max_tokens=1024)
# embedding 大模型
from langchain_community.embeddings importDashScopeEmbeddings
embed =DashScopeEmbeddings(model="text-embedding-v3")
return llm, chat, embed
在app/conf/.qwen中,添加對應的API KEY,例如:
DASHSCOPE_API_KEY = sk-xxxxxx
3、實現向量庫基礎功能
向量庫文件考慮使用Chroma實現,所以我們先實現一個向量庫的類,用于完成基本的向量庫連接、數據入庫操作。
代碼文件及目錄:??app/rag/chroma_conn.py?
?
import chromadb
from chromadb importSettings
from langchain_chroma importChroma
classChromaDB:
def__init__(self,
chroma_server_type="local", # 服務器類型:http是http方式連接方式,local是本地讀取文件方式
host="localhost", port=8000, # 服務器地址,http方式必須指定
persist_path="chroma_db", # 數據庫的路徑:如果是本地連接,需要指定數據庫的路徑
collection_name="langchain", # 數據庫的集合名稱
embed=None # 數據庫的向量化函數
):
self.host = host
self.port = port
self.path = persist_path
self.embed = embed
self.store =None
# 如果是http協議方式連接數據庫
if chroma_server_type =="http":
client = chromadb.HttpClient(host=host, port=port)
self.store =Chroma(collection_name=collection_name,
embedding_function=embed,
client=client)
if chroma_server_type =="local":
self.store =Chroma(collection_name=collection_name,
embedding_function=embed,
persist_directory=persist_path)
if self.store isNone:
raiseValueError("Chroma store init failed!")
defadd_with_langchain(self, docs):
"""
將文檔添加到數據庫
"""
self.store.add_documents(documents=docs)
defget_store(self):
"""
獲得向量數據庫的對象實例
"""
return self.store
在實際項目實踐過程中,我們發現導入Chroma數據時使用本地化連接方式更快一些,所以對連接方式做了兩個參數的擴展,local 代表本地連接,http 代表遠程連接。
4、實現入庫功能
本著先跑通流程,再優化交互過程的思路,對于PDF文件入向量庫的過程,我們先通過一段腳本實現(暫不做前端UI的交互)。
代碼文件及目錄:??app/rag/pdf_processor.py?
?
import os
import logging
import time
from tqdm import tqdm
from langchain_community.document_loaders importPyMuPDFLoader
from langchain_text_splitters importRecursiveCharacterTextSplitter
from rag.chroma_conn importChromaDB
classPDFProcessor:
def__init__(self,
directory, # PDF文件所在目錄
chroma_server_type, # ChromaDB服務器類型
persist_path, # ChromaDB持久化路徑
embed):# 向量化函數
self.directory = directory
self.file_group_num =80# 每組處理的文件數
self.batch_num =6# 每次插入的批次數量
self.chunksize =500# 切分文本的大小
self.overlap =100# 切分文本的重疊大小
self.chroma_db =ChromaDB(chroma_server_type=chroma_server_type,
persist_path=persist_path,
embed=embed)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
defload_pdf_files(self):
"""
加載目錄下的所有PDF文件
"""
pdf_files =[]
for file in os.listdir(self.directory):
if file.lower().endswith('.pdf'):
pdf_files.append(os.path.join(self.directory, file))
logging.info(f"Found {len(pdf_files)} PDF files.")
return pdf_files
defload_pdf_content(self, pdf_path):
"""
讀取PDF文件內容
"""
pdf_loader =PyMuPDFLoader(file_path=pdf_path)
docs = pdf_loader.load()
logging.info(f"Loading content from {pdf_path}.")
return docs
defsplit_text(self, documents):
"""
將文本切分成小段
"""
# 切分文檔
text_splitter =RecursiveCharacterTextSplitter(
chunk_size=self.chunksize,
chunk_overlap=self.overlap,
length_function=len,
add_start_index=True,
)
docs = text_splitter.split_documents(documents)
logging.info("Split text into smaller chunks with RecursiveCharacterTextSplitter.")
return docs
definsert_docs_chromadb(self, docs, batch_size=6):
"""
將文檔插入到ChromaDB
"""
# 分批入庫
logging.info(f"Inserting {len(docs)} documents into ChromaDB.")
# 記錄開始時間
start_time = time.time()
total_docs_inserted =0
# 計算總批次
total_batches =(len(docs)+ batch_size -1)// batch_size
with tqdm(total=total_batches, desc="Inserting batches", unit="batch")as pbar:
for i inrange(0,len(docs), batch_size):
# 獲取當前批次的樣本
batch = docs[i:i + batch_size]
# 將樣本入庫
self.chroma_db.add_with_langchain(batch)
# self.chroma_db.async_add_with_langchain(batch)
# 更新已插入的文檔數量
total_docs_inserted +=len(batch)
# 計算并顯示當前的TPM
elapsed_time = time.time()- start_time # 計算已用時間(秒)
if elapsed_time >0:# 防止除以零
tpm =(total_docs_inserted / elapsed_time)*60# 轉換為每分鐘插入的文檔數
pbar.set_postfix({"TPM":f"{tpm:.2f}"})# 更新進度條的后綴信息
# 更新進度條
pbar.update(1)
defprocess_pdfs_group(self, pdf_files_group):
# 讀取PDF文件內容
pdf_contents =[]
for pdf_path in pdf_files_group:
# 讀取PDF文件內容
documents = self.load_pdf_content(pdf_path)
# 將documents 逐一添加到pdf_contents
pdf_contents.extend(documents)
# 將文本切分成小段
docs = self.split_text(pdf_contents)
# 將文檔插入到ChromaDB
self.insert_docs_chromadb(docs, self.batch_num)
defprocess_pdfs(self):
# 獲取目錄下所有的PDF文件
pdf_files = self.load_pdf_files()
group_num = self.file_group_num
# group_num 個PDF文件為一組,分批處理
for i inrange(0,len(pdf_files), group_num):
pdf_files_group = pdf_files[i:i + group_num]
self.process_pdfs_group(pdf_files_group)
print("PDFs processed successfully!")
5、測試導入功能
因為Python的導入庫的原因(一般都是從工作目錄查找),所以我們在項目根目錄下創建test_framework.py,方便后續統一測試工作。
smart-finance-bot \
|- app \ # 該目錄用于服務端代碼
|- rag \ # 該目錄用于保存rag相關代碼
|- pdf_processor.py
|- chroma_conn.py
|- test_framework.py
代碼文件: ??app/test_framework.py?
?
# 測試導入PDF到向量庫主流程
deftest_import():
from rag.pdf_processor importPDFProcessor
from utils.util import get_qwen_models
llm , chat , embed = get_qwen_models()
# embed = get_huggingface_embeddings()
directory ="./app/dataset/pdf"
persist_path ="chroma_db"
server_type ="local"
# 創建 PDFProcessor 實例
pdf_processor =PDFProcessor(directory=directory,
chroma_server_type=server_type,
persist_path=persist_path,
embed=embed)
# 處理 PDF 文件
pdf_processor.process_pdfs()
if __name__ =="__main__":
test_import()
1.通過命令行啟動ChromaDB服務端:
chroma run --path chroma_db --port 8000 --host localhost
2.運行test_framework.py
運行效果:
備注:一般測試框架會使用Pytest并且編寫相應的單元測試函數,本次項目中由于項目較小且函數返回結果不固定,所以就沒有寫UnitTest。如果想了解Pytest的使用示例,可以參考我的其他代碼倉庫,例如:UnitTest的使用
6、實現檢索功能
代碼文件: ??app/rag/rag.py?
?
import logging
from langchain_core.prompts importChatPromptTemplate
from langchain_core.runnables importRunnablePassthrough
from langchain_core.runnables.base importRunnableLambda
from langchain_core.output_parsers importStrOutputParser
from.chroma_conn importChromaDB
# 配置日志記錄
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')
classRagManager:
def__init__(self,
chroma_server_type="http",
host="localhost", port=8000,
persist_path="chroma_db",
llm=None, embed=None):
self.llm = llm
self.embed = embed
chrom_db =ChromaDB(chroma_server_type=chroma_server_type,
host=host, port=port,
persist_path=persist_path,
embed=embed)
self.store = chrom_db.get_store()
defget_chain(self, retriever):
"""獲取RAG查詢鏈"""
# RAG系統經典的 Prompt (A 增強的過程)
prompt =ChatPromptTemplate.from_messages([
("human","""You are an assistant for question-answering tasks. Use the following pieces
of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
Use three sentences maximum and keep the answer concise.
Question: {question}
Context: {context}
Answer:""")
])
# 將 format_docs 方法包裝為 Runnable
format_docs_runnable =RunnableLambda(self.format_docs)
# RAG 鏈
rag_chain =(
{"context": retriever | format_docs_runnable,
"question":RunnablePassthrough()}
| prompt
| self.llm
|StrOutputParser()
)
return rag_chain
defformat_docs(self, docs):
# 返回檢索到的資料文件名稱
logging.info(f"檢索到資料文件個數:{len(docs)}")
retrieved_files ="\n".join([doc.metadata["source"]for doc in docs])
logging.info(f"資料文件分別是:\n{retrieved_files}")
retrieved_content ="\n\n".join(doc.page_content for doc in docs)
logging.info(f"檢索到的資料為:\n{retrieved_content}")
return retrieved_content
defget_retriever(self, k=4, mutuality=0.3):
retriever = self.store.as_retriever(search_type="similarity_score_threshold",
search_kwargs={"k": k,"score_threshold": mutuality})
return retriever
defget_result(self, question, k=4, mutuality=0.3):
"""獲取RAG查詢結果"""
retriever = self.get_retriever(k, mutuality)
rag_chain = self.get_chain(retriever)
return rag_chain.invoke(input=question)
以上是實現了一個使用基本檢索器的RAG,其中:
- 代碼中通過chroma_conn.py模塊連接到ChromaDB數據庫,并使用ChromaDB的as_retriever方法創建一個檢索器。
7、測試檢索效果
在test_framework.py中添加RAG的測試調用函數。
代碼文件:??app/test_framework.py?
?
# 測試RAG主流程
deftest_rag():
from rag.rag importRagManager
from utils.util import get_qwen_models
llm, chat, embed = get_qwen_models()
rag =RagManager(host="localhost", port=8000, llm=llm, embed=embed)
result = rag.get_result("湖南長遠鋰科股份有限公司變更設立時作為發起人的法人有哪些?")
print(result)
if __name__ =="__main__":
test_rag()# RAG測試函數
# test_import() # 批量導入PDF測試函數
注釋掉批量導入函數,開啟test_rag()函數,運行效果:
至此,我們完成了RAG模塊的基本功能,它包括PDF文件的批量導入以及檢索功能。
內容小結
- 首先,我們創建了一個ChromaDB的類,封裝了基礎的Chroma連接、插入、檢索。
- 其次,我們實現了PDFProcessor類,該類中會調用ChromaDB類的插入函數,將批量讀取的PDF文件進行切分后保存至向量庫。
- 然后,我們實現了RagManager類,該類中封裝了RAG的檢索鏈,并定義了檢索的參數。
- 最后,我們實現了一個測試函數,用于測試RAG的檢索功能。
- 除此之外,有兩個注意事項:
在項目初期,進行合理的項目文件目錄規劃,可以有效減少項目維護的難度。
在項目行進中,通過搭建測試框架,可以方便函數驗證以及后續重構的回歸測試。
本文轉載自公眾號一起AI技術 作者:Dongming
原文鏈接:??https://mp.weixin.qq.com/s/ZMwZSsmms03U-GRRwnxMNg??
