
1、背景介紹
供應(yīng)鏈倉(cāng)儲(chǔ)域子域繁多,例如庫(kù)存域,lpn域等,平時(shí)開發(fā)的過程中涉及很多分布式事務(wù)的場(chǎng)景,例如收貨加庫(kù)存,發(fā)貨扣庫(kù)存,揀貨入箱,發(fā)貨出箱等一些分布式事務(wù)場(chǎng)景,所以迫切需要出一套分布式事務(wù)處理方案,在調(diào)研了市場(chǎng)上的分布式事務(wù)解決方案,結(jié)合wms自身業(yè)務(wù)域不是強(qiáng)一致性的特色,選擇了最終一致性,且使用本地消息表去實(shí)現(xiàn)它。
本地消息表這個(gè)方案最初是ebay提出的,核心就是將需要分布式處理的任務(wù)通過本地消息日志存儲(chǔ)的方式來異步執(zhí)行。該方案可以存到本地文本,數(shù)據(jù)庫(kù)或消息隊(duì)列,再通過異步線程或者自動(dòng)job發(fā)起重試。
2、設(shè)計(jì)前的思考
在操作本地事務(wù)的同時(shí),需要額外寫入一張需要最終一致性業(yè)務(wù)記錄的表,即本地消息表,且該業(yè)務(wù)記錄是有狀態(tài)的,在本地事務(wù)提交后,再執(zhí)行需要最終一致性的方法,成功后更新記錄狀態(tài)為成功,如果失敗了,還要引入兜底重試機(jī)制,下圖能簡(jiǎn)單介紹它的功能:

為了實(shí)現(xiàn)以上最終一致性的功能。我們同樣還需要做到以下幾點(diǎn):
- 無侵入:這個(gè)好理解,對(duì)于需要最終一致性的場(chǎng)景,可以很簡(jiǎn)單的實(shí)現(xiàn)
- 策略化:包括重試次數(shù),重試的間隔時(shí)間,是否使用異步方式等
- 通用性:最好是無改動(dòng)(或者很小改動(dòng))的支持絕大部分的場(chǎng)景,拿過來直接可用
- 復(fù)用性:對(duì)于異常場(chǎng)景存在需要重試場(chǎng)景,同時(shí)希望把正常邏輯和重試邏輯復(fù)用
3、架構(gòu)設(shè)計(jì)
調(diào)研了大家對(duì)一致性框架的訴求,最終定義了如,入?yún)⒆远x序列化,環(huán)境隔離,同步異步執(zhí)行切換,注解支持,自定義執(zhí)行攔截器,以及適配得物倉(cāng)儲(chǔ)業(yè)務(wù)的業(yè)務(wù)上下文訂制以及持久化等一系列的核心能力,底層依賴了Spring的生態(tài),在數(shù)據(jù)存儲(chǔ)依賴了Mysql,Mongodb,緩存分布式鎖上依賴了Redis等一系列主流的中間件,最終以jar包形式實(shí)現(xiàn),盡可能做到即拿即用。

4、詳細(xì)設(shè)計(jì)
基于在以上的架構(gòu)設(shè)計(jì)后,做了以下設(shè)計(jì)
4.1 注解支持:@EnableConsistency
為了讓用戶更快,更方便的接入一致性框架,我們?cè)谠缙诘某橄箢惱^承的方案上做了一版本升級(jí),使用注解,使得使用方式跟@Transactional注解一樣,只要加上@EnableConsistency就支持最終一致性的支持,非常方便。
4.2 自動(dòng)重試&重試等待策略可配:
最終一致性有個(gè)天然的組成部分就是需要重試,一致性框架也不例外,引用了Spring的
ScheduledTask實(shí)現(xiàn)定時(shí)重試那些運(yùn)行失敗的記錄,另外重試等待策略同樣可配置:
4.2.1 重試等待策略
固定時(shí)間重試
支持配置固定時(shí)間間隔重試
延遲指數(shù)重試
底層采用Math.pow函數(shù)在重試次數(shù)越多次,執(zhí)行間隔時(shí)間呈指數(shù)級(jí)增長(zhǎng)

