成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

響應(yīng)式異步非阻塞編程在服務(wù)端的應(yīng)用

開發(fā)
對于服務(wù)端的開發(fā)者而言,我們總有一個共同的目標(biāo),那就是如何用更少的資源獲得足夠的性能來支持我們的服務(wù)!,我們不是在性能優(yōu)化中,就是在性能優(yōu)化的路上。

作者 | 搜狐視頻 趙文浩

1、前言

對于服務(wù)端的開發(fā)者而言,我們總有一個共同的目標(biāo),那就是如何用更少的資源獲得足夠的性能來支持我們的服務(wù)!,我們不是在性能優(yōu)化中,就是在性能優(yōu)化的路上。作為Javaer我們,服務(wù)性能優(yōu)化的武器庫中,異步和并發(fā)是永遠不會過時的兩個。

然而理想很美好,現(xiàn)實很骨感:

  • 異步編程的思維方式同大腦的命令式思維是背道而馳的。在Java的世界中,直到目前Jdk17,也沒有async/await來幫我們解決回調(diào)地獄的問題,強行大量異步,將深陷回調(diào)地獄而不能解脫...
  • 并發(fā)調(diào)用方面,大量編排異步線程任務(wù),不僅會造成資源的快速消耗,也會導(dǎo)致業(yè)務(wù)流程的實現(xiàn)難以理解,正所謂:寫這段代碼時能理解它的只有我和God,一個月后能理解它的就只有God了...。

在服務(wù)端引入響應(yīng)式編程,是解決如上問題的一個好的思路。

下面,我以搜狐視頻服務(wù)端PUGC團隊在PUGC視頻對象創(chuàng)建接口的重構(gòu)工作的實踐為背景,介紹響應(yīng)式(基于RxJava)異步非阻塞編程在服務(wù)端的應(yīng)用在服務(wù)端的應(yīng)用。

2、問題概述

PUGC視頻對象創(chuàng)建接口,從業(yè)務(wù)角度看:用于為用戶上傳視頻數(shù)據(jù)前在服務(wù)端為其分配一個視頻對象記錄,該視頻對象記錄被用于描述當(dāng)前視頻的完整生命周期,從技術(shù)角度看:它是一個聚合接口,通過組合多個上游接口數(shù)據(jù),主業(yè)務(wù)過程涉及:帳號、內(nèi)容審核、視頻對象存儲、轉(zhuǎn)碼、CDN調(diào)度等,實現(xiàn)業(yè)務(wù)過程。

該接口代碼年代久遠,從提交記錄中可查到的最早的歷史在2013年,隨著業(yè)務(wù)的變化,開發(fā)人員的變更,代碼中充斥著各種各樣的味道。不論是從業(yè)務(wù)角度、性能角度亦或是日常維護角度看,都難以滿足需要。

  • 業(yè)務(wù)流處理過程邏輯冗長,邏輯復(fù)雜,可讀性差,難于維護。
  • 并發(fā)任務(wù)較多,但缺少合理的編排措施,是否需要異步或并發(fā)控制完全隨意。
  • 幾乎沒有正確的異常處理。
  • 充滿Ctrl+C/V的味道
  • ......

為了解決以上的諸多問題,我們開始了重構(gòu)(重寫)之路。

3、讀懂業(yè)務(wù)流

重構(gòu)的原則是保證接口實現(xiàn)的業(yè)務(wù)規(guī)則的一致性,通過仔細研讀代碼,整理出接口中實現(xiàn)的諸多特性和業(yè)務(wù)規(guī)則。

視頻對象創(chuàng)建主流程如下圖所示:

圖片

 注:時序圖中描述的是主流程中的關(guān)鍵點,因篇幅所限并未列出每個調(diào)用的具體細節(jié)

