響應(yīng)式異步非阻塞編程在服務(wù)端的應(yīng)用
作者 | 搜狐視頻 趙文浩
1、前言
對于服務(wù)端的開發(fā)者而言,我們總有一個共同的目標(biāo),那就是如何用更少的資源獲得足夠的性能來支持我們的服務(wù)!,我們不是在性能優(yōu)化中,就是在性能優(yōu)化的路上。作為Javaer我們,服務(wù)性能優(yōu)化的武器庫中,異步和并發(fā)是永遠不會過時的兩個。
然而理想很美好,現(xiàn)實很骨感:
- 異步編程的思維方式同大腦的命令式思維是背道而馳的。在Java的世界中,直到目前Jdk17,也沒有async/await來幫我們解決回調(diào)地獄的問題,強行大量異步,將深陷回調(diào)地獄而不能解脫...
- 并發(fā)調(diào)用方面,大量編排異步線程任務(wù),不僅會造成資源的快速消耗,也會導(dǎo)致業(yè)務(wù)流程的實現(xiàn)難以理解,正所謂:寫這段代碼時能理解它的只有我和God,一個月后能理解它的就只有God了...。
在服務(wù)端引入響應(yīng)式編程,是解決如上問題的一個好的思路。
下面,我以搜狐視頻服務(wù)端PUGC團隊在PUGC視頻對象創(chuàng)建接口的重構(gòu)工作的實踐為背景,介紹響應(yīng)式(基于RxJava)異步非阻塞編程在服務(wù)端的應(yīng)用在服務(wù)端的應(yīng)用。
2、問題概述
PUGC視頻對象創(chuàng)建接口,從業(yè)務(wù)角度看:用于為用戶上傳視頻數(shù)據(jù)前在服務(wù)端為其分配一個視頻對象記錄,該視頻對象記錄被用于描述當(dāng)前視頻的完整生命周期,從技術(shù)角度看:它是一個聚合接口,通過組合多個上游接口數(shù)據(jù),主業(yè)務(wù)過程涉及:帳號、內(nèi)容審核、視頻對象存儲、轉(zhuǎn)碼、CDN調(diào)度等,實現(xiàn)業(yè)務(wù)過程。
該接口代碼年代久遠,從提交記錄中可查到的最早的歷史在2013年,隨著業(yè)務(wù)的變化,開發(fā)人員的變更,代碼中充斥著各種各樣的味道。不論是從業(yè)務(wù)角度、性能角度亦或是日常維護角度看,都難以滿足需要。
- 業(yè)務(wù)流處理過程邏輯冗長,邏輯復(fù)雜,可讀性差,難于維護。
- 并發(fā)任務(wù)較多,但缺少合理的編排措施,是否需要異步或并發(fā)控制完全隨意。
- 幾乎沒有正確的異常處理。
- 充滿Ctrl+C/V的味道
- ......
為了解決以上的諸多問題,我們開始了重構(gòu)(重寫)之路。
3、讀懂業(yè)務(wù)流
重構(gòu)的原則是保證接口實現(xiàn)的業(yè)務(wù)規(guī)則的一致性,通過仔細研讀代碼,整理出接口中實現(xiàn)的諸多特性和業(yè)務(wù)規(guī)則。
視頻對象創(chuàng)建主流程如下圖所示:
注:時序圖中描述的是主流程中的關(guān)鍵點,因篇幅所限并未列出每個調(diào)用的具體細節(jié)
從用戶請求到達服務(wù)端開始劃分業(yè)務(wù)執(zhí)行階段:
- 數(shù)據(jù)校驗階段:帳號狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
- 對象創(chuàng)建階段: 視頻對象存儲
- 關(guān)聯(lián)數(shù)據(jù)對象創(chuàng)建階段:
- 3.1 必要內(nèi)容:自媒體業(yè)務(wù)/視頻對象擴展
- 3.2 可選內(nèi)容:視頻元數(shù)據(jù)/轉(zhuǎn)碼/其它
- 4 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲結(jié)點
- 5 結(jié)束
這其中每個階段的內(nèi)容都需要通過若干個上游接口調(diào)用協(xié)作來完成,因篇幅所限,并未完全描述每個階段的具體實現(xiàn)細節(jié)。
4、選擇合適的架構(gòu)
通過分析業(yè)務(wù)流的特點:
- 業(yè)務(wù)流程鏈路較長
- 需要協(xié)作的上游接口較多
- IO操作為主
結(jié)合接口重構(gòu)核心目標(biāo):
- 代碼應(yīng)該準(zhǔn)確描述業(yè)務(wù)流
- 合理的并發(fā)任務(wù)編排
- 準(zhǔn)確異常處理
- 降低資源占用,提升接口性能
我們決定基于響應(yīng)式異步非阻塞架構(gòu)對接口進行重構(gòu):
- 響應(yīng)式編程是面向數(shù)據(jù)流的,業(yè)務(wù)流的特點可知,它有明確的階段性,數(shù)據(jù)在每個階段變換和流動
- 整個過程涉及較多的接口調(diào)用和異步任務(wù)編排,基于異步非阻塞可顯著減少異步線程的依賴
- 響應(yīng)式編程中提供了完善的異常處理機制,特別對異步環(huán)境下的異常處理非常友好
涉及的基礎(chǔ)組件
- 核心響應(yīng)式編程框架: ReactiveX/RxJava
- Http客戶端 Vertx WebClient
- CacheCloud響應(yīng)式客戶端 sohutv-basic / cachecloud-client
- Dubbo響應(yīng)式客戶端sohutv-basic / dubbo-reactive-consumer
同步阻塞模式適配響應(yīng)式異步非阻塞
因后續(xù)章節(jié)代碼示例中主要以dubbo服務(wù)接口調(diào)用為主,所以我以視頻服務(wù)端團隊基于dubbo-2.6.5版本實現(xiàn)的dubbo響應(yīng)式客戶端dubbo-reactive-consumer為例,介紹將傳統(tǒng)的同步阻塞模式適配響應(yīng)式異步非阻塞的思路。
Dubbo是目前視頻服務(wù)端用使用的較多的RPC框架,目前最新的版本是Dubbo3.x(注:由于歷史原因,目前團隊使用的版本還停留在2.6.5版本)。在Spring環(huán)境中,比較簡便的注入dubbo服務(wù)接口代理對象(簡稱接口對象)的方式是通過@com.alibaba.dubbo.config.annotation.Reference注解自動注入,示例如下:
@Component
public class DubboSyncExample {
@Reference
private VideoInfoService videoInfoService;
@Nullable
public VideoInfo getVideoInfo(long vid) throws Exception {
return videoInfoService.getVideoInfo(vid);
}
}
默認(rèn)情況下,這種方式注入的是基于同步阻塞模式接口對象,由于dubbo客戶端基于Netty實現(xiàn),所以它天生是是異步的。一般情況下,如果希望通過異步方式使用dubbo客戶端可以按如下方式操作:
@Component
public class DubboAsyncExample {
/**
* 標(biāo)記為異步調(diào)用
*/
@Reference(async = true)
private VideoInfoService videoInfoService;
@NotNull
public Future<VideoInfo> getVideoInfo(long vid) throws Exception {
// 提交異步請求
videoInfoService.getVideoInfo(vid);
// 通過ThreadLocal保存了對當(dāng)前請求上下文的引用RpcContext
RpcContext rpcContext = RpcContext.getContext();
// 從RpcContext中獲取當(dāng)前請求的Future<T>
Future<VideoInfo> future = rpcContext.getFuture();
return future;
}
}
為支持響應(yīng)式框架,同時保持通過@com.alibaba.dubbo.config.annotation.Reference注解自動注入接口對象的方式,我們的實現(xiàn)過程如下:
將接口對象包裝成響應(yīng)式引用類型:ReactiveReference<Service>
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.util.Optional;
public interface ReactiveReference<Service> {
@NotNull
<T> Maybe<T> maybe(@NotNull Function<Service, T> f);
@NotNull
<T> Single<Optional<T>> single(@NotNull Function<Service, T> f);
}
接口中提供了兩個方法用到了整個單值類型的響應(yīng)式流:
- Maybe<T>: 當(dāng)向下游Observer發(fā)射的對象為null時,會結(jié)束整個流的生命周期
- Single<T>: 在整個流的生命周期中不允許發(fā)射null對象
實現(xiàn)響應(yīng)式引用實現(xiàn)ReactiveReferenceImpl<Service>
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.jetbrains.annotations.NotNull;
import java.util.Optional;
final class RxJavaReactiveReferenceImpl<Service> implements ReactiveReference<Service> {
@NotNull
final Service service;
RxJavaReactiveReferenceImpl(@NotNull Service service) {
this.service = service;
}
/**
* maybe
*/
@Override
@NotNull
public <T> Maybe<T> maybe(@NotNull Function<Service, T> f) {
// 將Single<Optional<T>>解包成Maybe<T>
return this.single(f).mapOptional($ -> $);
}
/**
* single
* @param actual 實際的調(diào)用
*/
@Override
@NotNull
public <T> Single<Optional<T>> single(@NotNull Function<Service, T> actual) {
// 延遲創(chuàng)建一個Single流
return Single.defer(() ->
Single.create(
emitter -> {
DubboSubscription<T> subscription =
new DubboSubscription<>(
() -> actual.apply(service), // 將實際調(diào)用包裝成 java.util.concurrent.Callable<T>
emitter::onSuccess, // 執(zhí)行成功向下游發(fā)射結(jié)果
emitter::onError // 執(zhí)行異常,向下游發(fā)射異常信息
);
emitter.setCancellable(subscription::cancel); // 支持流取消時的回調(diào)
subscription.request(1L);
}
)
);
}
}
DubboSubscription<T>用于同Dubbo框架橋接,實現(xiàn)如下:
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Subscription;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
public class DubboSubscription<T> implements Subscription {
/**
* dubbo rpc調(diào)用的結(jié)果Future引用,用于維護生命周期
* 當(dāng)響應(yīng)式流被取消時需要取消這個Future
*/
@NotNull
private final AtomicReference<Future<T>> futureRef = new AtomicReference<>();
/**
* 實際的接口調(diào)用
*/
@NotNull
private final Callable<T> actual;
/**
* 當(dāng)rpc調(diào)用成功時執(zhí)行
*/
@NotNull
private final Consumer<Optional<T>> onNext;
/**
* 當(dāng)rpc調(diào)用異常時執(zhí)行
*/
@NotNull
private final Consumer<Throwable> onError;
public DubboSubscription(@NotNull Callable<T> actual, @NotNull Consumer<Optional<T>> onNext, @NotNull Consumer<Throwable> onError) {
this.actual = actual;
this.onNext = onNext;
this.onError = onError;
}
@Override
public void request(long n) {
try {
RpcContext rpcContext = RpcContext.getContext();
// 異步執(zhí)行dubbo rpc請求
this.actual.call();
// 取到當(dāng)前請求的Future<T>
Future<T> future = rpcContext.getFuture();
if (!(future instanceof FutureAdapter)) {
// Future<T>類型驗證,要求是 com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter,
// 因為FutureAdapter包裝了 com.alibaba.dubbo.remoting.exchange.ResponseFuture對象,它支持設(shè)置結(jié)果回調(diào)
throw new UnsupportedOperationException(String.format("The future is not a instance of [%s]", FutureAdapter.class.getName()));
}
futureRef.set(future); // 設(shè)置Future引用
FutureAdapter<T> futureAdapter = (FutureAdapter<T>) future;
futureAdapter
.getFuture()
.setCallback(
new ResponseCallback() {
@SuppressWarnings("unchecked")
@Override
public void done(Object response) {
try {
T t = (T) ((Result) response).recreate(); // 結(jié)果對象反序列化
DubboSubscription.this.onNext.accept(Optional.ofNullable(t)); // dubbo rpc調(diào)用成功
} catch (Throwable e) {
DubboSubscription.this.onError(e); // 一般為反序列化操作引起的異常
}
}
@Override
public void caught(Throwable e) {
// // dubbo rpc調(diào)用異常
DubboSubscription.this.onError(e);
}
}
);
} catch (Throwable e) {
// 其它異常
onError(e);
}
}
private void onError(Throwable e) {
// 執(zhí)行onError回調(diào)
DubboSubscription.this.onError.accept(e);
}
/**
* 響應(yīng)式流取消時回調(diào)
*/
@Override
public void cancel() {
Future<T> future = futureRef.get();
if (future != null && !future.isDone()) {
future.cancel(true);
}
}
}
于是基于我們的響應(yīng)式dubbo客戶端,服務(wù)接口的調(diào)用方式為:
@Component
public class DubboReactiveExample {
@Reference
private ReactiveReference<VideoInfoService> videoInfoService;
/**
* 響應(yīng)式請求
*/
@NotNull
public Maybe<VideoInfo> maybeVideoInfo(long vid) {
// 提交響應(yīng)式請求
return videoInfoService.maybe(service -> service.getVideoInfo(vid));
}
/**
* 響應(yīng)式請求
*/
@NotNull
public Single<Optional<VideoInfo>> singleVideoInfo(long vid) {
// 提交響應(yīng)式請求
return videoInfoService.single(service -> service.getVideoInfo(vid));
}
}
注:此處的實現(xiàn)為常規(guī)的Spring生命周期處理,與本文無關(guān),具體實現(xiàn)細節(jié)不再贅述
擴展com.alibaba.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor使dubbo自動注入框架支持ReactiveReference<Service>類型,自動設(shè)置為異步模式,且向目標(biāo)Component中注入的實際類型為ReactiveReferenceImpl<Service>
5、實踐過程
我們在讀懂業(yè)務(wù)流一節(jié)中對視頻對象創(chuàng)建主流程做了說明,然后將主流程分成了5個階段,現(xiàn)在我們使用RxJava代碼來描述這5個階段(注:后續(xù)代碼僅用于演示主流程的實現(xiàn)過程,并不是完整的實現(xiàn)過程)。
輸入?yún)?shù)Args
- 為保證每個流處理結(jié)點的無狀態(tài)性,Args被設(shè)計為不可變
/**
* 入?yún)b
* 為保證每個流處理結(jié)點的無狀態(tài)性,Args被設(shè)計為不可變
*/
@Builder
@Getter
public class Args {
@NotNull
private final long accountId;
private final String title;
private final String coverImg;
private final String other;
/**
* 視頻內(nèi)容校驗值
*/
private final String signature;
... // other fields
}
過程結(jié)果PassObj
- 為保證每個流處理結(jié)點的無狀態(tài)性,PassObj被設(shè)計為不可變
/**
* 響應(yīng)式流處理過程對象包裝
* 為保證每個流處理結(jié)點的無狀態(tài)性,PassObj被設(shè)計為不可變
*/
@Builder(toBuilder = true)
@Getter
public class PassObj {
private final Args args;
private final Account account;
private final boolean argsAccepted;
private final VideoInfo videoInfo;
private final VideoInfoExtend videoInfoExtend;
private final PugcInfo pugcInfo;
private final CDNNodeInfo cdnNodeInfo;
... // other fields
}
響應(yīng)式流主干
- 清晰地表達了業(yè)務(wù)流的執(zhí)行過程
@NotNull
public Single<PassObj> create(@NotNull Args inputArgs) {
return Single.fromCallable(() -> PassObj.builder().args(inputArgs).build())
.flatMap(this::checkStage) // 1 數(shù)據(jù)校驗階段:帳號狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
.flatMap(this::createVideoObjectStage)// 2 對象創(chuàng)建階段: 視頻對象存儲
.flatMap(this::createCorrelateObjectStage)// 3 關(guān)聯(lián)數(shù)據(jù)對象創(chuàng)建階段
.flatMap(this::cdnDispatchStage)// 4 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲結(jié)點\
.doOnSuccess(passObj -> { // 5 結(jié)束
// 視頻對象創(chuàng)建成功
})
.doOnError(e -> log.error(e.getMessage(), e))
;
}
每個階段的詳細實現(xiàn):
數(shù)據(jù)校驗階段:帳號狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
/**
* 1 數(shù)據(jù)校驗階段:帳號狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
*/
@NotNull
public Single<PassObj> checkStage(@NotNull PassObj passObj) {
return this.checkAndGetAccount(passObj) // 驗證并獲取account對象
.map(account -> passObj.toBuilder().account(account).build())// 帳號狀態(tài)校驗通過, 重建passObj,并裝配account
.flatMap(_passObj ->
this.checkInputArgs(_passObj) // 入?yún)⒑弦?guī)性校驗
.map(argsAccepted -> _passObj.toBuilder().argsAccepted(argsAccepted).build())// 入?yún)⒑弦?guī)性校驗通過, 重建passObj,并裝配校驗結(jié)果
)
.flatMap(_passObj -> // 視頻元數(shù)據(jù)信息校驗
this.videoMetaService.single(s -> s.getVideoMetaBySignature(passObj.getArgs().getSignature()))
.map($videoMeta -> {
if ($videoMeta.isPresent()) {
// 視頻元數(shù)據(jù)信息存在,說明視頻上傳是重復(fù)的,執(zhí)行相關(guān)的異常處理
throw new VideoDuplicatedException();
}
// 視頻沒有重復(fù),可以創(chuàng)建,向后傳遞passObj
return _passObj;
})
)
;
}
- 驗證并獲取Account對象
/**
* 驗證并獲取Account對象
*/
@NotNull
private Single<Account> checkAndGetAccount(@NotNull PassObj passObj) {
return Maybe.fromCallable(() -> passObj.getArgs().getAccountId())// 取accountId
.filter(accountId -> accountId > 0L) // 取基本有效的值
.flatMap(accountId -> this.accountService.maybe(s -> s.checkAccount(accountId)))// 驗證帳號狀態(tài)
.defaultIfEmpty(false)
.doOnSuccess(accountAccepted -> {
if (!accountAccepted) {
// 帳號狀態(tài)校驗失敗
// 通過明確的異常來處理失敗過程
throw new AccountRejectedException();
}
})
.flatMapMaybe(_accountAccepted -> this.accountService.maybe(s -> s.getAccount(passObj.getArgs().getAccountId())))
.switchIfEmpty(Single.error(AccountObjectNullPointerException::new))
;
}
- 入?yún)⒑弦?guī)性校驗
/**
* 入?yún)⒑弦?guī)性校驗
*/
private Single<Boolean> checkInputArgs(@NotNull PassObj passObj) { // 入?yún)⒑弦?guī)性校驗可并發(fā)執(zhí)行
return Single.zip(
this.contentAuditService.maybe(s -> s.titleCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(TitleRejectedException::new)), // title
this.contentAuditService.maybe(s -> s.coverImgCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(CoverImgRejectedException::new)), // coverImage
this.contentAuditService.maybe(s -> s.otherCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(OtherRejectedException::new)), // others
(titleAccepted, coverImgAccepted, otherAccepted) -> titleAccepted && coverImgAccepted && otherAccepted
);
}
對象創(chuàng)建階段: 視頻對象存儲
/**
* 2 對象創(chuàng)建階段: 視頻對象存儲
*/
@NotNull
public Single<PassObj> createVideoObjectStage(@NotNull PassObj passObj) {
return Single.fromCallable(() ->
// 包裝VideoInfo對象
VideoInfo.builder()
.userId(passObj.getAccount().getId())
.title(passObj.getArgs().getTitle())
.coverImg(passObj.getArgs().getCoverImg())
.other(passObj.getArgs().getOther())
.build()
)
.flatMapMaybe(videoInfo ->
this.videoInfoService.maybe(s -> s.createVideoInfo(videoInfo)) // 保存視頻對象
.filter(createdVideoInfo -> createdVideoInfo.getId() > 0L) // 驗證保存結(jié)果
)
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoCreateException::new)) // 異常處理
.map(createdVideoInfo -> passObj.toBuilder().videoInfo(createdVideoInfo).build()) // 將保存結(jié)果裝配到passObj中
;
}
關(guān)聯(lián)數(shù)據(jù)對象創(chuàng)建階段
/**
* 3 關(guān)聯(lián)數(shù)據(jù)對象創(chuàng)建階段
*/
@NotNull
public Single<PassObj> createCorrelateObjectStage(@NotNull PassObj passObj) {
return Single.zip( // 必要內(nèi)容:自媒體業(yè)務(wù)/視頻對象擴展 可并發(fā)執(zhí)行
Single.fromCallable(() -> // 視頻對象擴展
VideoInfoExtend.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(videoInfoExtend ->
this.videoInfoService.maybe(s -> s.createVideoInfoExtend(videoInfoExtend))
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoExtendCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoExtendCreateException::new)) // 異常處理
),
Single.fromCallable(() -> // 自媒體業(yè)務(wù)
PugcInfo.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(pugcInfo ->
this.pugcService.maybe(s -> s.createPugcInfo(pugcInfo))
.onErrorResumeNext(e -> Maybe.error(() -> new PugcInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(PugcInfoCreateException::new)) // 異常處理
),
(createdVideoInfoExtend, createdPugcInfo) ->
passObj.toBuilder()
.videoInfoExtend(createdVideoInfoExtend)
.pugcInfo(createdPugcInfo)
.build()
)
.doOnSuccess(updatedPassObj -> { // 可選內(nèi)容:視頻元數(shù)據(jù)/轉(zhuǎn)碼/其它 不關(guān)注結(jié)果,異步執(zhí)行
Maybe.fromCallable(() ->
VideoMeta.builder()
.signature(updatedPassObj.getArgs().getSignature())
.build()
)
.flatMap(videoMeta -> this.videoMetaService.maybe(s -> s.createVideoMeta(videoMeta)))
.doOnError(e -> log.error(e.getMessage(), e))
.subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoKeyFrame(updatedPassObj.getVideoInfo())).subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoResolution(updatedPassObj.getVideoInfo())).subscribe();
this.otherOptionalService.maybe(s -> s.doSomething(updatedPassObj.getVideoInfo())).subscribe();
})
;
}
上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲結(jié)點
/**
* 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲結(jié)點
*/
@NotNull
public Single<PassObj> cdnDispatchStage(@NotNull PassObj passObj) {
return cdnDispatchService.maybe(CDNDispatchService::dispatch) // 調(diào)度一個CDN上傳節(jié)點
.onErrorResumeNext(e -> Maybe.error(() -> new CDNDispatchException(e))) // 調(diào)用異常
.switchIfEmpty(Single.error(CDNDispatchException::new)) // 沒有取到節(jié)點, 異常處理
.map(cdnNodeInfo ->
passObj.toBuilder()
.cdnNodeInfo(cdnNodeInfo)
.build()
)
;
}
6、總結(jié)
以上內(nèi)容是搜狐視頻服務(wù)端PUGC團隊,首次在核心業(yè)務(wù)接口中應(yīng)用響應(yīng)式異步非阻塞架構(gòu)的思考和實施過程,文中主要闡述了兩個內(nèi)容:
- Dubbo響應(yīng)式客戶端的實現(xiàn)過程
- 視頻對象創(chuàng)建接口的業(yè)務(wù)分析和實現(xiàn)過程
通過本次重構(gòu),我們獲得了如下收益:
- 重新梳理業(yè)務(wù)流,將業(yè)務(wù)流同響應(yīng)式流整合,基于響應(yīng)式編程規(guī)范業(yè)務(wù)流的實現(xiàn)過程
- 視頻對象創(chuàng)建接口平均響應(yīng)時間從256ms降低到146ms,性能提供顯著
結(jié)束,感謝閱讀!