成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

基于Agent的金融問答系統:RAG檢索模塊初建成 原創

發布于 2024-11-22 15:46
瀏覽
0收藏

前言

我們在上一章《【項目實戰】基于Agent的金融問答系統之項目簡介》中簡單介紹了項目背景以及數據集情況,本章將介紹RAG模塊的實現。

功能列表

參考之前所學內容《大模型之初識RAG》,我們需要實現如下功能:

  • 向量庫的基礎功能

     向量庫

     數據入庫

  • 文件導入

    PDF文件的讀取

    PDF文件的切分

    調用向量庫接口入庫

  • 文件檢索

    連接向量庫

    檢索器檢索文件

開發過程

1、規劃工程文件

項目開始之后,我們如果能夠抑制住直接擼代碼的沖動,改為提前做好規劃,這會是一個好的習慣。 為此,我提前做了如下規劃:

代碼管理

  • 代碼使用Git進行管理,這樣后期多人協作時方便代碼更新、代碼Merge、沖突解決等。
  • 由于國內訪問Github優勢會ban,所以我們將代碼倉庫放在Gitee上。
  • 為代碼倉庫起了一個響亮的名稱后,倉庫地址定為https://gitee.com/deadwalk/smart-finance-bot

項目目錄

考慮這個項目會涉及到前端、后端、數據集、模型等,所以項目目錄規劃如下:

smart-finance-bot \
|- dataset \  # 該目錄用于保存PDF以及SQLite數據
|- doc \  # 該目錄用于保存文檔類文件,例如:需求文檔、說明文檔、數據文檔
|- app \   # 該目錄用于服務端代碼
|- agent \ # 該目錄用于保存agent相關代碼
|- rag \   # 該目錄用于保存rag相關代碼
|-test \   # 該目錄用于保存測試類驅動相關代碼
|- conf \  # 該目錄用于保存配置文件
|-.qwen # 該文件保存QWen的配置文件(請自行添加對應的API KEY)
|-.ernie # 該文件保存百度千帆的配置文件(請自行添加對應的API KEY)
|- chatweb \   # 該目錄用于保存前端頁面代碼
|- scripts \   # 該目錄用于保存腳本文件,例如:啟動腳本、導入向量數據庫腳本等
|- test_result \   # 該目錄用于保存測試結果
|- docker \
|- backend \ # 該目錄對應后端python服務的Dockerfile
|- frontend \ # 該目錄對應前端python服務的Dockerfile

上述目錄中,??dataset?? 是直接使用git的submodul命令,直接將天池大賽提供的數據集引入到本項目中,方便后續使用。

引入方法:

git submodule add https://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset.git dataset

命名規范

項目如果能夠約束統一的命名規范,這對于后續代碼的可讀性、可維護性會提供需要便利,在此我沿用了約定俗成的代碼命名規范:

  • 類名:使用大駝峰命名法,例如:??MyClass??
  • 函數名:使用小駝峰命名法,例如:??my_function??
  • 變量名:使用小駝峰命名法,例如:??my_variable??
  • 文件夾:使用小駝峰命名法。

整體命名時,要盡量見文知意。

2、實現基本的連接大模型的util庫

代碼文件及目錄:??app/utils/util.py??

from dotenv import load_dotenv
import os

# 獲取當前文件的目錄
current_dir = os.path.dirname(__file__)

# 構建到 conf/.qwen 的相對路徑
conf_file_path_qwen = os.path.join(current_dir,'..','conf','.qwen')

# 加載千問環境變量
load_dotenv(dotenv_path=conf_file_path_qwen)

defget_qwen_models():
"""
    加載千問系列大模型
    """
# llm 大模型
from langchain_community.llms.tongyi importTongyi

    llm =Tongyi(model="qwen-max", temperature=0.1, top_p=0.7, max_tokens=1024)

# chat 大模型
from langchain_community.chat_models importChatTongyi

    chat =ChatTongyi(model="qwen-max", temperature=0.01, top_p=0.2, max_tokens=1024)
# embedding 大模型
from langchain_community.embeddings importDashScopeEmbeddings

    embed =DashScopeEmbeddings(model="text-embedding-v3")

return llm, chat, embed

在app/conf/.qwen中,添加對應的API KEY,例如:

DASHSCOPE_API_KEY = sk-xxxxxx

3、實現向量庫基礎功能

向量庫文件考慮使用Chroma實現,所以我們先實現一個向量庫的類,用于完成基本的向量庫連接、數據入庫操作。