從用戶請求到達服務(wù)端開始劃分業(yè)務(wù)執(zhí)行階段:

  1. 數(shù)據(jù)校驗階段:帳號狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
  2. 對象創(chuàng)建階段: 視頻對象存儲
  3. 關(guān)聯(lián)數(shù)據(jù)對象創(chuàng)建階段:
  • 3.1 必要內(nèi)容:自媒體業(yè)務(wù)/視頻對象擴展
  • 3.2 可選內(nèi)容:視頻元數(shù)據(jù)/轉(zhuǎn)碼/其它
  • 4 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲結(jié)點
  • 5 結(jié)束

這其中每個階段的內(nèi)容都需要通過若干個上游接口調(diào)用協(xié)作來完成,因篇幅所限,并未完全描述每個階段的具體實現(xiàn)細節(jié)。

4、選擇合適的架構(gòu)

通過分析業(yè)務(wù)流的特點:

  • 業(yè)務(wù)流程鏈路較長
  • 需要協(xié)作的上游接口較多
  • IO操作為主

結(jié)合接口重構(gòu)核心目標(biāo):

  • 代碼應(yīng)該準(zhǔn)確描述業(yè)務(wù)流
  • 合理的并發(fā)任務(wù)編排
  • 準(zhǔn)確異常處理
  • 降低資源占用,提升接口性能

我們決定基于響應(yīng)式異步非阻塞架構(gòu)對接口進行重構(gòu):

  • 響應(yīng)式編程是面向數(shù)據(jù)流的,業(yè)務(wù)流的特點可知,它有明確的階段性,數(shù)據(jù)在每個階段變換和流動
  • 整個過程涉及較多的接口調(diào)用和異步任務(wù)編排,基于異步非阻塞可顯著減少異步線程的依賴
  • 響應(yīng)式編程中提供了完善的異常處理機制,特別對異步環(huán)境下的異常處理非常友好

涉及的基礎(chǔ)組件

  • 核心響應(yīng)式編程框架: ReactiveX/RxJava
  • Http客戶端 Vertx WebClient
  • CacheCloud響應(yīng)式客戶端 sohutv-basic / cachecloud-client
  • Dubbo響應(yīng)式客戶端sohutv-basic / dubbo-reactive-consumer

同步阻塞模式適配響應(yīng)式異步非阻塞

因后續(xù)章節(jié)代碼示例中主要以dubbo服務(wù)接口調(diào)用為主,所以我以視頻服務(wù)端團隊基于dubbo-2.6.5版本實現(xiàn)的dubbo響應(yīng)式客戶端dubbo-reactive-consumer為例,介紹將傳統(tǒng)的同步阻塞模式適配響應(yīng)式異步非阻塞的思路。

Dubbo是目前視頻服務(wù)端用使用的較多的RPC框架,目前最新的版本是Dubbo3.x(注:由于歷史原因,目前團隊使用的版本還停留在2.6.5版本)。在Spring環(huán)境中,比較簡便的注入dubbo服務(wù)接口代理對象(簡稱接口對象)的方式是通過@com.alibaba.dubbo.config.annotation.Reference注解自動注入,示例如下:

@Component
public class DubboSyncExample {

@Reference
private VideoInfoService videoInfoService;

@Nullable
public VideoInfo getVideoInfo(long vid) throws Exception {
return videoInfoService.getVideoInfo(vid);
}
}

默認(rèn)情況下,這種方式注入的是基于同步阻塞模式接口對象,由于dubbo客戶端基于Netty實現(xiàn),所以它天生是是異步的。一般情況下,如果希望通過異步方式使用dubbo客戶端可以按如下方式操作:

@Component
public class DubboAsyncExample {

/**
* 標(biāo)記為異步調(diào)用
*/
@Reference(async = true)
private VideoInfoService videoInfoService;

@NotNull
public Future<VideoInfo> getVideoInfo(long vid) throws Exception {
// 提交異步請求
videoInfoService.getVideoInfo(vid);
// 通過ThreadLocal保存了對當(dāng)前請求上下文的引用RpcContext
RpcContext rpcContext = RpcContext.getContext();
// 從RpcContext中獲取當(dāng)前請求的Future<T>
Future<VideoInfo> future = rpcContext.getFuture();
return future;
}
}

