成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

DDD與微服務(wù)集成的第一戰(zhàn)役:客戶端重試&服務(wù)端冪等

數(shù)據(jù)庫 其他數(shù)據(jù)庫
一般 redis 不會對數(shù)據(jù)進(jìn)行持久存儲,只能保障在一段時間內(nèi)的冪等性,超出時間后,由于 key 被自動清理,冪等將不再生效。對于業(yè)務(wù)場景不太嚴(yán)格但性能要求較高的場景才可使用,比如為過濾系統(tǒng)中由于 retry 機(jī)制造成的重復(fù)請求。

當(dāng)一個接口從簡單的內(nèi)部調(diào)用升級為遠(yuǎn)程方法調(diào)用(RPC)會面臨很多問題,比如:

  1. 本地事務(wù)失效。在內(nèi)部調(diào)用時,多個方法通常在同一事務(wù)中執(zhí)行,可以使用本地?cái)?shù)據(jù)庫事務(wù)來確保數(shù)據(jù)的一致性。但是,在遠(yuǎn)程方法調(diào)用中,由于涉及到網(wǎng)絡(luò)通信,事務(wù)的邊界會擴(kuò)展到多個系統(tǒng)之間,因此無法直接使用本地事務(wù)。如果遠(yuǎn)程方法調(diào)用出現(xiàn)異常,可能會導(dǎo)致事務(wù)提交失敗,從而產(chǎn)生數(shù)據(jù)不一致;
  2. 第三狀態(tài)影響。網(wǎng)絡(luò)不確定性可能導(dǎo)致遠(yuǎn)程調(diào)用無法成功獲得結(jié)果,例如網(wǎng)絡(luò)連接中斷、網(wǎng)絡(luò)超時等。在這種情況下,客戶端無法獲得期望的結(jié)果,調(diào)用會以網(wǎng)絡(luò)錯誤或超時的方式結(jié)束;
  3. 服務(wù)版本兼容性問題。如果服務(wù)接口發(fā)生變化,客戶端和服務(wù)端的版本不匹配可能導(dǎo)致調(diào)用失敗;
  4. 性能問題、可用性問題、安全問題等;

由于涉及的問題比較多,這里重點(diǎn)分析和解決 RPC 調(diào)用時的第三狀態(tài)問題。

1. 什么是第三狀態(tài)

當(dāng)一個客戶端發(fā)起一個RPC請求時,服務(wù)端可能會返回不同的狀態(tài),包括:

  1. 成功:服務(wù)端成功完成了客戶端發(fā)送的請求,并返回對應(yīng)的響應(yīng)結(jié)果;
  2. 失敗:服務(wù)端無法成功處理客戶端發(fā)送的請求,并返回錯誤信息或異常;
  3. 超時:調(diào)用方無法在規(guī)定時間收到服務(wù)端的處理結(jié)果,所以無法知道請求的最終處理結(jié)果;

如下圖所示:

圖片圖片

一般情況下,調(diào)用方在規(guī)定時間收到被調(diào)用方的返回結(jié)果,能夠非常明確的知道處理結(jié)果是成功還是失敗。

當(dāng)網(wǎng)絡(luò)或被調(diào)用方出問題,就會觸發(fā)超時,比如下圖所示:

圖片圖片

如果被調(diào)用方異常或者網(wǎng)絡(luò)發(fā)生阻塞,調(diào)用方發(fā)送的 Request 請求沒有被正常處理,那調(diào)用方只能在等待若干時間后拋出異常進(jìn)行流程中斷。

又或者如下圖所示:

圖片圖片

被調(diào)用方處理時間過長或者網(wǎng)絡(luò)發(fā)生阻塞,調(diào)用方無法在規(guī)定時間獲得最終結(jié)果,也只能觸發(fā)超時中斷。

可見,如果發(fā)生 超時 情況,對于處理結(jié)果就處于未知狀態(tài),這就是所謂的“第三狀態(tài)”:

  1. 被調(diào)用方成功接收到請求并完成了處理;
  2. 被調(diào)用方完全沒有接收到請求;

在出現(xiàn)第三狀態(tài)時,在不做任何處理前,根本就無法獲取最終的處理結(jié)果。在該場景下,最通用的解決方案便是 客戶端重試 + 服務(wù)端冪等。

  1. 客戶端重試。指的是在RPC調(diào)用出現(xiàn)超時的情況下,客戶端自動重新發(fā)送相同的請求。然而,在客戶端重試時需要注意避免重復(fù)執(zhí)行有副作用的操作,比如避免重復(fù)插入數(shù)據(jù);
  2. 服務(wù)端冪等。指的是對于相同的輸入請求,服務(wù)端能夠產(chǎn)生相同的結(jié)果,而且不會對系統(tǒng)狀態(tài)造成影響。冪等性是為了應(yīng)對由于重試等原因?qū)е轮貜?fù)執(zhí)行的副作用;

通過客戶端重試和服務(wù)端冪等的方式,可以增加RPC調(diào)用的可靠性和數(shù)據(jù)一致性。客戶端重試可以處理網(wǎng)絡(luò)不穩(wěn)定、服務(wù)端故障等導(dǎo)致的失敗情況,而服務(wù)端冪等性能保證相同的請求不會重復(fù)執(zhí)行或引起數(shù)據(jù)不一致的問題。這兩個技術(shù)結(jié)合使用,可以提高分布式系統(tǒng)中RPC調(diào)用的健壯性和可靠性。

2. 客戶端重試

客戶端重試指的是在RPC調(diào)用失敗的情況下,客戶端自動重新發(fā)送相同的請求。客戶端可以設(shè)置重試的次數(shù)和時間間隔,直到得到預(yù)期的成功響應(yīng)或達(dá)到最大重試次數(shù)。客戶端重試機(jī)制可以彌補(bǔ)網(wǎng)絡(luò)不穩(wěn)定性或服務(wù)端異常導(dǎo)致的調(diào)用失敗。然而,在客戶端重試時需要注意避免重復(fù)執(zhí)行有副作用的操作,比如避免重復(fù)插入數(shù)據(jù)。此外,還需要合理設(shè)置重試策略,避免因頻繁的重試導(dǎo)致網(wǎng)絡(luò)負(fù)荷增加或服務(wù)端壓力過大。