代碼文件及目錄:??app/rag/chroma_conn.py??

import chromadb
from chromadb importSettings
from langchain_chroma importChroma


classChromaDB:
def__init__(self,
                 chroma_server_type="local",  # 服務器類型:http是http方式連接方式,local是本地讀取文件方式
                 host="localhost", port=8000,  # 服務器地址,http方式必須指定
                 persist_path="chroma_db",  # 數據庫的路徑:如果是本地連接,需要指定數據庫的路徑
                 collection_name="langchain",  # 數據庫的集合名稱
                 embed=None  # 數據庫的向量化函數
                 ):

        self.host = host
        self.port = port
        self.path = persist_path
        self.embed = embed
        self.store =None

# 如果是http協議方式連接數據庫
if chroma_server_type =="http":
            client = chromadb.HttpClient(host=host, port=port)

            self.store =Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                client=client)

if chroma_server_type =="local":
            self.store =Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                persist_directory=persist_path)

if self.store isNone:
raiseValueError("Chroma store init failed!")

defadd_with_langchain(self, docs):
"""
        將文檔添加到數據庫
        """
        self.store.add_documents(documents=docs)

defget_store(self):
"""
        獲得向量數據庫的對象實例
        """
return self.store

在實際項目實踐過程中,我們發現導入Chroma數據時使用本地化連接方式更快一些,所以對連接方式做了兩個參數的擴展,local 代表本地連接,http 代表遠程連接。

4、實現入庫功能

本著先跑通流程,再優化交互過程的思路,對于PDF文件入向量庫的過程,我們先通過一段腳本實現(暫不做前端UI的交互)。

代碼文件及目錄:??app/rag/pdf_processor.py??

import os
import logging
import time
from tqdm import tqdm
from langchain_community.document_loaders importPyMuPDFLoader
from langchain_text_splitters importRecursiveCharacterTextSplitter
from rag.chroma_conn importChromaDB


classPDFProcessor:
def__init__(self,
                 directory,  # PDF文件所在目錄
                 chroma_server_type,  # ChromaDB服務器類型
                 persist_path,  # ChromaDB持久化路徑
                 embed):# 向量化函數

        self.directory = directory
        self.file_group_num =80# 每組處理的文件數
        self.batch_num =6# 每次插入的批次數量

        self.chunksize =500# 切分文本的大小
        self.overlap =100# 切分文本的重疊大小

        self.chroma_db =ChromaDB(chroma_server_type=chroma_server_type,
                                  persist_path=persist_path,
                                  embed=embed)
# 配置日志
        logging.basicConfig(
            level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
)

defload_pdf_files(self):
"""
        加載目錄下的所有PDF文件
        """
        pdf_files =[]
for file in os.listdir(self.directory):
if file.lower().endswith('.pdf'):
                pdf_files.append(os.path.join(self.directory, file))

        logging.info(f"Found {len(pdf_files)} PDF files.")
return pdf_files

defload_pdf_content(self, pdf_path):
"""
        讀取PDF文件內容
        """
        pdf_loader =PyMuPDFLoader(file_path=pdf_path)
        docs = pdf_loader.load()
        logging.info(f"Loading content from {pdf_path}.")
return docs

defsplit_text(self, documents):
"""
        將文本切分成小段
        """
# 切分文檔
        text_splitter =RecursiveCharacterTextSplitter(
            chunk_size=self.chunksize,
            chunk_overlap=self.overlap,
            length_function=len,
            add_start_index=True,
)

        docs = text_splitter.split_documents(documents)

        logging.info("Split text into smaller chunks with RecursiveCharacterTextSplitter.")
return docs

definsert_docs_chromadb(self, docs, batch_size=6):
"""
        將文檔插入到ChromaDB
        """
# 分批入庫
        logging.info(f"Inserting {len(docs)} documents into ChromaDB.")

# 記錄開始時間
        start_time = time.time()
        total_docs_inserted =0

# 計算總批次
        total_batches =(len(docs)+ batch_size -1)// batch_size

with tqdm(total=total_batches, desc="Inserting batches", unit="batch")as pbar:
for i inrange(0,len(docs), batch_size):
# 獲取當前批次的樣本
                batch = docs[i:i + batch_size]

# 將樣本入庫
                self.chroma_db.add_with_langchain(batch)
# self.chroma_db.async_add_with_langchain(batch)

# 更新已插入的文檔數量
                total_docs_inserted +=len(batch)