為支持響應(yīng)式框架,同時保持通過@com.alibaba.dubbo.config.annotation.Reference注解自動注入接口對象的方式,我們的實現(xiàn)過程如下:

將接口對象包裝成響應(yīng)式引用類型:ReactiveReference<Service>

import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;

import java.util.Optional;

public interface ReactiveReference<Service> {

@NotNull
<T> Maybe<T> maybe(@NotNull Function<Service, T> f);

@NotNull
<T> Single<Optional<T>> single(@NotNull Function<Service, T> f);
}

接口中提供了兩個方法用到了整個單值類型的響應(yīng)式流:

  • Maybe<T>: 當(dāng)向下游Observer發(fā)射的對象為null時,會結(jié)束整個流的生命周期
  • Single<T>: 在整個流的生命周期中不允許發(fā)射null對象

實現(xiàn)響應(yīng)式引用實現(xiàn)ReactiveReferenceImpl<Service>

import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.jetbrains.annotations.NotNull;

import java.util.Optional;

final class RxJavaReactiveReferenceImpl<Service> implements ReactiveReference<Service> {

@NotNull
final Service service;

RxJavaReactiveReferenceImpl(@NotNull Service service) {
this.service = service;
}

/**
* maybe
*/
@Override
@NotNull
public <T> Maybe<T> maybe(@NotNull Function<Service, T> f) {
// 將Single<Optional<T>>解包成Maybe<T>
return this.single(f).mapOptional($ -> $);
}

/**
* single
* @param actual 實際的調(diào)用
*/
@Override
@NotNull
public <T> Single<Optional<T>> single(@NotNull Function<Service, T> actual) {
// 延遲創(chuàng)建一個Single流
return Single.defer(() ->
Single.create(
emitter -> {
DubboSubscription<T> subscription =
new DubboSubscription<>(
() -> actual.apply(service), // 將實際調(diào)用包裝成 java.util.concurrent.Callable<T>
emitter::onSuccess, // 執(zhí)行成功向下游發(fā)射結(jié)果
emitter::onError // 執(zhí)行異常,向下游發(fā)射異常信息
);
emitter.setCancellable(subscription::cancel); // 支持流取消時的回調(diào)
subscription.request(1L);
}
)
);
}
}

DubboSubscription<T>用于同Dubbo框架橋接,實現(xiàn)如下:

import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Subscription;

