并發編程線程池限流的哲學
收到不少讀者反饋,其中有這幾道關于線程池的問題:
- 在進行線程池設計時,如何選擇拒絕策略?
- 如果不允許丟棄任務任務,應該選擇哪個拒絕策略?
- 使用CallerRunsPolicy這個拒絕策略有什么風險?有沒有更好的處理方式呢?
一、詳解拒絕策略常見問題
1. 線程池是如何工作的
我們先來復習一下線程池的工作流程,每次任務提交時,線程池都會嘗試將任務提交到核心線程上,如果線程數小于核心線程數,線程池就會添加工作線程并執行當前任務。 若核心線程都處于工作狀態,這就表明當前線程池有些忙碌,那么這些無法及時處理的任務就會提交到阻塞任務隊列中。 隨著任務的遞增,任務隊列無法容納最新的任務,線程池就會認為現處于高峰期,便臨時增加應急線程處理任務。隨著任務逐步處理完成,線程在指定時間內沒有要處理的任務,這些線程也就會依次退出。
圖片
對應我們也給出ThreadPoolExecutor提交任務的execute方法的源碼:
public void execute(Runnable command) {
//任務判空
if (command == null)
throw new NullPointerException();
//查看當前運行的線程數量
int c = ctl.get();
//若小于核心線程則直接添加一個工作線程并執行任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果線程數等于核心線程數則嘗試將任務入隊
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//入隊失敗,調用addWorker參數為false,嘗試創建應急線程處理突發任務
else if (!addWorker(command, false))
//如果創建應急線程失敗,說明當前線程數已經大于最大線程數,這個任務只能拒絕了
reject(command);
}
上文提到了應急線程長時間沒有要處理的任務就會被銷毀的邏輯,這里我們也簡單的介紹一下,首先在線程池中每一個線程是以Worker的形式封裝呈現,其本質就是對Thread的封裝,Worker啟動后會調用run方法調用runWorker方法輪詢處理任務:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
//......
}
查看runWorker方法,一旦在規定時間內getTask沒有拿到任務就會退出循環,直接通過processWorkerExit結束這個工作線程:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//對應時間內沒有拿到task則退出循環
while (task != null || (task = getTask()) != null) {
//略
}
completedAbruptly = false;
} finally {
//結束這個工作線程
processWorkerExit(w, completedAbruptly);
}
}
自此,我們將線程池整體工作流程簡單的梳理完畢。
2. 拒絕策略的選擇
先來說說第一道題,關于拒絕策略的選擇,我們不妨直接查看RejectedExecutionHandler 子類的源碼進行說明。 先來看看CallerRunsPolicy ,該拒絕策略會直接用當前調用者執行當前任務:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//直接基于當前線程調用run方法執行任務
r.run();
}
}
}
然后就是AbortPolicy,也很簡單,直接拋異常:
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy 則是什么也不做,這也就意味著這個任務沒有任務處理,等同于丟棄:
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
//不做任何事情任務直接丟棄
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
最后一個就是DiscardOldestPolicy ,該策略會將隊首部任務丟棄,然后嘗試將再次execute這個任務:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
//丟掉隊首的任務,然后往線程池提交當前任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
不同拒絕策略都有著不同的使用場景:
- 如果我們的任務不算耗時還要保證能夠被執行,那么CallerRunsPolicy則是第一選擇。
- 若突增大量任務導致無法及時處理從業務的角度認為是異常的話,那么我們則建議拋出AbortPolicy讓開發介入及時調優處理,前提是當前業務正處于業務提測階段。
- 對于那些需要提交實時性消息的監控型任務,那么新提交的任務勢必實時性會由于更早的任務,這種場景使用DiscardOldestPolicy 即可。
- 如果這些任務相較于系統可靠性來說,如果不是很重要,那么直接采用rejectedExecution丟棄任務即可。
3. 主流框架對于拒絕策略的選擇
只要繼承RejectedExecutionHandler 就可以實現相應的拒絕策略,所以我們也不妨看看一些主流的框架是如何使用拒絕策略的吧。
tomcat線程池的拒絕策略也是拋出異常:
private static class RejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r,
java.util.concurrent.ThreadPoolExecutor executor) {
throw new RejectedExecutionException();
}
}
而Dubbo則相對友好一些,它會優先打印一個日志,并告知異常堆棧信息,然后拋出異常:
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
//......
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
//省略實現
}
}
Netty就相對穩健一些,它的拒絕策略則是直接創建一個線程池以外的線程處理這些任務,為了保證任務的實時處理,這種做法可能需要良好的硬件設備且臨時創建的線程無法做到準確的監控:
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
ActiveMq則是嘗試在指定的時效內盡可能的爭取將任務入隊,以保證最大交付:
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
});
4. CallerRunsPolicy存在的問題及解決對策
默認情況下,我們都會為了保證任務不被丟棄都優先考慮CallerRunsPolicy,這也是相對維穩的做法,這種做法的隱患是假設走到CallerRunsPolicy的任務是個非常耗時的任務,就會導致主線程就很卡死。
下面就是筆者通過主線程使用線程池的方法,該線程池限定了最大線程數為2還有阻塞隊列大小為1,這意味著第4個任務就會走到拒絕策略:
//創建線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
2,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.execute(() -> {
log.info("核心線程執行");
ThreadUtil.sleep(1, TimeUnit.DAYS);
});
threadPoolExecutor.execute(() -> {
log.info("任務入隊");
ThreadUtil.sleep(1, TimeUnit.DAYS);
});
threadPoolExecutor.execute(() -> {
log.info("應急線程處理");
ThreadUtil.sleep(1, TimeUnit.DAYS);
});
threadPoolExecutor.execute(() -> {
log.info("CallerRunsPolicy task");
ThreadUtil.sleep(1, TimeUnit.DAYS);
});
threadPoolExecutor.execute(() -> {
log.info("因為主線程卡住,無法被處理的任務");
});
從輸出結果可以看出,因為CallerRunsPolicy這個拒絕策略,導致耗時的任務用了主線程執行,導致線程池阻塞,進而導致后續任務無法及時執行,嚴重的情況下很可能導致OOM:
2024-04-03 00:08:12.617 INFO 20804 --- [ main] com.sharkChili.ThreadPoolApplication : 啟動成功!!
2024-04-03 00:08:15.739 INFO 20804 --- [pool-1-thread-1] com.sharkChili.ThreadPoolApplication : 核心線程執行
2024-04-03 00:08:36.768 INFO 20804 --- [pool-1-thread-2] com.sharkChili.ThreadPoolApplication : 應急線程處理
2024-04-03 00:08:49.333 INFO 20804 --- [ main] com.sharkChili.ThreadPoolApplication : CallerRunsPolicy task
我們從問題的本質入手,調用者采用CallerRunsPolicy是希望所有的任務都能夠被執行,按照筆者的經驗,假如我們的場景是偶發這種突發場景,在內存允許的情況下,我們建議增加阻塞隊列BlockingQueue的大小并調整堆內存以容納更多的任務,確保任務能夠被準確執行。
若當前服務器內存資源緊張,但我們配置線程池還為盡可能利用到CPU,我們建議調整線程中maximumPoolSize以保證盡可能壓榨CPU資源:
如果服務器資源以達到可利用的極限,這就意味我們要在設計策略上改變線程池的調度了,我們都知道,導致主線程卡死的本質就是因為我們不希望任何一個任務被丟棄。換個思路,有沒有辦法既能保證任務不被丟棄且在服務器有余力時及時處理呢?
這里筆者提供的一種思路,即任務持久化,注意這里筆者更多強調的是思路而不是實現,這里所謂的任務持久化,包括但不限于:
- 設計一張任務表間任務存儲到MySQL數據庫中。
- Redis緩存任務。
- 將任務提交到消息隊列中。
筆者以方案一為例,通過繼承BlockingQueue實現一個混合式阻塞隊列,該隊列包含JDK自帶的ArrayBlockingQueue和一個自定義的隊列(數據表),通過魔改隊列的添加邏輯達到任務可以存入ArrayBlockingQueue或者數據表的目的。
如此一來,一旦我們的線程池中線程以達到滿載時,我們就可以通過拒絕策略將最新任務持久化到MySQL數據庫中,等到線程池有了有余力處理所有任務時,讓其優先處理數據庫中的任務以避免"饑餓"問題。
這里筆者也給出混合隊列實現的核心源碼,即通過繼承BlockingQueue魔改了入隊和出隊的邏輯:
public class HybridBlockingQueue<E> implements BlockingQueue<E> {
private Object mysqlLock = new Object();
private ArrayBlockingQueue<E> arrayBlockingQueue;
//構造方法初始化阻塞隊列大小
public HybridBlockingQueue(int maxSize) {
arrayBlockingQueue = new ArrayBlockingQueue<>(maxSize);
}
/**
* 線程池會調用的入隊方法
* @param e
* @return
*/
@Override
public boolean offer(E e) {
return arrayBlockingQueue.offer(e);
}
/**
* 取任務時,優先從數據庫中讀取最早的任務
*
* @return
* @throws InterruptedException
*/
@Override
public E take() throws InterruptedException {
synchronized (mysqlLock) {
//從數據庫中讀取任務,通過上鎖讀取避免重復消費
TaskInfoMapper taskMapper = SpringUtil.getBean(TaskInfoMapper.class);
TaskInfo taskInfo = taskMapper.selectByExample(null).stream()
.findFirst()
.orElse(null);
//若數據庫存在該任務,則先刪后返回
if (ObjUtil.isNotEmpty(taskInfo)) {
taskMapper.deleteByPrimaryKey(taskInfo.getId());
Task task = new Task(taskInfo.getData());
return (E) task;
}
}
//若數據庫沒有要處理的任務則從內存中獲取
return arrayBlockingQueue.poll();
}
/**
* 帶有時間限制的任務獲取
*
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//從數據庫中讀取任務,通過上鎖讀取避免重復消費
synchronized (mysqlLock) {
//從數據庫中讀取任務,
TaskInfoMapper taskMapper = SpringUtil.getBean(TaskInfoMapper.class);
TaskInfo taskInfo = taskMapper.selectByExample(null).stream()
.findFirst()
.orElse(null);
//若數據庫存在該任務,則先刪后返回
if (ObjUtil.isNotEmpty(taskInfo)) {
taskMapper.deleteByPrimaryKey(taskInfo.getId());
Task task = new Task(taskInfo.getData());
return (E) task;
}
}
//若數據庫沒有要處理的任務則從內存中獲取
return arrayBlockingQueue.poll(timeout, unit);
}
//......
}
接下來就是自定義拒絕策略了,很明顯我們的拒絕策略就叫持久化策略:
public class PersistentTaskPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//任務入庫
TaskInfoMapper taskMapper = SpringUtil.getBean(TaskInfoMapper.class);
Task task = (Task) r;
TaskInfo taskInfo = new TaskInfo();
taskInfo.setData(JSONUtil.toJsonStr(task.getTaskInfo()));
taskMapper.insertSelective(taskInfo);
}
}
最終我們的使用示例如下:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
2,
60, TimeUnit.SECONDS,
new HybridBlockingQueue<>(1),
new PersistentTaskPolicy());
threadPoolExecutor.execute(new Task("core thread"));
threadPoolExecutor.execute(new Task("queueTask"));
threadPoolExecutor.execute(new Task("max thread"));
threadPoolExecutor.execute(new Task("insert into mysql database"));
最終我們的insert into mysql database因為線程池無法及時處理而走了我們自定義的拒絕策略而持久化入庫,等待線程池中其他任務完成后被取出執行:
2024-04-14 11:30:16.865 INFO 1052 --- [ main] com.sharkChili.PersistentTaskPolicy : 任務持久化,taskInfo:{"data":"insert into mysql database"}
2024-04-14 11:31:08.516 INFO 1052 --- [pool-1-thread-2] com.sharkChili.Task : task execution completed,task info:max thread
2024-04-14 11:31:08.516 INFO 1052 --- [pool-1-thread-1] com.sharkChili.Task : task execution completed,task info:core thread
2024-04-14 11:32:08.563 INFO 1052 --- [pool-1-thread-1] com.sharkChili.Task : task execution completed,task info:queueTask
2024-04-14 11:32:08.563 INFO 1052 --- [pool-1-thread-2] com.sharkChili.Task : task execution completed,task info:insert into mysql database
二、更高維度的思考——線程池限流的藝術
上文我們以大篇幅的維度探討拒絕策略上優化,需要保證準確、有效執行的任務能夠被線程池處理,且不會破壞程序的穩定性,即提交的任務能夠被正確處理且線程池不會被打死。 這一點,結合《Java并發編程實戰》的說法,我們也可以結合信號量Semaphore作為令牌,只有拿到令牌的線程才能將任務提交到線程池,保證線程池可以在單位時間內按照我們設定的并發數執行任務:
通過利用信號量完成線程池的限流,保證任務可被執行和工作線程池的穩定性,即將性能瓶頸和程序穩定性穩定拋給更高層級提交任務的線程,尤其根據需要決定當前任務是等待被線程池處理,還是直接中斷結束。
對應的我們給出流控性質的線程池代碼示例,讀者可參考筆者所說的思路和注釋了解一下落地思路:
public class RateLimitedExecutor {
private final ExecutorService threadPool;
private final Semaphore semaphore;
//基于bound創建對應并發度的線程池和流控令牌
public RateLimitedExecutor(int bound) {
this.threadPool = Executors.newFixedThreadPool(bound);
this.semaphore = new Semaphore(bound, true);
}
public void submitTask(final Runnable command) throws InterruptedException {
semaphore.acquire();
Console.log("{}獲取令牌成功,執行時間:{}", Thread.currentThread().getName(), DateUtil.now());
try {
threadPool.execute(() -> {
try {
//執行任務
command.run();
} finally {
//線程執行完成后釋放令牌
semaphore.release();
}
});
} catch (RejectedExecutionException e) {//異常兜底
semaphore.release();
}
}
}
對應的我們也給出是使用示例,可以看到我們創建了流控為5的線程池,并創建10個并發線程執行提交操作:
public static void main(String[] args) {
RateLimitedExecutor executor = new RateLimitedExecutor(5);
for (int i = 0; i < 10; i++) {
new Thread(new Task("任務" + i, executor)).start();
}
}
private static class Task implements Runnable {
private final String threadName;
private final RateLimitedExecutor executor;
public Task(String threadName, RateLimitedExecutor executor) {
this.threadName = threadName;
this.executor = executor;
}
@SneakyThrows
@Override
public void run() {
executor.submitTask(() -> {
ThreadUtil.sleep(5000);
Console.log("{}執行任務完成", threadName);
});
}
}
輸出結果如下,可以看到流控符合預期為5,同時我們也將程序穩定性和性能瓶頸等各方面的壓力轉移給上層調用者,避免了非必要的拒絕策略處理,讓線程池專注于并發度的優化:
Thread-1獲取令牌成功,執行時間:2025-07-02 09:47:04
Thread-9獲取令牌成功,執行時間:2025-07-02 09:47:04
Thread-5獲取令牌成功,執行時間:2025-07-02 09:47:04
Thread-8獲取令牌成功,執行時間:2025-07-02 09:47:04
Thread-3獲取令牌成功,執行時間:2025-07-02 09:47:04
任務5執行任務完成
任務9執行任務完成
任務3執行任務完成
任務1執行任務完成
任務8執行任務完成
Thread-6獲取令牌成功,執行時間:2025-07-02 09:47:10
Thread-0獲取令牌成功,執行時間:2025-07-02 09:47:10
Thread-7獲取令牌成功,執行時間:2025-07-02 09:47:10
Thread-2獲取令牌成功,執行時間:2025-07-02 09:47:10
Thread-4獲取令牌成功,執行時間:2025-07-02 09:47:10
任務0執行任務完成
任務4執行任務完成
任務6執行任務完成
任務7執行任務完成
任務2執行任務完成
三、小結
針對線程池拒絕策略的設計和使用更多是考察讀者對于線程池源碼的理解和使用經驗,這里筆者僅在思路上給出示例,當然實現上也存在很多不完美的地方,例如:
- 如何保證持久化任務被可靠消費。
- 如何保證數據庫和內存中任務的公平調度。
- 持久化任務是先刪后返回還是先返回處理完成后刪除如何決定?
文章結束,希望對你有幫助。