服務(wù)down機(jī)了,線程池中的數(shù)據(jù)如何保證不丟失?
前言
最近有位小伙伴在我的技術(shù)群里,問(wèn)了我一個(gè)問(wèn)題:服務(wù)down機(jī)了,線程池中如何保證不丟失數(shù)據(jù)?
這個(gè)問(wèn)題挺有意思的,今天通過(guò)這篇文章,拿出來(lái)跟大家一起探討一下。
1 什么是線程池?
之前沒(méi)有線程池的時(shí)候,我們?cè)诖a中,創(chuàng)建一個(gè)線程有兩種方式:
- 繼承Thread類
- 實(shí)現(xiàn)Runnable接口
雖說(shuō)通過(guò)這兩種方式創(chuàng)建一個(gè)線程,非常方便。
但也帶來(lái)了下面的問(wèn)題:
- 創(chuàng)建和銷毀一個(gè)線程,都是比較耗時(shí),頻繁的創(chuàng)建和銷毀線程,非常影響系統(tǒng)的性能。
- 無(wú)限制的創(chuàng)建線程,會(huì)導(dǎo)致內(nèi)存不足。
- 有新任務(wù)過(guò)來(lái)時(shí),必須要先創(chuàng)建好線程才能執(zhí)行,不能直接復(fù)用線程。
為了解決上面的這些問(wèn)題,Java中引入了:線程池。
它相當(dāng)于一個(gè)存放線程的池子。
使用線程池帶來(lái)了下面3個(gè)好處:
- 降低資源消耗。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),可以直接使用已有空閑的線程,不需要的等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性。線程是稀缺資源,如果無(wú)限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性。而如果我們使用線程池,可以對(duì)線程進(jìn)行統(tǒng)一的分配、管理和監(jiān)控。
2 線程池原理
先看看線程池的構(gòu)造器:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心線程數(shù),線程池維護(hù)的最少線程數(shù)。
- maximumPoolSize:最大線程數(shù),線程池允許創(chuàng)建的最大線程數(shù)。
- keepAliveTime:線程存活時(shí)間,當(dāng)線程數(shù)超過(guò)核心線程數(shù)時(shí),多余的空閑線程的存活時(shí)間。
- unit:時(shí)間單位。
- workQueue:任務(wù)隊(duì)列,用于保存等待執(zhí)行的任務(wù)。
- threadFactory:線程工廠,用于創(chuàng)建新線程。
- handler:拒絕策略,當(dāng)任務(wù)無(wú)法執(zhí)行時(shí)的處理策略。
線程池的核心流程圖如下:
圖片
線程池的工作過(guò)程如下:
- 線程池初始化:根據(jù)corePoolSize初始化核心線程。
- 任務(wù)提交:當(dāng)任務(wù)提交到線程池時(shí),根據(jù)當(dāng)前線程數(shù)判斷:
- 若當(dāng)前線程數(shù)小于corePoolSize,創(chuàng)建新的線程執(zhí)行任務(wù)。
- 若當(dāng)前線程數(shù)大于或等于corePoolSize,任務(wù)被加入workQueue隊(duì)列。
- 任務(wù)處理:當(dāng)有空閑線程時(shí),從workQueue中取出任務(wù)執(zhí)行。
- 線程擴(kuò)展:若隊(duì)列已滿且當(dāng)前線程數(shù)小于maximumPoolSize,創(chuàng)建新的線程處理任務(wù)。
- 線程回收:當(dāng)線程空閑時(shí)間超過(guò)keepAliveTime,多余的線程會(huì)被回收,直到線程數(shù)不超過(guò)corePoolSize。
- 拒絕策略:若隊(duì)列已滿且當(dāng)前線程數(shù)達(dá)到maximumPoolSize,則根據(jù)拒絕策略處理新任務(wù)。
說(shuō)白了在線程池中,多余的任務(wù)會(huì)被放到workQueue任務(wù)隊(duì)列中。
這個(gè)任務(wù)隊(duì)列的數(shù)據(jù)保存在內(nèi)存中。
這樣就會(huì)出現(xiàn)一些問(wèn)題。
接下來(lái),看看線程池有哪些問(wèn)題。
3 線程池有哪些問(wèn)題?
在JDK中為了方便大家創(chuàng)建線程池,專門(mén)提供了Executors這個(gè)工具類。
3.1 隊(duì)列過(guò)大
Executors.newFixedThreadPool,它可以創(chuàng)建固定線程數(shù)量的線程池,任務(wù)隊(duì)列使用的是LinkedBlockingQueue,默認(rèn)最大容量是Integer.MAX_VALUE。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
如果向newFixedThreadPool線程池中提交的任務(wù)太多,可能會(huì)導(dǎo)致LinkedBlockingQueue非常大,從而出現(xiàn)OOM問(wèn)題。
3.2 線程太多
Executors.newCachedThreadPool,它可以創(chuàng)建可緩沖的線程池,最大線程數(shù)量是Integer.MAX_VALUE,任務(wù)隊(duì)列使用的是SynchronousQueue。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
如果向newCachedThreadPool線程池中提交的任務(wù)太多,可能會(huì)導(dǎo)致創(chuàng)建大量的線程,也會(huì)出現(xiàn)OOM問(wèn)題。
3.3 數(shù)據(jù)丟失
如果線程池在執(zhí)行過(guò)程中,服務(wù)突然被重啟了,可能會(huì)導(dǎo)致線程池中的數(shù)據(jù)丟失。
上面的OOM問(wèn)題,我們?cè)谌粘i_(kāi)發(fā)中,可以通過(guò)自定義線程池的方式解決。
比如創(chuàng)建這樣的線程池:
new ThreadPoolExecutor(8,
10,
30L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(300),
threadFactory);
自定義了一個(gè)最大線程數(shù)量和任務(wù)隊(duì)列都在可控范圍內(nèi)線程池。
這樣做基本上不會(huì)出現(xiàn)OOM問(wèn)題。
但線程池的數(shù)據(jù)丟失問(wèn)題,光靠自身的功能很難解決。
4 如何保證數(shù)據(jù)不丟失?
線程池中的數(shù)據(jù),是保存到內(nèi)存中的,一旦遇到服務(wù)器重啟了,數(shù)據(jù)就會(huì)丟失。
之前的系統(tǒng)流程是這樣的:
圖片
用戶請(qǐng)求過(guò)來(lái)之后,先處理業(yè)務(wù)邏輯1,它是系統(tǒng)的核心功能。
然后再將任務(wù)提交到線程池,由它處理業(yè)務(wù)邏輯2,它是系統(tǒng)的非核心功能。
但如果線程池在處理的過(guò)程中,服務(wù)down機(jī)了,此時(shí),業(yè)務(wù)邏輯2的數(shù)據(jù)就會(huì)丟失。
那么,如何保證數(shù)據(jù)不丟失呢?
答:需要提前做持久化。
我們優(yōu)化的系統(tǒng)流程如下:
圖片
用戶請(qǐng)求過(guò)來(lái)之后,先處理業(yè)務(wù)邏輯1,緊接著向DB中寫(xiě)入一條任務(wù)數(shù)據(jù),狀態(tài)是:待執(zhí)行。
處理業(yè)務(wù)邏輯1和向DB寫(xiě)任務(wù)數(shù)據(jù),可以在同一個(gè)事務(wù)中,方便出現(xiàn)異常時(shí)回滾。
然后有一個(gè)專門(mén)的定時(shí)任務(wù),每個(gè)一段時(shí)間,按添加時(shí)間升序,分頁(yè)查詢狀態(tài)是待執(zhí)行的任務(wù)。
最早的任務(wù),最先被查出來(lái)。
然后將查出的任務(wù)提交到線程池中,由它處理業(yè)務(wù)邏輯2。
處理成功之后,修改任務(wù)的待執(zhí)行狀態(tài)為:已執(zhí)行。
需要注意的是:業(yè)務(wù)邏輯2的處理過(guò)程,要做冪等性設(shè)計(jì),同一個(gè)請(qǐng)求允許被執(zhí)行多次,其結(jié)果不會(huì)有影響。
如果此時(shí),線程池在處理的過(guò)程中,服務(wù)down機(jī)了,業(yè)務(wù)邏輯2的數(shù)據(jù)會(huì)丟失。
但此時(shí)DB中保存了任務(wù)的數(shù)據(jù),并且丟失那些任務(wù)的狀態(tài)還是:待執(zhí)行。
在下一次定時(shí)任務(wù)周期開(kāi)始執(zhí)行時(shí),又會(huì)將那些任務(wù)數(shù)據(jù)重新查詢出來(lái),重新提交到線程池中。
業(yè)務(wù)邏輯2丟失的數(shù)據(jù),又自動(dòng)回來(lái)了。
如果要考慮失敗的情況,還需要在任務(wù)表中增加一個(gè)失敗次數(shù)字段。
在定時(shí)任務(wù)的線程池中執(zhí)行業(yè)務(wù)邏輯2失敗了,在下定時(shí)任務(wù)執(zhí)行時(shí)可以自動(dòng)重試。
但不可能無(wú)限制的一直重試下去。
當(dāng)失敗超過(guò)了一定的次數(shù),可以將任務(wù)狀態(tài)改成:失敗。
這樣后續(xù)可以人工處理。