import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class DubboSubscription<T> implements Subscription {

/**
* dubbo rpc調(diào)用的結(jié)果Future引用,用于維護生命周期
* 當(dāng)響應(yīng)式流被取消時需要取消這個Future
*/
@NotNull
private final AtomicReference<Future<T>> futureRef = new AtomicReference<>();

/**
* 實際的接口調(diào)用
*/
@NotNull
private final Callable<T> actual;
/**
* 當(dāng)rpc調(diào)用成功時執(zhí)行
*/
@NotNull
private final Consumer<Optional<T>> onNext;
/**
* 當(dāng)rpc調(diào)用異常時執(zhí)行
*/
@NotNull
private final Consumer<Throwable> onError;

public DubboSubscription(@NotNull Callable<T> actual, @NotNull Consumer<Optional<T>> onNext, @NotNull Consumer<Throwable> onError) {
this.actual = actual;
this.onNext = onNext;
this.onError = onError;
}

@Override
public void request(long n) {
try {
RpcContext rpcContext = RpcContext.getContext();
// 異步執(zhí)行dubbo rpc請求
this.actual.call();
// 取到當(dāng)前請求的Future<T>
Future<T> future = rpcContext.getFuture();
if (!(future instanceof FutureAdapter)) {
// Future<T>類型驗證,要求是 com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter,
// 因為FutureAdapter包裝了 com.alibaba.dubbo.remoting.exchange.ResponseFuture對象,它支持設(shè)置結(jié)果回調(diào)
throw new UnsupportedOperationException(String.format("The future is not a instance of [%s]", FutureAdapter.class.getName()));
}
futureRef.set(future); // 設(shè)置Future引用
FutureAdapter<T> futureAdapter = (FutureAdapter<T>) future;
futureAdapter
.getFuture()
.setCallback(
new ResponseCallback() {
@SuppressWarnings("unchecked")
@Override
public void done(Object response) {
try {
T t = (T) ((Result) response).recreate(); // 結(jié)果對象反序列化
DubboSubscription.this.onNext.accept(Optional.ofNullable(t)); // dubbo rpc調(diào)用成功
} catch (Throwable e) {
DubboSubscription.this.onError(e); // 一般為反序列化操作引起的異常
}
}

@Override
public void caught(Throwable e) {
// // dubbo rpc調(diào)用異常
DubboSubscription.this.onError(e);
}
}
);
} catch (Throwable e) {
// 其它異常
onError(e);
}
}

private void onError(Throwable e) {
// 執(zhí)行onError回調(diào)
DubboSubscription.this.onError.accept(e);
}

/**
* 響應(yīng)式流取消時回調(diào)
*/
@Override
public void cancel() {
Future<T> future = futureRef.get();
if (future != null && !future.isDone()) {
future.cancel(true);
}
}
}

于是基于我們的響應(yīng)式dubbo客戶端,服務(wù)接口的調(diào)用方式為:

@Component
public class DubboReactiveExample {

@Reference
private ReactiveReference<VideoInfoService> videoInfoService;

/**
* 響應(yīng)式請求
*/
@NotNull
public Maybe<VideoInfo> maybeVideoInfo(long vid) {
// 提交響應(yīng)式請求
return videoInfoService.maybe(service -> service.getVideoInfo(vid));
}

/**
* 響應(yīng)式請求
*/
@NotNull
public Single<Optional<VideoInfo>> singleVideoInfo(long vid) {
// 提交響應(yīng)式請求
return videoInfoService.single(service -> service.getVideoInfo(vid));
}
}

注:此處的實現(xiàn)為常規(guī)的Spring生命周期處理,與本文無關(guān),具體實現(xiàn)細節(jié)不再贅述

擴展com.alibaba.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor使dubbo自動注入框架支持ReactiveReference<Service>類型,自動設(shè)置為異步模式,且向目標(biāo)Component中注入的實際類型為ReactiveReferenceImpl<Service>

5、實踐過程

我們在讀懂業(yè)務(wù)流一節(jié)中對視頻對象創(chuàng)建主流程做了說明,然后將主流程分成了5個階段,現(xiàn)在我們使用RxJava代碼來描述這5個階段(注:后續(xù)代碼僅用于演示主流程的實現(xiàn)過程,并不是完整的實現(xiàn)過程)。

輸入?yún)?shù)Args

  • 為保證每個流處理結(jié)點的無狀態(tài)性,Args被設(shè)計為不可變
/**
* 入?yún)b
* 為保證每個流處理結(jié)點的無狀態(tài)性,Args被設(shè)計為不可變
*/
@Builder
@Getter
public class Args {
@NotNull
private final long accountId;

private final String title;
private final String coverImg;
private final String other;

/**
* 視頻內(nèi)容校驗值
*/
private final String signature;
... // other fields
}

過程結(jié)果PassObj

  • 為保證每個流處理結(jié)點的無狀態(tài)性,PassObj被設(shè)計為不可變