# 計算并顯示當前的TPM
                elapsed_time = time.time()- start_time  # 計算已用時間(秒)
if elapsed_time >0:# 防止除以零
                    tpm =(total_docs_inserted / elapsed_time)*60# 轉換為每分鐘插入的文檔數
                    pbar.set_postfix({"TPM":f"{tpm:.2f}"})# 更新進度條的后綴信息

# 更新進度條
                pbar.update(1)

defprocess_pdfs_group(self, pdf_files_group):
# 讀取PDF文件內容
        pdf_contents =[]

for pdf_path in pdf_files_group:
# 讀取PDF文件內容
            documents = self.load_pdf_content(pdf_path)

# 將documents 逐一添加到pdf_contents
            pdf_contents.extend(documents)

# 將文本切分成小段
        docs = self.split_text(pdf_contents)

# 將文檔插入到ChromaDB
        self.insert_docs_chromadb(docs, self.batch_num)

defprocess_pdfs(self):
# 獲取目錄下所有的PDF文件
        pdf_files = self.load_pdf_files()

        group_num = self.file_group_num

# group_num 個PDF文件為一組,分批處理
for i inrange(0,len(pdf_files), group_num):
            pdf_files_group = pdf_files[i:i + group_num]
            self.process_pdfs_group(pdf_files_group)

print("PDFs processed successfully!")

5、測試導入功能

因為Python的導入庫的原因(一般都是從工作目錄查找),所以我們在項目根目錄下創建test_framework.py,方便后續統一測試工作。

smart-finance-bot \
    |- app \   # 該目錄用于服務端代碼
        |- rag \   # 該目錄用于保存rag相關代碼
            |- pdf_processor.py
            |- chroma_conn.py
        |- test_framework.py

代碼文件: ??app/test_framework.py??

# 測試導入PDF到向量庫主流程
deftest_import():
from rag.pdf_processor importPDFProcessor
from utils.util import get_qwen_models

    llm , chat , embed = get_qwen_models()
# embed = get_huggingface_embeddings()

    directory ="./app/dataset/pdf"
    persist_path ="chroma_db"
    server_type ="local"

# 創建 PDFProcessor 實例
    pdf_processor =PDFProcessor(directory=directory,
                                 chroma_server_type=server_type,
                                 persist_path=persist_path,
                                 embed=embed)

# 處理 PDF 文件
    pdf_processor.process_pdfs()

if __name__ =="__main__":
    test_import()

1.通過命令行啟動ChromaDB服務端:

chroma run --path chroma_db --port 8000 --host localhost

2.運行test_framework.py

運行效果:

基于Agent的金融問答系統:RAG檢索模塊初建成-AI.x社區


備注:一般測試框架會使用Pytest并且編寫相應的單元測試函數,本次項目中由于項目較小且函數返回結果不固定,所以就沒有寫UnitTest。如果想了解Pytest的使用示例,可以參考我的其他代碼倉庫,例如:UnitTest的使用

6、實現檢索功能

代碼文件: ??app/rag/rag.py??

import logging
from langchain_core.prompts importChatPromptTemplate
from langchain_core.runnables importRunnablePassthrough
from langchain_core.runnables.base importRunnableLambda
from langchain_core.output_parsers importStrOutputParser
from.chroma_conn importChromaDB

# 配置日志記錄
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')


classRagManager:
def__init__(self,
                 chroma_server_type="http",
                 host="localhost", port=8000,
                 persist_path="chroma_db",
                 llm=None, embed=None):
        self.llm = llm
        self.embed = embed

        chrom_db =ChromaDB(chroma_server_type=chroma_server_type,
                            host=host, port=port,
                            persist_path=persist_path,
                            embed=embed)
        self.store = chrom_db.get_store()

defget_chain(self, retriever):
"""獲取RAG查詢鏈"""

# RAG系統經典的 Prompt (A 增強的過程)
        prompt =ChatPromptTemplate.from_messages([
("human","""You are an assistant for question-answering tasks. Use the following pieces 
          of retrieved context to answer the question. 
          If you don't know the answer, just say that you don't know. 
          Use three sentences maximum and keep the answer concise.
          Question: {question} 
          Context: {context} 
          Answer:""")
])
# 將 format_docs 方法包裝為 Runnable
        format_docs_runnable =RunnableLambda(self.format_docs)
# RAG 鏈
        rag_chain =(
{"context": retriever | format_docs_runnable,
"question":RunnablePassthrough()}
| prompt
| self.llm
|StrOutputParser()
)

