一文徹底搞懂阿里開源 TransmittableThreaLocal 的原理和使用
今天來聊一聊阿里的 TTL 也就是TransmittableThreadLocal。
對于實現父子線程的傳參使用的一般就是InheritableThreadLocal,對于 InheritableThreadLocal 是如何實現的父子傳參可以參考之前發表的這篇文章。
有的同學就會問了,既然有了InheritableThreadLocal能夠實現父子線程的傳參,那么阿里為什么還要在開源一個自己的TransmittableThreadLocal出來呢?
下面就說一下TransmittableThreadLocal解決了什么問題?
版本:TransmittableTreadLocal v2.14.5
代碼示例中都沒有做remove操作,實際使用中不要忘記哦。本文代碼示例加入remove方法不影響測試結果。
一、TransmittableThreadLocal解決了什么問題?
先思考一個問題,在業務開發中,如果想異步執行這個任務可以使用哪些方式?
- 使用@Async注解
- new Thread()
- 線程池
- MQ
- 其它
上述的幾種方式中,暫時只探討線程的方式,MQ等其他方式暫不在本文的探討范圍內。
不管是使用@Async注解,還是使用線程或者線程池,底層原理都是通過另一個子線程執行的。
對于@Async注解原理不了解的點擊鏈接跳轉進行查閱。
既然是子線程,那么在涉及到父子線程之間變量傳參的時候你們是通過什么方式實現的呢?
父子線程之間進行變量的傳遞可以通過InheritableThreadLocal實現。
InheritableThreadLocal實現父子線程傳參的原理可以參考這篇。
《InheritableThreadLocal 是如何實現的父子線程局部變量的傳遞》
本文可以說是對InheritableThreadLocal的一個補充。
當我們在使用new Thread()時,直接通過設置一個ThreadLocal即可實現變量的傳遞。
需要注意的是,此處傳值需要使用InheritableThreadLocal,因為ThreadLocal無法實現在子線程中獲取到父線程的值。
由于工作中大部分場景都是使用的線程池,所以我們上面的方式還可以生效嗎?
線程池中線程的數量是可以指定的,并且線程是由線程池創建好,池化之后反復使用的。所以此時的父子線程關系中的變量傳遞就沒有了意義,我們需要的是任務提交到線程池時的ThreadLocal變量值傳遞到任務執行時的線程。
在InheritableThreadLocal原理這篇文章的末尾,我們提到了線程池的傳參方式,本質上也是通過InheritableThreadLocal進行的變量傳遞。
而阿里的TransmittableThreadLocal類是繼承加強的InheritableThreadLocal。
TransmittableThreadLocal可以解決線程池中復用線程時,將值傳遞給實際執行業務的線程,解決異步執行時的上下文傳遞問題。
除此之外,還有幾個典型場景例子:
- 分布式跟蹤系統或者全鏈路壓測(鏈路打標)。
- 日志收集系統上下文。
- Session 級 Cache。
- 應用容器或者上層框架跨應用代碼給下層 SDK 傳遞信息。
二、TransmittableThreadLocal 怎么用?
上面我們知道了TransmittableThreadLocal可以用來做什么,解決的是線程池中池化線程復用線程時的值傳遞問題。
下面我們就一起來看下怎么使用?
1.ThreadLocal
所有代碼示例都在 springboot 中演示。
ThreadLocal 在父子線程間是如法傳參的,使用方式如下:
@RestController
@RequestMapping("/test2")
public class Test2Controller {
ThreadLocal<String> stringThreadLocal = new ThreadLocal<>();
@RequestMapping("/set")
public Object set(){
stringThreadLocal.set("主線程給的值:stringThreadLocal");
Thread thread = new Thread(() -> {
System.out.println("讀取父線程stringThreadLocal的值:" + stringThreadLocal.get());
});
thread.start();
return "";
}
}
啟動之后訪問 /test2/set,顯示如下:
通過上面的輸出可以看出來,并沒有讀取到父線程的值。
所以為了實現父子傳參,需要把 ThreadLocal 修改為 InheritableThreadLocal 。
2.InheritableThreadLocal
代碼修改完成之后如下:
@RestController
@RequestMapping("/test2")
public class Test2Controller {
ThreadLocal<String> stringThreadLocal = new ThreadLocal<>();
ThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
@RequestMapping("/set")
public Object set(){
stringThreadLocal.set("主線程給的值:stringThreadLocal");
inheritableThreadLocal.set("主線程給的值:inheritableThreadLocal");
Thread thread = new Thread(() -> {
System.out.println("讀取父線程stringThreadLocal的值:" + stringThreadLocal.get());
System.out.println("讀取父線程inheritableThreadLocal的值:" + inheritableThreadLocal.get());
});
thread.start();
return "";
}
}
同樣的執行一下看輸出:
在上面的演示例子中,都是直接用的new Thread(),下面我們改為線程池的方式試試。
修改完成之后的代碼如下所示:
@RestController
@RequestMapping("/test2")
public class Test2Controller {
ThreadLocal<String> stringThreadLocal = new ThreadLocal<>();
ThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
ThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
@RequestMapping("/set")
public Object set(){
for (int i = 0; i < 10; i++) {
String val = "主線程給的值:inheritableThreadLocal:"+i;
System.out.println("主線程set;"+val);
inheritableThreadLocal.set(val);
executor.execute(()->{
System.out.println("線程池:讀取父線程 inheritableThreadLocal 的值:" + inheritableThreadLocal.get());
});
}
return "";
}
}
同樣的看下輸出:
通過輸出我們可以得出結論,當使用線程池時,因為線程都是復用的,在子線程中獲取父線程的值,可能獲取出來的是上一個線程 的值,所以這里會有線程安全問題。
線程池中的線程并不一定每次都是新創建的,所以對于InheritableThreadLocal是無法實現父子傳參的。
如果感覺輸出不夠明顯可以輸出子線程的線程名稱。
下面我們看下怎么使用 TransmittableThreadLocal解決線程池中父子變量傳遞問題。
3.TransmittableThreadLocal
繼續對上面代碼進行改造,改造完成之后如下所示:
修改部分:TransmittableThreadLocal 的第一種使用方式,TtlRunnable.get() 封裝。
@RestController
@RequestMapping("/test2")
public class Test2Controller {
ThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
@RequestMapping("/set")
public Object set(){
for (int i = 0; i < 10; i++) {
String val = "主線程給的值:TransmittableThreadLocal:"+i;
System.out.println("主線程set3;"+val);
transmittableThreadLocal.set(val);
executor.execute(TtlRunnable.get(()->{
System.out.println("線程池線程:"+Thread.currentThread().getName()+
"讀取父線程 TransmittableThreadLocal 的值:"
+ transmittableThreadLocal.get());
}));
}
return "";
}
}
執行結果如下所示:
通過日志輸出可以看到,子線程的輸出已經把父線程中設置的值全部輸出了,并沒有像 InheritableThreadLocal 那樣一直使用那幾個值。
可以得出結論,TransmittableThreadLocal可以解決線程池中復用線程時,將值傳遞給實際執行業務的線程,解決異步執行時的上下文傳遞問題。
那么這樣就沒問題了嗎,看起來使用真的很簡單,僅僅需要將 Runnable 封裝下即可,下面我們將ThreadLocal中存儲的 String 類型的值改為 Map在試試。
三、TransmittableThreadLocal 中的深拷貝
我們將 ThreadLocal 中存儲的值改為 Map,修改完代碼如下:
@RestController
@RequestMapping("/test2")
public class Test2Controller {
ThreadLocal<Map<String,Object>> transmittableThreadLocal = new TransmittableThreadLocal<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
@RequestMapping("/set")
public Object set(){
Map<String, Object> map = new HashMap<>();
map.put("mainThread","主線程給的值:main");
System.out.println("主線程賦值:"+ map);
transmittableThreadLocal.set(map);
executor.execute(TtlRunnable.get(()->{
System.out.println("線程池線程:"+Thread.currentThread().getName()+
"讀取父線程 TransmittableThreadLocal 的值:"
+ transmittableThreadLocal.get());
}));
return "";
}
}
調用接口執行結果如下:
可以看到沒啥問題,下面我們簡單改一下代碼。
- 在主線程提交子線程的任務之后再次修改 ThreadLocal 的值。
- 在子線程中修改 ThreadLocal 的值。
修改完成的代碼如下所示:
@RestController
@RequestMapping("/test2")
public class Test2Controller {
ThreadLocal<Map<String, Object>> transmittableThreadLocal = new TransmittableThreadLocal<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
@RequestMapping("/set")
public Object set() {
Map<String, Object> map = transmittableThreadLocal.get();
if (null == map) {map = new HashMap<>();}
map.put("mainThread", "主線程給的值:main");
System.out.println("主線程賦值:" + map);
transmittableThreadLocal.set(map);
executor.execute(TtlRunnable.get(() -> {
System.out.println("子線程輸出:" + Thread.currentThread().getName() + "讀取父線程 TransmittableThreadLocal 的值:" + transmittableThreadLocal.get());
Map<String, Object> childMap = transmittableThreadLocal.get();
if (null == childMap){childMap = new HashMap<>();}
childMap.put("childThread","子線程添加值");
}));
Map<String, Object> stringObjectMap = transmittableThreadLocal.get();
if (null == stringObjectMap) {
stringObjectMap = new HashMap<>();
}
stringObjectMap.put("mainThread-2", "主線程第二次賦值");
transmittableThreadLocal.set(stringObjectMap);
try{
Thread.sleep(1000);
}catch (InterruptedException e){e.printStackTrace();}
System.out.println("主線程第二次輸出ThreadLocal:"+transmittableThreadLocal.get());
return "";
}
}
調用接口輸出如下:
通過日志輸出可以得出結論,當 ThreadLocal 存儲的是對象時,父子線程共享同一個對象。
也就是說父子線程之間的修改都是可見的,原因就是父子線程持有的 Map 都是同一個,在父線程第二次設置值的時候,因為修改的都是同一個 Map,所以子線程也可以讀取到。
這一點需要特別的注意,如果有嚴格的業務邏輯,且共享同一個ThreadLocal,需要注意這個線程安全問題。
那么怎么解決呢,那就是深拷貝,對象的深拷貝,保證父子線程獨立,在修改的時候就不會出現父子線程共享同一個對象的事情。
TransmittableThreadLocal 其中有一個 copy 方法,copy 方法就是復制父線程值的,在此處返回一個新的對象,而不是父線程的對象即可,代碼修改如下:
為什么是 copy 方法,后文會有介紹。
@RestController
@RequestMapping("/test2")
public class Test2Controller {
ThreadLocal<Map<String, Object>> transmittableThreadLocal = new TransmittableThreadLocal(){
@Override
public Object copy(Object parentValue) {
return new HashMap<>((Map)parentValue);
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
@RequestMapping("/set")
public Object set() {
Map<String, Object> map = transmittableThreadLocal.get();
if (null == map) {map = new HashMap<>();}
map.put("mainThread", "主線程給的值:main");
System.out.println("主線程賦值:" + map);
transmittableThreadLocal.set(map);
executor.execute(TtlRunnable.get(() -> {
System.out.println("子線程輸出:" + Thread.currentThread().getName() + "讀取父線程 TransmittableThreadLocal 的值:" + transmittableThreadLocal.get());
Map<String, Object> childMap = transmittableThreadLocal.get();
if (null == childMap){childMap = new HashMap<>();}
childMap.put("childThread","子線程添加值");
}));
Map<String, Object> stringObjectMap = transmittableThreadLocal.get();
if (null == stringObjectMap) {
stringObjectMap = new HashMap<>();
}
stringObjectMap.put("mainThread-2", "主線程第二次賦值");
transmittableThreadLocal.set(stringObjectMap);
try{
Thread.sleep(1000);
}catch (InterruptedException e){e.printStackTrace();}
System.out.println("主線程第二次輸出ThreadLocal:"+transmittableThreadLocal.get());
return "";
}
}
修改部分如下:
調用接口,查看執行結果可以發現,父子線程的修改已經是獨立的對象在修改,不再是共享的。
相信到了這,對于 TransmittableThreadLocal 如何使用應該會了吧,下面我們就一起來看下 TransmittableThreadLocal到底是如何做到的父子線程變量的傳遞的。
四、TransmittableThreadLocal 原理
TransmittableThreadLocal 簡稱 TTL。
在開始之前先放一張官方的時序圖,結合圖看源碼更容易懂哦!
1.TransmittableThreadLocal 使用方式
(1) 修飾 Runnable 和Callable
這種方式就是上面代碼示例中的形式,通過 TtlRunnable和TtlCallable 修改傳入線程池的 Runnable 和 Callable。
(2) 修飾線程池
修飾線程池可以使用TtlExecutors工具類實現,其中有如下方法可以使用。
(3) Java Agent
Agent 的形式不會對代碼入侵,具體的使用可以參考官網,這里就不再說了,官網鏈接我會放在文章末尾。
需要注意的是,如果需要和其他 Agent (如Skywalking、Promethues)一起使用,需要把 TransmittableThreadLocal Java Agent 放在第一位。
2.源碼分析
先簡單的概括下:
- 修飾 Runnable ,將主線程的 TTL 值傳入到 TtlRunnable 的構造方法中。
- 將子線程的 TTL 進行備份,主線程的值設置到子線程中。
- 子線程執行業務邏輯。
- 刪除子線程新增的 TTL,將備份重新設置到子線程中。
(1) TtlRunnable#run 方法做了什么
先從TtlRunnable#run方法入手。
從整體流程來看,整個上下文的傳遞流程可以規范成快照、回放、恢復(CRR)三個操作。
- captured 是主線程(線程A)傳遞的 TTL的值。
- backup 是子線程(線程B)中當前存在的 TTL 的值。
- replay 操作會將主線程中(線程A)的 TTL 的值回放到當前子線程(線程B)中,并返回回放前的 TTL 值的備份也就是上面的 backup。
- runnable.run() 是待執行的方法。
- restore 是恢復子線程(線程B)進入之時備份的 TTL 的值。因為子線程的 TTL 可能已經發生變化,所以該方法就是回滾到子線程執行 replay 方法之前的 TTL 值。
(2) captured 快照是什么時候做的
同學們思考下,快照又是什么時候做的呢?
通過上面 run 方法可以看到,在該方法的第一行已經是獲取快照的值了,所以生成快照肯定不在run方法內了。
提示一下,開頭放的時序圖還記得嗎,可以看下4.1。
還記得我們封裝了線程嗎,使用TtlRunnable.get()進行封裝的,返回的是TtlRunnable。
答案就在這個方法內部,來看下方法內部做了哪些事情。
@Nullable
@Contract(value = "null -> null; !null -> !null", pure = true)
public static TtlRunnable get(@Nullable Runnable runnable) {
return get(runnable, false, false);
}
@Nullable
@Contract(value = "null, _, _ -> null; !null, _, _ -> !null", pure = true)
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (runnable == null) return null;
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
可以看到在調用TtlRunnable.get() 方法的最后,調用了TtlRunnable的構造方法,在該方法內部,又調用了capture方法。
capture 方法內部是真正做快照的地方。
其中的transmittee.capture()調用的ttlTransmittee的。
需要注意的是,threadLocal.copyValue()拷貝的是引用,所以如果是對象,就需要重寫copy方法。
public T copy(T parentValue) {
return parentValue;
}
代碼中的 holder 是一個InheritableThreadLocal,他的值類型是WeakHashMap。
key 是TransmittableThreadLocal,value 始終是 null且始終沒有使用。
里面維護了所有使用到的 TransmittableThreadLocal,統一添加到 holder中。
到了這又有了一個疑問?holder 中的 值什么時候添加的?
陷入看源碼的誤區,一個一個的來,不要一個方法一直擴散,要有一條主線,對于我們這里,已經知道了什么時候進行的快照,如何快照的就可以了,對于 holder中的值在哪里添加的,這就是另一個問題了。
(3) holder 中在哪賦值的
holder 中賦值的地方在 addThisToHolder方法中實現。
具體可以在transmittableThreadLocal.get()與transmittableThreadLocal.set()中查看。
@Override
public final T get() {
T value = super.get();
if (disableIgnoreNullValueSemantics || value != null) addThisToHolder();
return value;
}
@Override
public final void set(T value) {
if (!disableIgnoreNullValueSemantics && value == null) {
// may set null to remove value
remove();
} else {
super.set(value);
addThisToHolder();
}
}
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
}
}
addThisToHolder 中將此 TransmittableThreadLocal實例添加到 holder 的 key 中。
通過此方法,可以將所有用到的 TransmittableThreadLocal 實例記錄。
(4) replay 備份與回放數據
replay方法只做了兩件事。
- 將快照中(主線程傳遞)的數據設置到當前子線程中。
- 返回當前線程的 TTL 值(快照回放當前子線程之前的TTL)。
在 transmittee.replay 方法中真正的執行了備份與回放操作。
(5) restore 恢復
我們看下 CRR 操作的最后一步 restore 恢復。
restore 的功能就是將當前線程的 TTL 恢復到方法執行前備份的值。
restore 方法內部調用了transmittee.restore方法。
思考一下:為什么要在任務執行結束之后執行 restore 操作呢?
首先就是為了保持線程的干凈,線程池中的線程都是復用的。
當一個線程重復執行多個任務的時候,第一個任務修改了 TTL 的值,如果不進行 restore ,第二個任務開始時就會獲取到第一個任務修改之后的值,而不是預期的初始的值。
五、TransmittableThreadLocal的初始化方法
對于TransmittableThreadLocal相關的初始化方法有三個,如圖所示。
1.ThreadLocal#initialValue()
ThreadLocal 沒有值時取值的方法,該方法在ThreadLocal#get 觸發。
需要注意的是ThreadLocal#initialValue()是懶加載的,也就是創建ThreadLocal實例的時候并不會觸發ThreadLocal#initialValue()的調用。
如果我們先進行了 ThreadLocal.set(T)操作,在進行取值操作,也不會觸發ThreadLocal#initialValue(),因為已經有值了,即使是設置的NULL也不會觸發該初始化操作。
如果調用了remove 方法,在取值會觸發初始化ThreadLocal#initialValue()操作。
2.InheritableThreadLocal#childValue(T)
childValue方法用于在創建新線程時,初始化子線程的InheritableThreadLocal值。
3.TransmittableThreadLocal#copy(T)
在TtlRunnable或者TtlCallable 創建的時候觸發。
例如 TtlRunnable.get()快照時觸發。
用于初始化在例如:TtlRunnable執行中的TransmittableThreadLocal值。
六、總結
本文通過代碼示例依次演示ThreadLocal,InheritableThreadLocal,TransmittableThreadLocal實現父子線程傳參演化過程。
得出結論如下:
- 使用ThreadLocal無法實現父子線程傳參。
- InheritableThreadLocal可以實現父子傳參,但是線程池場景復用線程問題無法解決。
- TransmittableThreadLocal可以解決線程池復用線程的問題。
需要注意的是TransmittableThreadLocal保存對象時有深拷貝需求的需要重寫TransmittableThreadLocal#copy(T)方法。