/**
* 響應(yīng)式流處理過程對象包裝
* 為保證每個流處理結(jié)點的無狀態(tài)性,PassObj被設(shè)計為不可變
*/
@Builder(toBuilder = true)
@Getter
public class PassObj {
private final Args args;
private final Account account;
private final boolean argsAccepted;
private final VideoInfo videoInfo;
private final VideoInfoExtend videoInfoExtend;
private final PugcInfo pugcInfo;
private final CDNNodeInfo cdnNodeInfo;
... // other fields
}

響應(yīng)式流主干

  • 清晰地表達了業(yè)務(wù)流的執(zhí)行過程
@NotNull
public Single<PassObj> create(@NotNull Args inputArgs) {
return Single.fromCallable(() -> PassObj.builder().args(inputArgs).build())
.flatMap(this::checkStage) // 1 數(shù)據(jù)校驗階段:帳號狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
.flatMap(this::createVideoObjectStage)// 2 對象創(chuàng)建階段: 視頻對象存儲
.flatMap(this::createCorrelateObjectStage)// 3 關(guān)聯(lián)數(shù)據(jù)對象創(chuàng)建階段
.flatMap(this::cdnDispatchStage)// 4 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲結(jié)點\
.doOnSuccess(passObj -> { // 5 結(jié)束
// 視頻對象創(chuàng)建成功
})
.doOnError(e -> log.error(e.getMessage(), e))
;
}

每個階段的詳細實現(xiàn):

數(shù)據(jù)校驗階段:帳號狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性

/**
* 1 數(shù)據(jù)校驗階段:帳號狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
*/
@NotNull
public Single<PassObj> checkStage(@NotNull PassObj passObj) {
return this.checkAndGetAccount(passObj) // 驗證并獲取account對象
.map(account -> passObj.toBuilder().account(account).build())// 帳號狀態(tài)校驗通過, 重建passObj,并裝配account
.flatMap(_passObj ->
this.checkInputArgs(_passObj) // 入?yún)⒑弦?guī)性校驗
.map(argsAccepted -> _passObj.toBuilder().argsAccepted(argsAccepted).build())// 入?yún)⒑弦?guī)性校驗通過, 重建passObj,并裝配校驗結(jié)果
)
.flatMap(_passObj -> // 視頻元數(shù)據(jù)信息校驗
this.videoMetaService.single(s -> s.getVideoMetaBySignature(passObj.getArgs().getSignature()))
.map($videoMeta -> {
if ($videoMeta.isPresent()) {
// 視頻元數(shù)據(jù)信息存在,說明視頻上傳是重復(fù)的,執(zhí)行相關(guān)的異常處理
throw new VideoDuplicatedException();
}
// 視頻沒有重復(fù),可以創(chuàng)建,向后傳遞passObj
return _passObj;
})
)
;
}
  • 驗證并獲取Account對象
/**
* 驗證并獲取Account對象
*/
@NotNull
private Single<Account> checkAndGetAccount(@NotNull PassObj passObj) {
return Maybe.fromCallable(() -> passObj.getArgs().getAccountId())// 取accountId
.filter(accountId -> accountId > 0L) // 取基本有效的值
.flatMap(accountId -> this.accountService.maybe(s -> s.checkAccount(accountId)))// 驗證帳號狀態(tài)
.defaultIfEmpty(false)
.doOnSuccess(accountAccepted -> {
if (!accountAccepted) {
// 帳號狀態(tài)校驗失敗
// 通過明確的異常來處理失敗過程
throw new AccountRejectedException();
}
})
.flatMapMaybe(_accountAccepted -> this.accountService.maybe(s -> s.getAccount(passObj.getArgs().getAccountId())))
.switchIfEmpty(Single.error(AccountObjectNullPointerException::new))
;
}
  • 入?yún)⒑弦?guī)性校驗