4.2.2 自定義重試次數(shù)
注解式支持重試次數(shù)的定義
4.3 自定義攔截器
在執(zhí)行需要最終一致性方法的時(shí)候,我們同樣提供了如Spring AOP一樣被代理方法的前置,成功,異常后需要做的一些切面功能,非常方便的滿足使用者的擴(kuò)展,解耦了實(shí)現(xiàn)與擴(kuò)展。

/**
* 在記錄重試次數(shù)失敗后 執(zhí)行
* @param context
* @param lastException
*/
void close(ConsistencyContext context, Throwable lastException);
/**
* 開始執(zhí)行重試前 攔截器
* 如果返回false 則 執(zhí)行期不繼續(xù)進(jìn)行
* @param context
* @return
*/
boolean open(ConsistencyContext context);
/**
* 發(fā)生異常攔截器
* @param context
* @param throwable
*/
void onError(ConsistencyContext context, Throwable throwable);
4.4 業(yè)務(wù)上下文&持久化
在業(yè)務(wù)系統(tǒng)的開發(fā)中,存在一個(gè)ThreadLocal的上下文,記錄了用戶的組織架構(gòu),簽到等一系列用戶信息。在設(shè)計(jì)一致性框架的時(shí)候我們考慮到用戶上下文的存在,暴露了業(yè)務(wù)上下文擴(kuò)展,以及存儲(chǔ)業(yè)務(wù)上下文供重試時(shí)使用的能力。

public interface ContextListener {
/**
* 獲取上下文內(nèi)容
* @return
*/
Map<String,String> getContext();
/**
* 設(shè)置用戶上下文
* @param contextMap
*/
void setContext(Map<String, String> contextMap);
/**
* 清除用戶上下文
* @param contextMap
*/
void clear(Map<String, String> contextMap);
}
4.5 環(huán)境隔離
由于得物的預(yù)發(fā)環(huán)境與生產(chǎn)環(huán)境采用的同一批數(shù)據(jù)庫(kù),故也將一致性框架記錄采用了spring.profile.active的值做環(huán)境隔離,確保重試時(shí)不會(huì)預(yù)發(fā)的跑到生產(chǎn)的數(shù)據(jù)。

4.6 入?yún)⒆远x序列化
由于需要在本地消息表中記錄需要重試的方法的入?yún)ⅲ示蜕婕暗饺雲(yún)⑿蛄谢膯栴},在思考良久之后,只提供默認(rèn)的Json方式的序列化與反序列化,如果用戶需要額外的序列化與反序列化方法,我們也支持,提供了暴露序列化與反序列化的口子供用戶實(shí)現(xiàn)。
public interface SerializerListener{
/**
* 進(jìn)行序列化
* @param params
* @return
*/
String serializer(Object[] params);
/**
* 反序列化
* @param str
* @return
*/
Object[] deserializer(String str);
}
4.7 執(zhí)行模式可配置
一般用本地消息執(zhí)行最終一致性的部分,開始的設(shè)計(jì)是異步化執(zhí)行,后續(xù)收到使用者用戶的反饋,也有部分同步執(zhí)行的場(chǎng)景,故增加了同步異步執(zhí)行開關(guān),開發(fā)者自行選擇。
4.8 數(shù)據(jù)模型&狀態(tài)機(jī)