如下圖所示:

圖片圖片

  1. 第一次獲取商品信息時,由于網(wǎng)絡(luò)異常導(dǎo)致請求超時;
  2. 網(wǎng)絡(luò)異常觸發(fā) retry 機(jī)制,重新發(fā)起新的請求;
  3. 新請求成功發(fā)送至商品服務(wù)并獲取信息,從而使得流程從異常中恢復(fù),最終正常執(zhí)行完成;

由此可見,重試的工作原理非常簡單。

Spring Retry是Spring Framework提供的一個模塊,用于簡化和增強(qiáng)應(yīng)用程序中的重試操作。它提供了一種聲明式的方式來處理方法調(diào)用的重試,以應(yīng)對在分布式系統(tǒng)或有限資源環(huán)境下可能出現(xiàn)的失敗情況。

Spring Retry模塊的主要特性包括:

  1. 聲明式注解:通過使用注解(例如@Retryable、@Recover等),可以將重試行為與方法關(guān)聯(lián)起來。通過在方法上添加@Retryable注解,可以指定需要進(jìn)行重試的異常類型,最大重試次數(shù),重試間隔等信息;
  2. 容錯策略:Spring Retry提供了多種容錯策略,包括簡單重試、指數(shù)退避重試、固定時間間隔重試等,開發(fā)者可以根據(jù)具體需求選擇合適的策略;
  3. 回退機(jī)制:如果重試次數(shù)超過限定值仍然失敗,Spring Retry可以通過@Recover注解來指定一個回退方法,用于執(zhí)行備選邏輯或返回默認(rèn)值;

要使用Spring Retry,需要在項(xiàng)目中引入相應(yīng)的依賴:

<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>

2.1. Retryable

重試行為主要是由 @Retryable 注解完成,通過為目標(biāo)方法添加注解,將其納入重試管理,Spring Retry將負(fù)責(zé)在出現(xiàn)失敗情況時自動進(jìn)行重試。

以下是 @Retryable 注解代碼示例:

@Service
public class MyService {

    @Retryable(value = {SomeException.class}, maxAttempts = 3)
    public void doSomething() {
        // 需要進(jìn)行重試的業(yè)務(wù)邏輯
    }
}

上述示例中的 doSomething() 方法標(biāo)記為需要進(jìn)行重試,在捕獲到 SomeException 異常時觸發(fā)重試,最多重試3次。

下表列出了 @Retryable 注解的所有屬性及其說明:

屬性名

說明

value

定義重試的異常類型。默認(rèn)情況下,將捕獲所有Throwable類型的異常進(jìn)行重試。可以通過指定一個或多個異常類來限定重試的異常類型,例如@Retryable(value = {SQLException.class, TimeoutException.class})

maxAttempts

定義最大重試次數(shù),默認(rèn)為3次。當(dāng)達(dá)到最大重試次數(shù)后,如果仍然失敗,則不再進(jìn)行重試。

backoff

定義重試間隔策略。可以使用@Backoff注解來配置重試間隔的退避策略,包括delay(初始延遲時間,默認(rèn)為0)和maxDelay(最大延遲時間,默認(rèn)為無限制)。例如,@Backoff(delay = 1000, maxDelay = 5000)表示初始延遲為1秒,最大延遲為5秒。

delayExpression

定義重試間隔的SpEL表達(dá)式。可以使用#lastException表示上一次重試時捕獲的異常對象。

multiplier

定義指數(shù)退避策略的倍數(shù),默認(rèn)為1。在使用指數(shù)退避策略時,每次重試的間隔時間將是delay * (multiplier ^ (重試次數(shù)-1))

random

定義是否啟用隨機(jī)化延遲。默認(rèn)為false,即不啟用隨機(jī)化延遲。當(dāng)設(shè)置為true時,重試間隔將隨機(jī)波動一定的范圍。

exceptionExpression

定義一個SpEL表達(dá)式,用于決定是否進(jìn)行重試。該表達(dá)式可以使用#result表示目標(biāo)方法的返回值,#lastException表示上一次重試時捕獲的異常對象。如果表達(dá)式的結(jié)果為true,則進(jìn)行重試;否則,不進(jìn)行重試。

include

定義需要包含在重試行為中的異常類型,默認(rèn)為空數(shù)組,表示包括所有異常類型。可以指定一個或多個異常類來明確指定需要處理的異常類型。例如,@Retryable(include = {IOException.class, TimeoutException.class})表示僅包括IOExceptionTimeoutException異常。

exclude

定義需要排除在重試行為之外的異常類型,默認(rèn)為空數(shù)組,表示排除所有異常類型。可以指定一個或多個異常類來明確排除某些異常類型。例如,@Retryable(exclude = {NullPointerException.class})表示排除NullPointerException異常。

這些屬性可以根據(jù)具體的需求進(jìn)行配置,以實(shí)現(xiàn)靈活的重試策略。

2.2. Recover

Spring Retry 的 fallback 機(jī)制是在重試失敗后,執(zhí)行備選邏輯的一種處理方式。通過使用 @Recover 注解,在重試失敗后,可以調(diào)用備選邏輯方法來完成錯誤處理、數(shù)據(jù)清理等操作。

具體來說,當(dāng) @Retryable 注解標(biāo)記的方法達(dá)到最大重試次數(shù)或者拋出了無法重試的異常時,Spring Retry將會嘗試查找與該方法參數(shù)類型相同的方法,并在找到時調(diào)用它。

@Retryable(maxAttempts = 3)
public void someMethod(String arg1, int arg2) {
    // 重試業(yè)務(wù)邏輯
}

@Recover
public void recover(String arg1, int arg2) {
    // 備選邏輯
}

當(dāng) someMethod() 方法達(dá)到最大重試次數(shù)或拋出無法重試的異常時,Spring Retry 將會查找并調(diào)用 recover() 方法,并傳遞相同的方法參數(shù)。在 recover() 方法中,應(yīng)該針對重試失敗的情況編寫備選邏輯,例如記錄日志、發(fā)送通知等操作。