/**
* 入?yún)⒑弦?guī)性校驗
*/
private Single<Boolean> checkInputArgs(@NotNull PassObj passObj) { // 入?yún)⒑弦?guī)性校驗可并發(fā)執(zhí)行
return Single.zip(
this.contentAuditService.maybe(s -> s.titleCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(TitleRejectedException::new)), // title
this.contentAuditService.maybe(s -> s.coverImgCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(CoverImgRejectedException::new)), // coverImage
this.contentAuditService.maybe(s -> s.otherCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(OtherRejectedException::new)), // others
(titleAccepted, coverImgAccepted, otherAccepted) -> titleAccepted && coverImgAccepted && otherAccepted
);
}

對象創(chuàng)建階段: 視頻對象存儲

/**
* 2 對象創(chuàng)建階段: 視頻對象存儲
*/
@NotNull
public Single<PassObj> createVideoObjectStage(@NotNull PassObj passObj) {
return Single.fromCallable(() ->
// 包裝VideoInfo對象
VideoInfo.builder()
.userId(passObj.getAccount().getId())
.title(passObj.getArgs().getTitle())
.coverImg(passObj.getArgs().getCoverImg())
.other(passObj.getArgs().getOther())
.build()
)
.flatMapMaybe(videoInfo ->
this.videoInfoService.maybe(s -> s.createVideoInfo(videoInfo)) // 保存視頻對象
.filter(createdVideoInfo -> createdVideoInfo.getId() > 0L) // 驗證保存結(jié)果
)
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoCreateException::new)) // 異常處理
.map(createdVideoInfo -> passObj.toBuilder().videoInfo(createdVideoInfo).build()) // 將保存結(jié)果裝配到passObj中
;
}

關(guān)聯(lián)數(shù)據(jù)對象創(chuàng)建階段

/**
* 3 關(guān)聯(lián)數(shù)據(jù)對象創(chuàng)建階段
*/
@NotNull
public Single<PassObj> createCorrelateObjectStage(@NotNull PassObj passObj) {
return Single.zip( // 必要內(nèi)容:自媒體業(yè)務(wù)/視頻對象擴展 可并發(fā)執(zhí)行
Single.fromCallable(() -> // 視頻對象擴展
VideoInfoExtend.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(videoInfoExtend ->
this.videoInfoService.maybe(s -> s.createVideoInfoExtend(videoInfoExtend))
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoExtendCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoExtendCreateException::new)) // 異常處理
),
Single.fromCallable(() -> // 自媒體業(yè)務(wù)
PugcInfo.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(pugcInfo ->
this.pugcService.maybe(s -> s.createPugcInfo(pugcInfo))
.onErrorResumeNext(e -> Maybe.error(() -> new PugcInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(PugcInfoCreateException::new)) // 異常處理
),
(createdVideoInfoExtend, createdPugcInfo) ->
passObj.toBuilder()
.videoInfoExtend(createdVideoInfoExtend)
.pugcInfo(createdPugcInfo)
.build()
)
.doOnSuccess(updatedPassObj -> { // 可選內(nèi)容:視頻元數(shù)據(jù)/轉(zhuǎn)碼/其它 不關(guān)注結(jié)果,異步執(zhí)行
Maybe.fromCallable(() ->
VideoMeta.builder()
.signature(updatedPassObj.getArgs().getSignature())
.build()
)
.flatMap(videoMeta -> this.videoMetaService.maybe(s -> s.createVideoMeta(videoMeta)))
.doOnError(e -> log.error(e.getMessage(), e))
.subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoKeyFrame(updatedPassObj.getVideoInfo())).subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoResolution(updatedPassObj.getVideoInfo())).subscribe();
this.otherOptionalService.maybe(s -> s.doSomething(updatedPassObj.getVideoInfo())).subscribe();
})
;
}

上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲結(jié)點