return rag_chain

defformat_docs(self, docs):
# 返回檢索到的資料文件名稱
        logging.info(f"檢索到資料文件個數:{len(docs)}")
        retrieved_files ="\n".join([doc.metadata["source"]for doc in docs])
        logging.info(f"資料文件分別是:\n{retrieved_files}")

        retrieved_content ="\n\n".join(doc.page_content for doc in docs)
        logging.info(f"檢索到的資料為:\n{retrieved_content}")

return retrieved_content

defget_retriever(self, k=4, mutuality=0.3):
        retriever = self.store.as_retriever(search_type="similarity_score_threshold",
                                            search_kwargs={"k": k,"score_threshold": mutuality})

return retriever

defget_result(self, question, k=4, mutuality=0.3):
"""獲取RAG查詢結果"""

        retriever = self.get_retriever(k, mutuality)

        rag_chain = self.get_chain(retriever)

return rag_chain.invoke(input=question)

以上是實現了一個使用基本檢索器的RAG,其中:

  • 代碼中通過chroma_conn.py模塊連接到ChromaDB數據庫,并使用ChromaDB的as_retriever方法創建一個檢索器。

7、測試檢索效果

在test_framework.py中添加RAG的測試調用函數。

代碼文件:??app/test_framework.py??

# 測試RAG主流程
deftest_rag():
from rag.rag importRagManager
from utils.util import get_qwen_models

    llm, chat, embed = get_qwen_models()
    rag =RagManager(host="localhost", port=8000, llm=llm, embed=embed)

    result = rag.get_result("湖南長遠鋰科股份有限公司變更設立時作為發起人的法人有哪些?")

print(result)


if __name__ =="__main__":
    test_rag()# RAG測試函數
# test_import()         # 批量導入PDF測試函數

注釋掉批量導入函數,開啟test_rag()函數,運行效果:

基于Agent的金融問答系統:RAG檢索模塊初建成-AI.x社區

至此,我們完成了RAG模塊的基本功能,它包括PDF文件的批量導入以及檢索功能。

內容小結

  • 首先,我們創建了一個ChromaDB的類,封裝了基礎的Chroma連接、插入、檢索。
  • 其次,我們實現了PDFProcessor類,該類中會調用ChromaDB類的插入函數,將批量讀取的PDF文件進行切分后保存至向量庫。
  • 然后,我們實現了RagManager類,該類中封裝了RAG的檢索鏈,并定義了檢索的參數。
  • 最后,我們實現了一個測試函數,用于測試RAG的檢索功能。
  • 除此之外,有兩個注意事項:

    在項目初期,進行合理的項目文件目錄規劃,可以有效減少項目維護的難度。

    在項目行進中,通過搭建測試框架,可以方便函數驗證以及后續重構的回歸測試。


本文轉載自公眾號一起AI技術 作者:Dongming

原文鏈接:??https://mp.weixin.qq.com/s/ZMwZSsmms03U-GRRwnxMNg??


?著作權歸作者所有,如需轉載,請注明出處,否則將追究法律責任
標簽
收藏
回復
舉報
回復
相關推薦
主站蜘蛛池模板: 亚洲欧美精品国产一级在线 | 精品国产一级片 | 欧美一级黑人aaaaaaa做受 | 在线观看黄视频 | 国产男人的天堂 | 精品美女在线观看视频在线观看 | 2一3sex性hd| xx视频在线观看 | 啪啪综合网 | 九色av| 久久九九色 | 在线播放国产一区二区三区 | 高清成人av | 国产成人精品久久二区二区91 | 天天影视综合 | 久久久久一区 | 亚洲成年影院 | 欧美99 | 人人艹人人爽 | 毛片韩国| 三区在线 | 久草在线 | 久久成人av电影 | 在线观看亚洲专区 | 欧美一级在线 | 黄色网址在线免费观看 | 成人中文字幕在线观看 | 96久久久久久 | 亚洲一区 | 狠狠干网站 | 国产精品久久久久久久久久久久久久 | 成人午夜在线观看 | 99精品欧美一区二区蜜桃免费 | 在线成人免费视频 | 日本aa毛片a级毛片免费观看 | 一区二区三区精品视频 | 狠狠干综合视频 | 欧美日韩一区二区在线 | 久久久www成人免费无遮挡大片 | 国产中文字幕av | 国产精品永久免费视频 |