需要注意的是,@Recover 注解必須放置在其對應(yīng)的重試方法所屬的類中,并且方法參數(shù)類型必須與重試方法一致。如果有多個重試方法,每個重試方法都可以對應(yīng)一個@Recover方法,以滿足不同的備選邏輯需求。

在使用fallback機(jī)制時,應(yīng)該仔細(xì)考慮備選邏輯的實(shí)現(xiàn)方式,確保其能夠正確處理重試失敗的情況,并對數(shù)據(jù)的一致性和完整性產(chǎn)生最小的影響。

2.3. 不同場景下的 Retry 和 Fallback

@Retryable 和 @Recover 都是添加在類方法上的注解,不管是什么場景下的請求只會走固定流程。這樣的設(shè)計(jì)在復(fù)雜場景下是否夠用?

之前,我們看到通過 Retry 恢復(fù)網(wǎng)絡(luò)抖動的場景;接下來讓我們看另一個場景,如下圖所示:

圖片圖片

image

商品服務(wù)流量激增,導(dǎo)致 DB CPU 飆升,出現(xiàn)大量的慢 SQL,這時觸發(fā)了系統(tǒng)的 Retry 會是怎樣的結(jié)果?

  1. 在獲取商品失敗后,系統(tǒng)自動觸發(fā) Retry 機(jī)制;
  2. 由于是商品服務(wù)本身出了問題,第二次請求仍舊失敗;
  3. 服務(wù)又觸發(fā)了第三次請求,仍未獲取結(jié)果;
  4. 達(dá)到最大重試次數(shù),仍舊無法獲取商品,只能通過異常中斷用戶請求;

通過 Retry 機(jī)制未能將流程從異常中恢復(fù)過來,反而給下游的 商品服務(wù) 造成了巨大傷害。

  1. 商品服務(wù)壓力大,響應(yīng)時間長;
  2. 上游系統(tǒng)由于超時觸發(fā)自動重試;
  3. 自動重試增大了對商品服務(wù)的調(diào)用;
  4. 商品服務(wù)請求量更大,更難以從故障中恢復(fù);

這就是常說的“讀放大”,假設(shè)用戶驗(yàn)證是否能夠購買請求的請求量為 n,那極端情況下 商品服務(wù)的請求量為 3n (其中 2n 是由 Retry 機(jī)制造成)

此時,最優(yōu)解不是進(jìn)行 Retry 而是直接走 Fallback,給下游服務(wù)一定的恢復(fù)機(jī)會。

同樣是對商品服務(wù)接口(同一個接口)的調(diào)用,在不同的場景需要使用不同的策略用以適配不同的業(yè)務(wù)流程,通常情況下:

  1. Command 場景優(yōu)先使用 Retry 策略

這種流量即為重要,最好能保障流程的完整性

通常寫流量比較小,小范圍 Retry 不會對下游系統(tǒng)造成巨大影響

  1. Query 場景優(yōu)選使用 Fallback 策略
  • 大多數(shù)展示場景,哪怕部分信息沒有獲取到對整體的影響也比較小

  • 通常讀場景流量較高,Retry 對下游系統(tǒng)的傷害不容忽視

面對不同的業(yè)務(wù)場景,你會怎么做呢?準(zhǔn)備兩組不同的方法根據(jù)業(yè)務(wù)場景分別調(diào)用?

2.4. SmartFailure

SmartFailure 不是為不同的場景使用不同的方法,而是根據(jù)請求上下文信息,動態(tài)的走 Retry 或 Fallback,從而更好的適應(yīng)復(fù)雜的業(yè)務(wù)場景。

整體設(shè)計(jì)如下:

圖片圖片

整體流程如下:

  1. 讀取配置信息,將請求類型(ActionType)綁定到線程上下文;
  2. 然后執(zhí)行正常業(yè)務(wù)邏輯
  3. 當(dāng)調(diào)用 @SmartFault 注解的方法時,會被 SmartFaultMethodInterceptor 攔截器攔截

攔截器通過 ActionTypeProvider 獲取當(dāng)前的 ActionType;

根據(jù) ActionType 對請求進(jìn)行路由;

如果是 COMMAND 操作,將使用 RetryTemplate 執(zhí)行請求,在發(fā)生異常時,通過重試配置進(jìn)行請求重發(fā),從而最大限度的獲得遠(yuǎn)程結(jié)果;

如果是 QUERY 操作,將使用 FallbackTemplate(重試次數(shù)為0的 RetryTemplate)執(zhí)行請求,當(dāng)發(fā)生異常時,調(diào)用 fallback 方法,執(zhí)行配置的 recover 方法,直接使用返回結(jié)果;

  1. 獲取遠(yuǎn)程結(jié)果后,執(zhí)行后續(xù)的業(yè)務(wù)邏輯;
  2. 最后,ActionAspect 將 ActionType 從線程上下文中移除;

使用前需添加 lego 依賴,具體如下:

<dependency>
    <groupId>com.geekhalo.lego</groupId>
    <artifactId>lego-starter</artifactId>
    <version>最新版本</version>
</dependency>
2.4.1. ActionTypeProvider

首先,需要準(zhǔn)備一個 ActionTypeProvider 用以提供上下文信息。ActionTypeProvider 接口定義如下:

public interface ActionTypeProvider {
    ActionType get();
}

public enum ActionType {
    COMMAND, QUERY
}

通常情況下,我們使用 ThreadLocal 組件將 ActionType 存儲于線程上下文,在使用時從上下中獲取相關(guān)信息。

public class ActionContext {
    private static final ThreadLocal<ActionType> ACTION_TYPE_THREAD_LOCAL = new ThreadLocal<>();

    public static void set(ActionType actionType){
        ACTION_TYPE_THREAD_LOCAL.set(actionType);
    }

    public static ActionType get(){
        return ACTION_TYPE_THREAD_LOCAL.get();
    }

    public static void clear(){
        ACTION_TYPE_THREAD_LOCAL.remove();
    }
}