/**
* 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲結(jié)點
*/
@NotNull
public Single<PassObj> cdnDispatchStage(@NotNull PassObj passObj) {
return cdnDispatchService.maybe(CDNDispatchService::dispatch) // 調(diào)度一個CDN上傳節(jié)點
.onErrorResumeNext(e -> Maybe.error(() -> new CDNDispatchException(e))) // 調(diào)用異常
.switchIfEmpty(Single.error(CDNDispatchException::new)) // 沒有取到節(jié)點, 異常處理
.map(cdnNodeInfo ->
passObj.toBuilder()
.cdnNodeInfo(cdnNodeInfo)
.build()
)
;
}

6、總結(jié)

以上內(nèi)容是搜狐視頻服務(wù)端PUGC團隊,首次在核心業(yè)務(wù)接口中應(yīng)用響應(yīng)式異步非阻塞架構(gòu)的思考和實施過程,文中主要闡述了兩個內(nèi)容:

  • Dubbo響應(yīng)式客戶端的實現(xiàn)過程
  • 視頻對象創(chuàng)建接口的業(yè)務(wù)分析和實現(xiàn)過程

通過本次重構(gòu),我們獲得了如下收益:

  • 重新梳理業(yè)務(wù)流,將業(yè)務(wù)流同響應(yīng)式流整合,基于響應(yīng)式編程規(guī)范業(yè)務(wù)流的實現(xiàn)過程
  • 視頻對象創(chuàng)建接口平均響應(yīng)時間從256ms降低到146ms,性能提供顯著

結(jié)束,感謝閱讀!

責(zé)任編輯:未麗燕 來源: 搜狐技術(shù)產(chǎn)品
相關(guān)推薦

2021-02-27 16:08:17

Java異步非阻塞

2015-07-03 10:12:04

編程同步非阻塞

2024-09-05 09:41:57

2012-05-29 10:44:17

WebApp

2019-07-23 11:01:57

Python同步異步

2022-06-22 08:16:29

異步非阻塞框架

2012-02-22 21:15:41

unixIO阻塞

2021-05-25 08:20:37

編程技能開發(fā)

2021-03-04 08:34:55

同步阻塞非阻塞

2023-07-12 08:16:54

JVM工具包Vert.x

2012-10-10 10:00:27

同步異步開發(fā)Java

2024-12-02 00:57:17

非阻塞異步編程

2022-09-01 08:00:00

響應(yīng)式編程集成

2009-08-21 14:25:23

C#異步傳輸字符串

2025-02-17 13:23:34

Python同步阻塞MySQL

2018-03-28 08:52:53

阻塞非阻塞I

2024-12-10 08:09:15

2023-12-06 07:28:47

阻塞IO異步IO

2024-09-23 17:15:28

Python并發(fā)并行

2021-06-04 18:14:15

阻塞非阻塞tcp
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 久久精品国产一区二区 | 欧美一a一片一级一片 | 亚洲国产精品99久久久久久久久 | av资源中文在线 | a级在线观看| 久久亚洲国产精品 | 亚洲 欧美 日韩在线 | 日本高清aⅴ毛片免费 | 国产精品一区在线观看你懂的 | 黑人精品欧美一区二区蜜桃 | 国产精品视频一二三区 | 亚洲一区二区免费看 | 精品视频www | 欧美激情区 | 欧美一区二区在线 | 天堂一区二区三区 | 99免费视频 | 成人在线视频网 | 羞羞的视频在线观看 | 国产在线观看一区二区 | 国家一级黄色片 | 亚洲三级av | 中文字幕在线视频一区二区三区 | 国产精品免费在线 | 日本午夜在线视频 | 一区二区三区国产精品 | 午夜午夜精品一区二区三区文 | 日本一区二区三区在线观看 | 亚洲国产精品成人综合久久久 | www.久草| 久久久av| www.亚洲.com | 99在线免费观看视频 | 久久久久国产精品一区二区 | 国产成人精品免费视频大全最热 | 亚洲女人天堂成人av在线 | 国产一区黄色 | 国产精品久久久久久妇女6080 | 亚洲色图在线观看 | 一区精品视频 | 亚洲一区二区精品视频在线观看 |