4.9 核心代碼展示
4.9.1ExecutorAutoConfiguration框架初始化
/**
* 加載為切面增強(qiáng)提供織入接口advice,和注入advice的pointcut
*/
@PostConstruct
public void init() {
Set<Class<? extends Annotation>> eventualConsistencyAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
eventualConsistencyAnnotationTypes.add(EventualConsistency.class);
this.pointcut = buildPointcut(eventualConsistencyAnnotationTypes);
this.advice = buildAdvice();
buildExecutorManager();
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
4.9.2 核心注解@EventualConsistency
使用最終一致性方法的核心注解:
public @interface EventualConsistency {
/**
* 第一次執(zhí)行是否異步執(zhí)行
* @return
*/
boolean async() default true;
/**
* 最大重試次數(shù)
* @return
*/
int maxRetryTimes() default 6;
/**
* 延遲時(shí)間
* @return
*/
Delay delay() default @Delay;
/**
* Bean names 攔截器
* @return retry listeners bean names
*/
String[] listeners() default {};
@Deprecated
String label() default "";
String beanName() default "";
/**
* Bean names 攔截器 用來進(jìn)行序列化和反序列化
* @return retry listeners bean names
*/
String serializerListener() default "";
String referenceNo() default "";
@AliasFor(annotation = Transactional.class)
Class<? extends Throwable>[] rollbackFor() default Exception.class;
@AliasFor(annotation = Transactional.class)
Class<? extends Throwable>[] noRollbackFor() default {};
boolean manageContext() default true;
}
4.9.3 核心實(shí)現(xiàn)Advice,MethodInterceptor的AnnotationAwareRetryOperationsInterceptor

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// 獲取最終執(zhí)行攔截的委托
MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
if (delegate != null) {
return delegate.invoke(invocation);
} else {
return invocation.proceed();
}
}
4.9.4 延遲執(zhí)行策略
public interface DelayPolicy {
/**
* 獲取下次執(zhí)行時(shí)間
* @return
*/
Long nextTime();
}
/**
* 延遲指數(shù)
*/
public class ExponentialDelayPolicy implements DelayPolicy {
public static final long DEFAULT_INITIAL_INTERVAL = 2L;
public static final int DEFAULT_MULTIPLIER = 3;
/**
* 默認(rèn)重試最大延遲時(shí)間 (24小時(shí))
*/
public static final long DEFAULT_MAX_INTERVAL = 86400L;
/**
* 初始間隔
*/
private volatile long initialInterval = DEFAULT_INITIAL_INTERVAL;
private volatile int multiplier = DEFAULT_MULTIPLIER;
/**
* 最大延遲時(shí)間
*/
private volatile long maxInterval = DEFAULT_MAX_INTERVAL;
@Override
public Long nextTime() {
ConsistencyContext context = ConsistencyContextHolder.getContext();
Double pow = Math.pow(initialInterval + context.getRetryCounts(), multiplier);
if(pow > maxInterval){
return maxInterval;
}
return pow.longValue();
}
}
/**
* 固定時(shí)間
*/
public class FixedDelayPolicy implements DelayPolicy {
private static final long DEFAULT_DELAY_PERIOD = 10L;
private volatile long delayPeriod = DEFAULT_DELAY_PERIOD;
@Override
public Long nextTime() {
return this.delayPeriod;
}
}
4.9.5 AsyncConsistencyExecutor異步最終一致性執(zhí)行

5、未來展望
(1)后臺(tái)管理頁(yè)面設(shè)計(jì),支持報(bào)表查詢,以及錯(cuò)誤異常處理
(2)trace監(jiān)控接入,方便定位問題
(3)適配業(yè)務(wù)支持類型DB
(4)自定義歸檔策略
最終一致性框架是由wms全組同學(xué)一起設(shè)計(jì)和開發(fā)完成,并且陪伴了得物快速發(fā)展的兩年多,經(jīng)歷了2個(gè)618以及3個(gè)雙十一,若干個(gè)情人節(jié),圣誕節(jié)的考驗(yàn)。系統(tǒng)運(yùn)行健康,無性能瓶頸,提升了很多場(chǎng)景下最終一致性的開發(fā)速度。目前仍在安全穩(wěn)健的保障著倉(cāng)儲(chǔ)域服務(wù)的正常運(yùn)轉(zhuǎn)。