有了上下文之后,ActionBasedActionTypeProvider 直接從 Context 中獲取 ActionType 具體如下:

@Component
public class ActionBasedActionTypeProvider implements ActionTypeProvider {
    @Override
    public ActionType get() {
        return ActionContext.get();
    }
}

如何對請求進(jìn)行標(biāo)記?如何對 ActionType 進(jìn)行管理(包括信息綁定和信息清理)?最常用的方式便是:

  1. 提供一個注解,在方法上添加注解用于對 ActionType 的配置;
  2. 提供一個攔截器,對方法調(diào)用進(jìn)行攔截:

方法調(diào)用前,從注解中獲取配置信息并綁定到上下文;

執(zhí)行具體的業(yè)務(wù)方法;

方法調(diào)用后,主動清理上下文信息;

核心實(shí)現(xiàn)為:

@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface Action {
    ActionType type();
}

@Aspect
@Component
@Order(Integer.MIN_VALUE)
public class ActionAspect {
    @Pointcut("@annotation(com.geekhalo.lego.faultrecovery.smart.Action)")
    public void pointcut() {
    }

    @Around(value = "pointcut()")
    public Object action(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Action annotation = methodSignature.getMethod().getAnnotation(Action.class);
        ActionContext.set(annotation.type());
        try {
            return joinPoint.proceed();
        }finally {
            ActionContext.clear();
        }
    }
}

在這些組件的幫助下,我們只需在方法上基于 @Action 注解進(jìn)行標(biāo)記,便能夠?qū)?ActionType 綁定到上下文。

2.4.2. @SmartFault

在完成 ActionType 綁定到上下文之后,接下來要做的便是對 遠(yuǎn)程接口 進(jìn)行配置。遠(yuǎn)程接口的配置工作主要由 @SmartFault 來完成。其核心配置項(xiàng)包括:

配置項(xiàng)

含義

默認(rèn)配置

recover

fallback 方法名稱


maxRetry

最大重試次數(shù)

3

include

觸發(fā)重試的異常類型


exclude

不需要重新的異常類型


測試 Demo 如下:

@Service
@Slf4j
@Getter
public class RetryService3 {
    private int count = 0;

    private int retryCount = 0;
    private int fallbackCount = 0;
    private int recoverCount = 0;

    public void clean(){
        this.retryCount = 0;
        this.fallbackCount = 0;
        this.recoverCount = 0;
    }

    /**
    * Command 請求,啟動重試機(jī)制
    */
    @Action(type = ActionType.COMMAND)
    @SmartFault(recover = "recover")
    public Long retry(Long input) throws Throwable{
        this.retryCount ++;
        return doSomething(input);
    }

    /**
    * Query 請求,啟動Fallback機(jī)制
    */
    @Action(type = ActionType.QUERY)
    @SmartFault(recover = "recover")
    public Long fallback(Long input) throws Throwable{
        this.fallbackCount ++;
        return doSomething(input);
    }

    @Recover
    public Long recover(Throwable e, Long input){
        this.recoverCount ++;
        log.info("recover-{}", input);
        return input;
    }

    private Long doSomething(Long input) {
        // 偶數(shù)拋出異常
        if (count ++ % 2 == 0){
            log.info("Error-{}", input);
            throw new RuntimeException();
        }
        log.info("Success-{}", input);
        return input;
    }
}
2.4.3. 測試

最后,對代碼進(jìn)行簡單測試:

@SpringBootTest(classes = DemoApplication.class)
public class RetryService3Test {
    @Autowired
    private RetryService3 retryService;

    @BeforeEach
    public void setup(){
        retryService.clean();
    }

    @Test
    public void retry() throws Throwable{
        for (int i = 0; i < 100; i++){
            retryService.retry(i + 0L);
        }

        Assertions.assertTrue(retryService.getRetryCount() > 0);
        Assertions.assertTrue(retryService.getRecoverCount() == 0);
        Assertions.assertTrue(retryService.getFallbackCount() == 0);
    }

    @Test
    public void fallback() throws Throwable{
        for (int i = 0; i < 100; i++){
            retryService.fallback(i + 0L);
        }

        Assertions.assertTrue(retryService.getRetryCount() == 0);
        Assertions.assertTrue(retryService.getRecoverCount() > 0);
        Assertions.assertTrue(retryService.getFallbackCount() > 0);
    }
}

運(yùn)行 retry 測試,日志如下:

[main] c.g.l.c.f.smart.SmartFaultExecutor       : action type is COMMAND
[main] c.g.l.faultrecovery.smart.RetryService3  : Error-0
[main] c.g.l.c.f.smart.SmartFaultExecutor       : Retry method public java.lang.Long com.geekhalo.lego.faultrecovery.smart.RetryService3.retry(java.lang.Long) throws java.lang.Throwable use [0]
[main] c.g.l.faultrecovery.smart.RetryService3  : Success-0

可見,當(dāng) action type 為 COMMAND 時:

  • 第一次調(diào)用時,觸發(fā)異常,打印:Error-0
  • 此時 SmartFaultExecutor 主動進(jìn)行重試,打印:Retry method xxxx
  • 方法重試成功,RetryService3 打印:Success-0

方法主動進(jìn)行重試,流程從異常中恢復(fù),處理過程和效果符合預(yù)期。

運(yùn)行 fallback 測試,日志如下:

[main] c.g.l.c.f.smart.SmartFaultExecutor       : action type is QUERY
[main] c.g.l.faultrecovery.smart.RetryService3  : Error-0
[main] c.g.l.c.f.smart.SmartFaultExecutor       : recover From ERROR for method ReflectiveMethodInvocation: public java.lang.Long com.geekhalo.lego.faultrecovery.smart.RetryService3.fallback(java.lang.Long) throws java.lang.Throwable; target is of class [com.geekhalo.lego.faultrecovery.smart.RetryService3]
[main] c.g.l.faultrecovery.smart.RetryService3  : recover-0

