線上RAG應(yīng)用pdf文檔頻繁更新,老板下了死命令要節(jié)省預(yù)算,不能重復(fù)做embedding,我這么做..... 原創(chuàng)
我們最近在一個(gè)項(xiàng)目中遇到了一個(gè)問(wèn)題。項(xiàng)目的場(chǎng)景是這樣的:用戶將他們的PDF文檔存儲(chǔ)在磁盤(pán)的某個(gè)特定目錄中,然后有一個(gè)定時(shí)任務(wù)來(lái)掃描此目錄并從中的PDF文檔構(gòu)建知識(shí)庫(kù)。
一開(kāi)始,我們采用"增量更新"策略。在掃描目錄中的文檔時(shí),我們會(huì)對(duì)每個(gè)文檔進(jìn)行哈希運(yùn)算以生成其指紋,并檢查該指紋是否已存在于數(shù)據(jù)庫(kù)中。如果指紋不存在,就表示這是一個(gè)新文件,我們會(huì)對(duì)新文件的document做embedding,然后將其加入到知識(shí)庫(kù)中。
然而,這種方法存在一個(gè)問(wèn)題。如果同一文件進(jìn)行了增量添加,例如我們已經(jīng)將A.pdf文件加入到了知識(shí)庫(kù),但后來(lái)這個(gè)文件添加了新的內(nèi)容。當(dāng)我們重新計(jì)算其指紋并在數(shù)據(jù)庫(kù)中查找時(shí),由于指紋不存在,我們會(huì)將這個(gè)更新過(guò)的文件作為新文件處理,并重新做embedding加入到知識(shí)庫(kù)。這樣一來(lái),對(duì)于未更新的部分,知識(shí)庫(kù)會(huì)有兩份相同的數(shù)據(jù)記錄,第二份相同的記錄可能會(huì)"占據(jù)"原本應(yīng)該被召回的數(shù)據(jù)記錄的位置,從而降低問(wèn)答效果。
那么應(yīng)該怎么解決這個(gè)問(wèn)題呢?對(duì)于增量更新,做hash指紋這一點(diǎn)毋庸置疑,但是hash的對(duì)象不能是文件了,而應(yīng)該聚焦于真實(shí)存到知識(shí)庫(kù)的數(shù)據(jù): document.
在這里,我們將查看使用LangChain index API的基本索引工作流。
index API允許您將來(lái)自任何源的文檔加載到矢量存儲(chǔ)中并保持同步。具體來(lái)說(shuō),它有助于:
- 避免將重復(fù)的內(nèi)容寫(xiě)入vector存儲(chǔ)
- 避免重寫(xiě)未更改的內(nèi)容
- 避免在未更改的內(nèi)容上重新計(jì)算embedding
所有這些都可以節(jié)省你的時(shí)間和金錢(qián),并改善你的矢量搜索結(jié)果。
如何工作
LangChain索引使用記錄管理器(RecordManager)來(lái)跟蹤寫(xiě)入矢量存儲(chǔ)的文檔。
當(dāng)索引內(nèi)容時(shí),為每個(gè)文檔計(jì)算哈希值,并將以下信息存儲(chǔ)在記錄管理器中:
- 文檔hash(頁(yè)面內(nèi)容和元數(shù)據(jù)的散列)
- 寫(xiě)時(shí)間
- 源id——每個(gè)文檔應(yīng)該在其元數(shù)據(jù)中包含信息,以便我們確定該文檔的最終來(lái)源
刪除模式
將文檔索引到矢量存儲(chǔ)時(shí),可能會(huì)刪除矢量存儲(chǔ)中的一些現(xiàn)有文檔。在某些情況下,您可能希望刪除與正在索引的新文檔來(lái)自相同來(lái)源的所有現(xiàn)有文檔。在其他情況下,您可能希望批量刪除所有現(xiàn)有文檔。索引API刪除模式可以讓你選擇你想要的行為:
Cleanup Mode | De-Duplicates Content | Parallelizable | Cleans Up Deleted Source Docs | Cleans Up Mutations of Source Docs and/or Derived Docs | Clean Up Timing |
None | ? | ? | ? | ? | - |
Incremental | ? | ? | ? | ? | Continuously |
Full | ? | ? | ? | ? | At end of indexing |
快速開(kāi)始
首先,需要明確的是,無(wú)論使用何種清理模式,index函數(shù)都會(huì)自動(dòng)去重。也就是說(shuō),調(diào)用index([doc1, doc1, doc2])的效果等同于調(diào)用index([doc1, doc2])。然而,在我們的實(shí)際應(yīng)用場(chǎng)景中,情況并不完全如此。
可能在第一次運(yùn)行時(shí),我們對(duì)[doc1, doc2]進(jìn)行了索引操作,而在下次定時(shí)任務(wù)執(zhí)行時(shí),我們又對(duì)[doc1, doc3]進(jìn)行了索引。換言之,我們從源文檔中刪除了一部分內(nèi)容,并添加了一些新的內(nèi)容。這才是我們真正面臨的場(chǎng)景:我們希望保持doc1不變,新增doc3,并能夠自動(dòng)刪除doc2。這種需求可以通過(guò)Incremental增量模式得到滿足。
話不多說(shuō),我們來(lái)看看三種模式的使用效果吧。
None
None模式的功能可以理解為去重和添加,而不包括刪除。例如,如果你首次調(diào)用index([doc1, doc2]),然后再次調(diào)用index([doc1, doc3]),那么在向量庫(kù)中的數(shù)據(jù)就會(huì)是[doc1, doc2, doc3]。需要注意的是,這種模式下,舊版本的doc2并不會(huì)被刪除。
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document
from langchain.vectorstores.elasticsearch import ElasticsearchStore
from langchain.indexes import SQLRecordManager, index
collection_name = "test_index"
embedding = OpenAIEmbeddings()
vectorstore = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding)
namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(
namespace, db_url="sqlite:///record_manager_cache.sql"
)
# record_manager.create_schema()
doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})
doc3 = Document(page_content="doggy1", metadata={"source": "doggy.txt"})
def _clear():
"""Hacky helper method to clear content. See the `full` mode section to to understand why it works."""
index(
[],
record_manager,
vectorstore,
cleanup="full",
source_id_key="source")
_clear()
res = index(
[doc1, doc1, doc2],
record_manager,
vectorstore,
cleanup=None,
source_id_key="source",
)
print(res)
得到的結(jié)果:
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
我們發(fā)現(xiàn)做了去重并且?guī)臀覀冊(cè)黾恿藘蓷l數(shù)據(jù)。
然后我們?cè)賵?zhí)行index操作:
res = index(
[doc1, doc3],
record_manager,
vectorstore,
cleanup=None,
source_id_key="source",
)
print(res)
執(zhí)行結(jié)果發(fā)現(xiàn)添加了doc3, 跳過(guò)了doc1, doc2 還在數(shù)據(jù)庫(kù)記錄里:
{'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 0}
full
full 含義是用戶應(yīng)該將所有需要進(jìn)行索引的全部?jī)?nèi)容傳遞給index函數(shù),任何沒(méi)有傳遞到索引函數(shù)并且存在于vectorstore中的文檔將被刪除! 此行為對(duì)于處理源文檔的刪除非常有用。我們還是使用上面的代碼,這次只是把模式換成 full. 首先,我們需要重置并清空數(shù)據(jù),這可以通過(guò)調(diào)用??_clear()?
?函數(shù)實(shí)現(xiàn)。
res = index(
[doc1, doc1, doc2],
record_manager,
vectorstore,
cleanup="full",
source_id_key="source",
)
print(res)
我們發(fā)現(xiàn)添加了2個(gè)文檔:
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
接著我們執(zhí)行:
res = index(
[doc1, doc3],
record_manager,
vectorstore,
cleanup="full",
source_id_key="source",
)
print(res)
我們發(fā)現(xiàn)添加了一個(gè)文檔doc3,跳過(guò)了一個(gè)文檔doc1,刪除了一個(gè)文檔doc2:
{'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}
incremental
"增量模式"是我們最常用的一種。顧名思義,這種模式主要進(jìn)行增量操作,即添加最新記錄并刪除舊版記錄。在這種模式下,如果我們傳入一個(gè)空的文檔數(shù)組,即index([]),將不會(huì)發(fā)生任何操作。然而,如果我們?cè)?全量模式"下傳入同樣的空數(shù)組,系統(tǒng)則會(huì)清除所有數(shù)據(jù)。
首先,執(zhí)行以下操作:
_clear()
res = index(
[doc1, doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
print(res)
res = index(
[doc1, doc3],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
print(res)
得到的結(jié)果如下:
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
{'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}
可以看出,第一次操作添加了兩個(gè)文檔。在第二次操作中,系統(tǒng)跳過(guò)了doc1,并刪除了之前屬于"doggy.txt"的doc2,因?yàn)楝F(xiàn)在我們只傳入了doc3。因此,增量模式會(huì)將這個(gè)舊版本(doc2)刪除。
然后執(zhí)行以下操作:
res = index(
[doc1],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
print(res)
這次對(duì)于"doggy.txt"沒(méi)有任何新的文檔被傳入,所以數(shù)據(jù)沒(méi)有任何改動(dòng),結(jié)果如下:
{'num_added': 0, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 0}
但是,如果我們只傳入doc2,則會(huì)發(fā)現(xiàn)系統(tǒng)增加了doc2,并刪除了同一源文件("doggy.txt")的doc3。結(jié)果如下:
{'num_added': 1, 'num_updated': 0, 'num_skipped':
源碼
def index(
docs_source: Union[BaseLoader, Iterable[Document]],
record_manager: RecordManager,
vector_store: VectorStore,
*,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
) -> IndexingResult:
...
if isinstance(docs_source, BaseLoader):
try:
doc_iterator = docs_source.lazy_load()
except NotImplementedError:
doc_iterator = iter(docs_source.load())
else:
doc_iterator = iter(docs_source)
source_id_assigner = _get_source_id_assigner(source_id_key)
# Mark when the update started.
index_start_dt = record_manager.get_time()
num_added = 0
num_skipped = 0
num_updated = 0
num_deleted = 0
for doc_batch in _batch(batch_size, doc_iterator):
hashed_docs = list(
_deduplicate_in_order(
[_HashedDocument.from_document(doc) for doc in doc_batch]
)
)
source_ids: Sequence[Optional[str]] = [
source_id_assigner(doc) for doc in hashed_docs
]
....
exists_batch = record_manager.exists([doc.uid for doc in hashed_docs])
# Filter out documents that already exist in the record store.
uids = []
docs_to_index = []
# 判斷哪些是要更新,哪些是要添加的
for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
if doc_exists:
# Must be updated to refresh timestamp.
record_manager.update([hashed_doc.uid], time_at_least=index_start_dt)
num_skipped += 1
continue
uids.append(hashed_doc.uid)
docs_to_index.append(hashed_doc.to_document())
# 知識(shí)入向量庫(kù)
if docs_to_index:
vector_store.add_documents(docs_to_index, ids=uids)
num_added += len(docs_to_index)
# 更新數(shù)據(jù)庫(kù)記錄時(shí)間
record_manager.update(
[doc.uid for doc in hashed_docs],
group_ids=source_ids,
time_at_least=index_start_dt,
)
# 根據(jù)時(shí)間和source_ids 清理舊版本數(shù)據(jù)
if cleanup == "incremental":
...
uids_to_delete = record_manager.list_keys(
group_ids=_source_ids, before=index_start_dt
)
if uids_to_delete:
vector_store.delete(uids_to_delete)
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
if cleanup == "full":
while uids_to_delete := record_manager.list_keys(
before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
vector_store.delete(uids_to_delete)
# Then delete from record manager.
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
return {
"num_added": num_added,
"num_updated": num_updated,
"num_skipped": num_skipped,
"num_deleted": num_deleted,
}
通過(guò)上述代碼,我們可以了解到一個(gè)常見(jiàn)的優(yōu)化策略:對(duì)于涉及大量數(shù)據(jù)操作的數(shù)據(jù)庫(kù)和向量庫(kù),我們通常使用批處理(batch)方式進(jìn)行操作。上面代碼的流程圖如下:
本文轉(zhuǎn)載自公眾號(hào)AI 博物院 作者:longyunfeigu
