討論一個技術問題,大模型流式返回 原創
?“ 技術上最容易犯的錯就是經驗主義,以及拿來主義”
最近在對接GPT實現一個功能,具體功能就不說了;主要是這個功能需要流式返回,因此踩了一些坑;所以就在此記錄一下。至于什么是流式返回,不清楚的可以自己問度娘。
大模型流式返回帶來的問題
自chatGPT推出以來,其一個字一個字的出現,就像一個打字機;這效果驚艷了很多人,因此在很多場景下很多人都會選擇打字機的效果。而打字機效果背后的實現就是流式返回。
對技術有過了解的人應該都知道,正常情況下接口是在所有業務處理完成之后一起返回;但流式返回是分多批次返回。簡單來說就是處理了一部分就返回一部分,不用等全部完成之后再返回。
如下圖所示就是一個典型的流式返回:
那目前流式返回所遇到的問題是什么呢?
其實從后端的角度來說,流式返回沒有任何問題;不論是使用大模型官方提供的SDK亦或者是調用他們的接口,都是正常的流式返回。但問題是,調用第三方接口的目的是為了完成業務功能,因此怎么把這個流式返回也用流式返回給前端就是一個需要思考的問題了。
從web開發的角度來說,現在前后端交互主要使用的是http協議;但http協議是前端向后端發起請求,而不能從后端向前端發起請求;為了解決這個問題,因此就有了websocket和SSE協議。
這兩個協議的區別是websocket是全雙工的,而SSE是半雙工的;意思就是說,websocket建立連接之后,前端可以主動向后端發消息,后端也可以主動向前端發消息;而SSE是只能后端向前端發消息。
但不論是websocket還是SSE協議,本質上只是一種通訊協議,和業務沒什么具體的關系;這就類似于,搞貨運的目的是把貨物安全的送到目的地,至于你是用汽車運,還是用火車運都可以。
那問題出在哪里呢?
剛開始我們使用的是websocket作為流式返回的通訊工具;但再實際使用中才發現一個很大的問題,那就是websocket無法在短時間內接受大量的網絡傳輸需求;一旦過量就會導致websocket緩沖區溢出,也就是TEXT_FULL_WRITRING異常;簡單來說就是,websocket為了減輕網絡壓力,每次發送消息都會先把緩沖區寫滿;然后再一次性發送。
但由于流式返回速度較快,有時候websocket上一條消息還沒發送出去,下一條新的數據又進來了;因此就會導致websocket報錯,即使使用的是異步發送也會報錯。
public void sendText(String text) {
for(Session session : sessions.values()){
if (session.isOpen()) {
try {
//異步發送
session.getAsyncRemote().sendText(text);
} catch (Exception e) {
log.error("發送會話異常");
}
}else{
log.error("socket 在不可發送狀態");
}
}
}
為了解決這個問題, 因此就在網上查了一下發現;類似于這種流式返回,大部分人的處理方式都是用SSE協議;因為SSE協議相對websocket更簡單,效率更高。而在java語言中,使用SSE有兩種方式,第一種就是自己手動創建SSE對象,使用SseEmitter 對象來實現。
但這種原生的實現方式存在很多問題,比如需要自己去控制sse與用戶的關聯關系,sse的狀態判斷,自動重連等等。
因此,springboot就提供了另一種方式,那就是Flux流式處理。
OpenAIClient client = new OpenAIClientBuilder().credential(new AzureKeyCredential(key)).endpoint(endPoint).buildClient();
IterableStream<ChatCompletions> stream = client.getChatCompletionsStream(modelName, new ChatCompletionsOptions(messages));
StringBuffer stringBuffer = new StringBuffer();
return Flux.<String>create(sink -> {
stream.iterator().forEachRemaining(
chatCompletions -> {
if (chatCompletions.getChoices() != null && chatCompletions.getChoices().size() > 0) {
if (chatCompletions.getChoices().get(0).getDelta() != null) {
String content = chatCompletions.getChoices().get(0).getDelta().getContent();
log.info(content);
if (content != null) {
stringBuffer.append(content);
sink.next(content);
}
}
}
}
);
sink.complete();
}).map(data -> ServerSentEvent.<String>builder().data(data).build())
// 每隔一段時間發送一個字符
.delayElements(Duration.ofMillis(10))
// 停止
.takeWhile(i -> !redisUtil.hasKey(stopKey))
// 最后執行
.doOnComplete(() -> {
//傳輸完成 業務處理
});
如上所示,Flux通過sink封裝大模型的流式返回,然后調用next方法主動把數據返回給前端,以此達到流式效果。
雖然從操作上來說,各種技術已經逐漸成熟,我們都可以直接拿來主義,拿過來用就好了;但實際上存在的一個問題就是,當你不知道其原理,又沒有經驗時,你還是會踩很多坑。
?
本文轉載自公眾號AI探索時代 作者:DFires