可見,當(dāng) action type 為 QUERY 時:

  • 第一次調(diào)用時,觸發(fā)異常,打印:Error-0
  • SmartFaultExecutor 執(zhí)行 Fallback 策略,打印:recover From ERROR for method xxxx
  • 調(diào)用RetryService3的 recover 方法,獲取最終返回值。RetryService3 打印:recover-0

異常后自動執(zhí)行 fallback,將流程從異常中恢復(fù)過來,處理過程和效果符合預(yù)期。

3. 服務(wù)端冪等

服務(wù)端冪等指的是對于相同的輸入請求,服務(wù)端能夠產(chǎn)生相同的結(jié)果,而且不會對系統(tǒng)狀態(tài)造成影響。冪等性是為了防止由于重試等原因?qū)е碌闹貜?fù)執(zhí)行帶來的副作用。在RPC中,服務(wù)端需要設(shè)計(jì)和實(shí)現(xiàn)冪等性的方法,確保多次接收到相同的請求時能正確處理,而不會重復(fù)執(zhí)行對系統(tǒng)狀態(tài)有改變的操作。

3.1. 冪等與冪等接口

冪等是指對同一個操作的多次執(zhí)行所產(chǎn)生的影響與一次執(zhí)行的影響相同,即不會因?yàn)槎啻螆?zhí)行而產(chǎn)生額外的副作用。在分布式系統(tǒng)中,由于網(wǎng)絡(luò)等各種因素的存在,可能會導(dǎo)致對同一個操作進(jìn)行多次執(zhí)行,此時如果該操作是冪等的,那么就可以避免數(shù)據(jù)沖突和重復(fù)處理問題。

冪等接口是指對外提供的接口,它們所提供的業(yè)務(wù)操作具有冪等性。也就是說,在客戶端多次調(diào)用該接口時,每次調(diào)用都會產(chǎn)生相同的結(jié)果,并且不會產(chǎn)生額外的副作用。

例如,銀行轉(zhuǎn)賬操作就應(yīng)該是一個冪等接口。假設(shè)客戶端已經(jīng)成功地向銀行發(fā)起了一次轉(zhuǎn)賬請求,但由于網(wǎng)絡(luò)不穩(wěn)定等原因,導(dǎo)致該請求的響應(yīng)沒有及時返回給客戶端。此時客戶端可能會誤以為轉(zhuǎn)賬請求失敗,進(jìn)而再次發(fā)送同樣的轉(zhuǎn)賬請求。如果銀行的轉(zhuǎn)賬接口是冪等的,那么無論客戶端發(fā)送多少次轉(zhuǎn)賬請求,最終的結(jié)果都應(yīng)該是相同的——只有一次轉(zhuǎn)賬操作會被執(zhí)行,其他的請求都會被忽略。

3.2. 天然冪接口

有些接口天然就具備冪等性。這些接口通常是對資源的查詢操作,不會對資源進(jìn)行修改。以下是一些天然具備冪等性的場景:

  • 查詢接口:對于只用于查詢數(shù)據(jù)的接口,例如獲取用戶信息、獲取訂單詳情等,多次調(diào)用不會對數(shù)據(jù)產(chǎn)生影響;
  • 獲取接口:對于獲取資源的接口,例如獲取圖片、獲取文件等,無論調(diào)用多少次,得到的都是相同的資源副本;
  • 計(jì)算接口:對于純計(jì)算的接口,例如加法、乘法等數(shù)學(xué)運(yùn)算接口,多次調(diào)用得到的結(jié)果是相同的;
  • 驗(yàn)證接口:對于驗(yàn)證某個條件的接口,例如檢查用戶名是否已存在、驗(yàn)證手機(jī)號是否有效等,多次調(diào)用得到的驗(yàn)證結(jié)果都是一致的;

這些接口在設(shè)計(jì)上更容易滿足冪等性的要求,因?yàn)樗鼈兊牟僮鞅旧聿]有產(chǎn)生副作用,不會對數(shù)據(jù)進(jìn)行修改。在分布式系統(tǒng)中,可以安全地多次調(diào)用這些接口,而不會引發(fā)數(shù)據(jù)沖突。

3.3. 非冪等接口

非冪等接口是指對資源進(jìn)行修改、狀態(tài)進(jìn)行變更而產(chǎn)生副作用的接口,多次調(diào)用可能會導(dǎo)致不同的結(jié)果。以下是一些常見的非冪等接口:

  • 創(chuàng)建資源接口:例如創(chuàng)建用戶、創(chuàng)建訂單等操作,多次調(diào)用會生成多個相同的資源實(shí)例,每次調(diào)用都會產(chǎn)生不同的結(jié)果;
  • 修改資源接口:例如更新用戶信息、修改文章內(nèi)容等操作,多次調(diào)用會對同一個資源進(jìn)行多次修改,每次修改都會產(chǎn)生不同的結(jié)果;
  • 刪除資源接口:例如刪除文件、刪除訂單等操作,多次調(diào)用會多次刪除同一個資源,每次調(diào)用都會產(chǎn)生不同的結(jié)果;

這些非冪等接口在設(shè)計(jì)上需要特別注意,并采取合適的措施來確保數(shù)據(jù)的一致性和操作的正確性。

3.4. 重復(fù)請求的處理方式

當(dāng)系統(tǒng)檢測出當(dāng)前請求是重復(fù)請求時,通常會有兩種處理策略:

  • 直接拋出冪等異常,用以說明該請求為重復(fù)請求;
  • 直接返回上次請求一致的返回結(jié)果;

這兩種方案在實(shí)現(xiàn)上差異不大,但在客戶端使用中差異巨大。

  • 如果是異常方案,客戶端在調(diào)用冪等接口時需要對異常進(jìn)行捕獲,然后通過其他 API 獲取上次請求的處理結(jié)果,根據(jù)結(jié)果不同來決定接下來的處理流程;
  • 如果是直接返回上次請求的處理結(jié)果,則客戶端不需要做額外的處理,直接使用重試機(jī)制對請求進(jìn)行重新發(fā)送即可,獲取結(jié)果后自然進(jìn)入到下面的流程;

所以,在冪等設(shè)計(jì)中,優(yōu)先使用 “直接返回上次請求結(jié)果” 方案。

綜上分析,冪等可以做為一種通用能力與業(yè)務(wù)處理邏輯進(jìn)行充分解構(gòu),這就是組件封裝的前提。

3.5. 冪等組件

我們可以將冪等封裝成一種能力,然后在需要冪等保護(hù)的業(yè)務(wù)方法上進(jìn)行配置,從而實(shí)現(xiàn)冪等能力與業(yè)務(wù)方法的徹底解耦。

整體設(shè)計(jì)如下圖所示:

圖片圖片

整體設(shè)計(jì)比較簡單,運(yùn)行流程如下:

  • IdempotentInterceptor 會對 @Idempotent 注解標(biāo)記的方法進(jìn)行攔截;
  • 當(dāng)方法第一次被調(diào)用時,會讀取 @Idempotent 注解上的配置信息,使用 IdempotentExecutorFactoris 為每個方法創(chuàng)建一個 IdempotentExecutor 實(shí)例;
  • 在方法調(diào)用時,將請求直接路由到 IdempotentExecutor 實(shí)例,由 IdempotentExecutor 完成核心流程;
  • 其中,IdempotentExecutorFactories 擁有多個 IdempotentExecutorFactory 實(shí)例,并根據(jù) @Idempotent 上配置的 executorFactory 屬性使用對應(yīng)的實(shí)例完成創(chuàng)建工作;

從設(shè)計(jì)上看,系統(tǒng)中可以同時配置多個 IdempotentExecutorFactory,然后根據(jù)不同的業(yè)務(wù)場景設(shè)置不同的 executorFactory。

冪等處理的核心流程如下:

圖片圖片

IdempotentExecutor 處理核心流程如下:

  • 通過 SpringEL 表達(dá)式從入?yún)⒅刑崛?unique key 信息;
  • 根據(jù) group 和 unique key 從 ExecutionRecordRepository 中讀取執(zhí)行記錄 ExecutionRecord;
  • 如果 ExecutionRecord 為已完成狀態(tài),則根據(jù)配置直接返回 ExecutionRecord 的執(zhí)行結(jié)果 或者 直接拋出 RepeatedSubmitException 異常;
  • 如果 ExecutionRecord 為執(zhí)行中,則出現(xiàn)并發(fā)問題,直接拋出 ConcurrentRequestException 異常;
  • 如果 ExecutionRecord 為未執(zhí)行,先執(zhí)行方法獲取返回值,然后使用 ExecutionRecordRepository 對 ExecutionRecord 進(jìn)行更新,然后返回執(zhí)行結(jié)果;

使用前需要引入 lego starter,在 maven pom 中添加如下信息:

<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>最新版本</version>
3.5.1. 配置 dbExecutorFactory

以 JpaRepository 為例實(shí)現(xiàn)對 IdempotentExecutorFactory 的配置,具體如下:

@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {

    @Bean("dbExecutorFactory")
    public IdempotentExecutorFactory dbExecutorFactory(JpaBasedExecutionRecordRepository recordRepository){
        return createExecutorFactory(recordRepository);
    }
}

其中,IdempotentConfigurationSupport 已經(jīng)提供 idempotent 所需的很多 Bean,同時提供 createExecutorFactory(repository) 方法,用以完成 IdempotentExecutorFactory 的創(chuàng)建。

使用 Jpa 需要調(diào)整 EnableJpaRepositories 相關(guān)配置,具體如下:

@Configuration
@EnableJpaRepositories(basePackages = {
        "com.geekhalo.lego.core.idempotent.support.repository"
}, repositoryFactoryBeanClass = JpaBasedQueryObjectRepositoryFactoryBean.class)
public class SpringDataJpaConfiguration {
}

其中,com.geekhalo.lego.core.idempotent.support.repository 為固定包名,指向 Jpa 默認(rèn)實(shí)現(xiàn) JpaBasedExecutionRecordRepository,Spring Data Jpa 會自動生成實(shí)現(xiàn)的代理對象。

最后,在數(shù)據(jù)庫中增加 冪等所需表,sql 如下:

CREATE TABLE `idempotent_execution_record` (
   `id` bigint(20) NOT NULL AUTO_INCREMENT,
   `type` int(11) NOT NULL,
   `unique_key` varchar(64) NOT NULL,
   `status` int(11) NOT NULL,
   `result` varchar(1024) DEFAULT NULL,
   `create_date` datetime DEFAULT NULL,
   `update_date` datetime DEFAULT NULL,
   PRIMARY KEY (`id`),
   UNIQUE KEY `unq_type_key` (`type`,`unique_key`)
) ENGINE=InnoDB;

至此,便完成了基本配置。

【注】關(guān)于 Spring data jpa 配置,可以自行到網(wǎng)上進(jìn)行檢索。

3.5.2. 冪等保護(hù)示例

在方法上增加 @Idempotent 注解便可以使其具備冪等保護(hù),示例如下:

@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
        handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putForResult(String key, Long data){
    return put(key, data);
}

其中 @Idempotent 為核心配置,詳細(xì)信息如下:

  • executorFactory 為 IdempotentExecutorFactory,及在 IdempotentConfiguration 中配置的bean,默認(rèn)為 DEFAULT_EXECUTOR_FACTORY
  • group 為組信息,用于區(qū)分不同的業(yè)務(wù)場景,同一業(yè)務(wù)場景使用相同的配置;
  • keyEl 為提取冪等鍵所用的 SpringEl 表達(dá)式,#key 說明入?yún)⒌?key 將作為冪等鍵,group + key 為一個完整的冪等鍵,唯一識別一次請求;
  • handleType 是處理類型,及重復(fù)提交時如何處理請求

RESULT,直接返回上次的執(zhí)行結(jié)果

ERROR,直接拋出 RepeatedSubmitException 異常

編寫簡單的測試用例如下:

@Test
void putForResult() {
    BaseIdempotentService idempotentService = getIdempotentService();
    String key = String.valueOf(RandomUtils.nextLong());
    Long value = RandomUtils.nextLong();

    {   // 第一次操作,返回值和最終結(jié)果符合預(yù)期
        Long result = idempotentService.putForResult(key, value);
        Assertions.assertEquals(value, result);
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }

    {   // 第二次操作,返回值和最終結(jié)果 與第一次一致(直接獲取返回值,沒有執(zhí)行業(yè)務(wù)邏輯)
        Long valueNew = RandomUtils.nextLong();
        Long result = idempotentService.putForResult(key, valueNew);

        Assertions.assertEquals(value, result);
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }
}

運(yùn)行測試用例,測試通過,可得出如下結(jié)論:

  • 第一次操作,與正常方法一致,成功返回結(jié)果值;
  • 第二次操作,邏輯方法未執(zhí)行,直接返回第一次的運(yùn)行結(jié)果;

這是最常見的一種工作模式,除直接返回上次執(zhí)行結(jié)果外,當(dāng)發(fā)生重復(fù)提交時也可以拋出異常中斷流程,只需將 handleType 設(shè)置為 ERROR 即可,具體如下:

@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
    handleType = IdempotentHandleType.ERROR)
@Transactional
public Long putForError(String key, Long data){
    return put(key, data);
}

編寫測試用例,具體如下:

@Test
void putForError() {
    BaseIdempotentService idempotentService = getIdempotentService();
    String key = String.valueOf(RandomUtils.nextLong());
    Long value = RandomUtils.nextLong();

    { // 第一次操作,返回值和最終結(jié)果符合預(yù)期
        Long result = idempotentService.putForError(key, value);
        Assertions.assertEquals(value, result);
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }

    { // 第二次操作,直接拋出異常,結(jié)果與第一次一致
        Assertions.assertThrows(RepeatedSubmitException.class, () ->{
            Long valueNew = RandomUtils.nextLong();
            idempotentService.putForError(key, valueNew);
        });
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }
}

運(yùn)行測試用例,測試通過,可以得出:

  • 第一次操作,與正常方法一致,成功返回結(jié)果值;
  • 第二次操作,直接拋出 RepeatedSubmitException 異常,同時方法未執(zhí)行,結(jié)果與第一次調(diào)用一致;
3.5.3. 冪等與異常

異常是一種特殊的返回值!!!

如果將異常看做是一種特殊的返回值,那冪等接口在第二次請求時同樣需要拋出異常,示例代碼如下:

@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
        handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putExceptionForResult(String key, Long data) {
    return putException(key, data);
}
protected Long putException(String key, Long data){
    this.data.put(key, data);
    throw new IdempotentTestException();
}

@Idempotent 注解沒有變化,只是在 putException 方法執(zhí)行后拋出 IdempotentTestException 異常。

編寫簡單測試用例如下:

@Test
void putExceptionForResult(){
    BaseIdempotentService idempotentService = getIdempotentService();
    String key = String.valueOf(RandomUtils.nextLong());
    Long value = RandomUtils.nextLong();

    {   // 第一次操作,拋出異常
        Assertions.assertThrows(IdempotentTestException.class,
                ()->idempotentService.putExceptionForResult(key, value));
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }

    {   // 第二次操作,返回值和最終結(jié)果 與第一一致(直接獲取返回值,沒有執(zhí)行業(yè)務(wù)邏輯)
        Long valueNew = RandomUtils.nextLong();
        Assertions.assertThrows(IdempotentTestException.class,
                ()->idempotentService.putExceptionForResult(key, valueNew));
        Assertions.assertEquals(value, idempotentService.getValue(key));
    }
}

運(yùn)行測試用例,用例通過,可知:

  • 第一次操作,與方法邏輯一致,更新數(shù)據(jù)并拋出 IdempotentTestException 異常;
  • 第二次操作,直接拋出 IdempotentTestException 異常,同時方法未執(zhí)行,結(jié)果與第一次一致;
3.5.4. 并發(fā)保護(hù)

如果上一個請求執(zhí)行尚未結(jié)束,新的請求已經(jīng)開啟,那會如何?

這就是最常見的并發(fā)場景,idempotent 對其也進(jìn)行了支持,當(dāng)出現(xiàn)并發(fā)請求時會直接拋出 ConcurrentRequestException,用于中斷處理。

首先,使用 sleep 模擬一個耗時的方法,具體如下:

@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
            handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putWaitForResult(String key, Long data) {
    return putForWait(key, data);
}
protected Long putForWait(String key, Long data){
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return put(key, data);
}

putWaitForResult 方法調(diào)用時會主動 sleep 3 秒,然后才執(zhí)行真正的邏輯。

編寫測試代碼如下:

@Test
void putWaitForResult(){
    String key = String.valueOf(RandomUtils.nextLong());
    Long value = RandomUtils.nextLong();

    // 主線程拋出 ConcurrentRequestException 
    Assertions.assertThrows(ConcurrentRequestException.class, () ->
        testForConcurrent(baseIdempotentService ->
            baseIdempotentService.putWaitForResult(key, value))
    );
}
private void testForConcurrent(Consumer<BaseIdempotentService> consumer) throws InterruptedException {
    // 啟動一個線程執(zhí)行任務(wù),模擬并發(fā)場景
    Thread thread = new Thread(() -> consumer.accept(getIdempotentService()));
    thread.start();

    // 主線程 sleep 1 秒,與異步線程并行執(zhí)行任務(wù)
    TimeUnit.SECONDS.sleep(1);
    consumer.accept(getIdempotentService());
}

運(yùn)行單元測試,測試通過,核心測試邏輯如下:

  • 創(chuàng)建一個線程,執(zhí)行耗時方法;
  • 等待 1 秒后,主線程也執(zhí)行耗時方法;
  • 此時,兩個線程并發(fā)執(zhí)行耗時方法,后進(jìn)入的主線程直接拋出 ConcurrentRequestException;
3.5.5. Redis 支持

DB 具有非常好的一致性,但性能存在一定的問題。在一致性要求不高,性能要求高的場景,可以使用 Redis 作為 ExecutionRecord 的存儲引擎。

引入 redis 非常簡單,大致分兩步:

  • 在 IdempotentConfiguration 中注冊 redisExecutorFactory bean;
  • @Idempotent 注解中使用 redisExecutorFactory 即可;

添加 redisExecutorFactory Bean,具體如下:

@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {

    @Bean("redisExecutorFactory")
    public IdempotentExecutorFactory redisExecutorFactory(ExecutionRecordRepository executionRecordRepository){
        return createExecutorFactory(executionRecordRepository);
    }

    @Bean
    public ExecutionRecordRepository executionRecordRepository(RedisTemplate<String, ExecutionRecord> recordRedisTemplate){
        return new RedisBasedExecutionRecordRepository("ide-%s-%s", Duration.ofDays(7), recordRedisTemplate);
    }

    @Bean
    public RedisTemplate<String, ExecutionRecord> recordRedisTemplate(RedisConnectionFactory redisConnectionFactory){
        RedisTemplate<String, ExecutionRecord> redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
        Jackson2JsonRedisSerializer<ExecutionRecord> executionRecordJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(ExecutionRecord.class);
        executionRecordJackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        redisTemplate.setValueSerializer(executionRecordJackson2JsonRedisSerializer);
        return redisTemplate;
    }
}

@Idempotent 注解調(diào)整如下:

@Idempotent(executorFactory = "redisExecutorFactory", group = 1, keyEl = "#key",
        handleType = IdempotentHandleType.RESULT)
@Override
public Long putForResult(String key, Long data){
    return put(key, data);
}

這樣,所有的冪等信息都會存儲在 redis 中。

【注】一般 redis 不會對數(shù)據(jù)進(jìn)行持久存儲,只能保障在一段時間內(nèi)的冪等性,超出時間后,由于 key 被自動清理,冪等將不再生效。對于業(yè)務(wù)場景不太嚴(yán)格但性能要求較高的場景才可使用,比如為過濾系統(tǒng)中由于 retry 機(jī)制造成的重復(fù)請求。

4. 小結(jié)

當(dāng)系統(tǒng)開啟微服務(wù)化后,服務(wù)調(diào)用的第三狀態(tài)就成為了不可回避的話題。通常情況下,會綜合使用 客戶端重試 和 服務(wù)端冪等 兩個方案來解決:

  • 客戶端重試。需要根據(jù)不同的場景選擇不同的 Retry 和 Fallback 機(jī)制:

寫請求,建設(shè)使用 Retry 機(jī)制,保障最重要的流量不被浪費(fèi)

讀請求,建議使用 Fallback 機(jī)制,避免重試流量對下游服務(wù)造成巨大壓力

  • 服務(wù)端冪等。冪等作為一種通用能力,建議與業(yè)務(wù)邏輯進(jìn)行分離,在需要的時候直接在業(yè)務(wù)方法上進(jìn)行配置即可,但仍舊有一些最佳實(shí)踐:
  • 使用直接返回上次的處理結(jié)果替代異常中斷,使調(diào)用方更加簡潔;
  • 業(yè)務(wù)異常是一種特殊的返回值,也需要進(jìn)行冪等保護(hù);
  • 有冪等保護(hù)后,仍舊需要對并發(fā)請求進(jìn)行處理,此時直接通過異常對重復(fù)流程進(jìn)行中斷即可;

以上兩種場景,lego 對其都進(jìn)行了封裝,可以方便的應(yīng)用于業(yè)務(wù)系統(tǒng),從而降低微服務(wù)的傷害。

責(zé)任編輯:武曉燕 來源: geekhalo
相關(guān)推薦

2024-03-06 14:58:52

客戶端微服務(wù)架構(gòu)

2009-08-21 16:14:52

服務(wù)端與客戶端通信

2011-09-09 09:44:23

WCF

2009-08-21 15:59:22

服務(wù)端與客戶端通信

2009-08-21 15:36:41

服務(wù)端與客戶端

2009-08-21 15:54:40

服務(wù)端與客戶端

2010-11-19 14:22:04

oracle服務(wù)端

2010-03-18 17:47:07

Java 多客戶端通信

2023-03-06 08:01:56

MySQLCtrl + C

2021-11-03 16:09:40

Java泛型代碼

2013-08-07 14:19:39

移動互聯(lián)網(wǎng)BAT三巨頭移動市場

2015-01-13 10:32:23

RestfulWeb框架

2022-09-05 14:36:26

服務(wù)端TCP連接

2023-04-03 08:13:05

MySQLCtrl + C

2009-04-21 14:43:31

2013-06-07 10:10:39

2016-11-10 11:19:23

阿里云計(jì)算大數(shù)據(jù)

2021-10-19 08:58:48

Java 語言 Java 基礎(chǔ)

2021-06-11 06:54:34

Dubbo客戶端服務(wù)端

2022-06-14 15:07:04

IPC客戶端服務(wù)端
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 欧美一区二区 | 麻豆国产一区二区三区四区 | 国产精品久久免费观看 | 久久99精品国产自在现线小黄鸭 | 亚洲国产精品99久久久久久久久 | 91黄在线观看 | 午夜电影网站 | 成人精品一区二区三区中文字幕 | 中文字幕免费在线 | 日韩一区二区在线视频 | 久久高清 | 二区国产 | 亚洲激情一区二区三区 | 亚洲国产精品久久人人爱 | 亚洲精品久久久一区二区三区 | 男人av在线播放 | 欧产日产国产精品99 | 欧美一区免费 | 成人亚洲视频 | 日韩成人在线免费视频 | 成人片免费看 | 黄色成人免费在线观看 | 欧美日韩视频在线第一区 | av黄色在线观看 | 中文字幕 国产精品 | 久久久久久久网 | 天天爽夜夜骑 | 超级黄色一级片 | 老牛嫩草一区二区三区av | 亚洲欧洲精品成人久久奇米网 | 亚洲一区欧美 | 欧美日韩成人在线观看 | 婷婷中文在线 | 浮生影院免费观看中文版 | 亚洲精品久久久久久国产精华液 | 影音先锋中文字幕在线观看 | 91色在线 | 祝你幸福电影在线观看 | 久久久这里都是精品 | 日韩中文一区 | 免费成人在线网站 |