詳解 Java 并發(fā)流程控制工具
本文將針對(duì)JUC包下幾個(gè)常見(jiàn)的工具類進(jìn)行深入剖析和演示,通過(guò)針對(duì)本文的閱讀,讀者將會(huì)對(duì)JUC包下的工具有一個(gè)全面的了解和運(yùn)用。
一、CountDownLatch(倒計(jì)時(shí)門閂)
1. CountDownLatch簡(jiǎn)介
在并發(fā)編程的禪意中,CountDownLatch本質(zhì)上就是一種閉鎖,而閉鎖的語(yǔ)義則是等待所有其他活動(dòng)都完成了,才會(huì)繼續(xù)執(zhí)行后續(xù)的操作。
筆者一般稱CountDownLatch為倒計(jì)時(shí)門閂,它主要用于需要某些條件下才能喚醒的需求場(chǎng)景,例如我們線程1必須等到線程2做完某些事,那么就可以設(shè)置一個(gè)CountDownLatch并將數(shù)值設(shè)置為1,一旦線程2完成業(yè)務(wù)邏輯后,將數(shù)值修改為0,此時(shí)線程1就會(huì)被喚醒:
2. 基于CountDownLatch實(shí)現(xiàn)等待多線程就緒
通過(guò)上述的描述可能有點(diǎn)抽象,我們直接通過(guò)幾個(gè)例子演示一下,我們現(xiàn)在有這樣一個(gè)需求,希望等待5個(gè)線程完成之后,打印輸出一句工作完成:
對(duì)應(yīng)的代碼示例如下,可以看到我們創(chuàng)建了數(shù)值為5的CountDownLatch ,一旦線程池里的線程完成工作后就調(diào)用countDown進(jìn)行扣減,一旦數(shù)值變?yōu)?,主線程await就會(huì)放行,執(zhí)行后續(xù)輸出:
int workerSize = 5;
CountDownLatch workCount = new CountDownLatch(workerSize);
ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);
for (int i = 0; i < workerSize; i++) {
final int workerNum = i;
//5個(gè)工人輸出完成工作后,扣減倒計(jì)時(shí)門閂數(shù)
threadPool.submit(() -> {
log.info("worker[{}]完成手頭的工作", workerNum);
workCount.countDown();
});
}
try {
//阻塞當(dāng)前線程(主線程)往后走,只有倒計(jì)時(shí)門閂變?yōu)?之后才能繼續(xù)后續(xù)邏輯
log.info("等待worker工作完成");
workCount.await();
} catch (InterruptedException e) {
log.info("倒計(jì)時(shí)門閂阻塞失敗,失敗原因[{}]", e.getMessage(), e);
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
log.info("所有工人都完成手頭的工作了");
對(duì)應(yīng)的我們也給出輸出結(jié)果,可以看到主線程在線程池線程完成后才輸出:
3. 基于CountDownLatch實(shí)現(xiàn)運(yùn)動(dòng)員賽跑
實(shí)際上CountDownLatch可以讓多個(gè)線程進(jìn)行等待,我們不妨用線程模擬一下所有運(yùn)動(dòng)員就緒后,等待槍響后起跑的場(chǎng)景:
代碼如下,每當(dāng)運(yùn)動(dòng)員即線程池的線程準(zhǔn)備就緒,則調(diào)用await等待槍響,一旦所有運(yùn)動(dòng)員就緒之后,主線程調(diào)用countDown模擬槍響,然后運(yùn)動(dòng)員起跑:
public static void main(String[] args) {
log.info("百米跑比賽開(kāi)始");
int playerNum = 3;
CountDownLatch gun = new CountDownLatch(1);
ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
for (int i = 0; i < playerNum; i++) {
final int playNo = i;
threadPool.submit(() -> {
log.info("[{}]號(hào)運(yùn)動(dòng)員已就緒", playNo);
try {
gun.await();
} catch (InterruptedException e) {
log.info("[{}]號(hào)運(yùn)動(dòng)員線程阻塞失敗,失敗原因[{}]", playNo, e.getMessage(), e);
}
log.info("[{}]號(hào)運(yùn)動(dòng)員已經(jīng)到達(dá)重點(diǎn)", playNo);
});
}
//按下槍 所有運(yùn)動(dòng)員起跑
gun.countDown();
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
log.info("百米賽跑已結(jié)束");
}
對(duì)應(yīng)的我們也給出相應(yīng)的輸出結(jié)果:
4. 從源碼角度分析CountDownLatch工作流程
我們以等待所有工人完成工作的例子進(jìn)行解析,實(shí)際上在CountDownLatch是通過(guò)state和一個(gè)抽象隊(duì)列即aqs完成多線程之間的流程調(diào)度,主線程調(diào)用await方法等待其他worker線程,如果其它worker線程沒(méi)有完成工作,那么CountDownLatch就會(huì)將其存入抽象隊(duì)列中。
一旦其他線程將state設(shè)置為0時(shí),await對(duì)應(yīng)的線程就會(huì)從抽象隊(duì)列中釋放并喚醒:
對(duì)應(yīng)我們給出countDown的實(shí)現(xiàn),可以看到該方法底層就是將aqs隊(duì)列中的state進(jìn)行扣減:
public void countDown() {
sync.releaseShared(1);
}
//releaseShared內(nèi)部核心邏輯就是將state扣減1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
//扣減state并通過(guò)cas修改賦值
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
而countDown本質(zhì)上就是查看這個(gè)state,如果state被扣減為0,則調(diào)用aqs底層doReleaseShared方法將隊(duì)列中等待線程喚醒:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//查看是否扣減為0
if (tryReleaseShared(arg)) {
//如果是0則將當(dāng)前等待線程喚醒
doReleaseShared();
return true;
}
return false;
}
上文講解countDown涉及一些關(guān)于AQS的實(shí)用理解和設(shè)計(jì),關(guān)于更多AQS的知識(shí)點(diǎn),感興趣的讀者可以閱讀一下筆者的這篇文章:《AQS 源碼解析:原理與實(shí)踐》。
二、Semaphore(信號(hào)量)
1. 詳解Semaphore
信號(hào)量多用于限流的場(chǎng)景,例如我們希望單位時(shí)間內(nèi)只能有一個(gè)線程工作,我們就可以使用信號(hào)量,只有拿到線程的信號(hào)量才能工作,工作完成后釋放信號(hào)量,其余線程才能爭(zhēng)搶這個(gè)信號(hào)量并進(jìn)行進(jìn)一步的操作。
對(duì)應(yīng)我們給出下面這段代碼,可以看到筆者聲明信號(hào)量數(shù)值為6,每當(dāng)線程拿到3個(gè)信號(hào)量之后就會(huì)執(zhí)行業(yè)務(wù)操作,完成后調(diào)用release釋放3個(gè)令牌,讓其他線程繼續(xù)爭(zhēng)搶:
//設(shè)置可復(fù)用的信號(hào)量,令牌數(shù)為3
Semaphore semaphore = new Semaphore(6, true);
//創(chuàng)建5個(gè)線程
int workSize = 5;
ExecutorService executorService = Executors.newFixedThreadPool(workSize);
for (int i = 0; i < workSize; i++) {
executorService.submit(() -> {
try {
//拿3個(gè)令牌
semaphore.acquire(3);
log.info("進(jìn)行業(yè)務(wù)邏輯處理.......");
ThreadUtil.sleep(1000);
//釋放3個(gè)令牌
semaphore.release(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
while (!executorService.isTerminated()) {
}
對(duì)應(yīng)輸出結(jié)果如下,可以看到每個(gè)線程拿到令牌后都會(huì)休眠1秒,從輸出結(jié)果來(lái)看每秒只有兩個(gè)線程才工作,符合我們的限流需求:
2. 詳解Semaphore工作原理
Semaphore底層也是用到的aqs隊(duì)列,線程進(jìn)行資源獲取時(shí)也是通過(guò)查看state是否足夠,在明確足夠的情況下進(jìn)行state扣減,然后進(jìn)行工作。如果線程發(fā)現(xiàn)state數(shù)量不夠,那么就會(huì)被Semaphore存入aqs底層的抽象隊(duì)列中,直到state數(shù)量足夠后被喚醒:
對(duì)此我們給出Semaphore底層的acquire的邏輯可以看到,它會(huì)讀取state數(shù)值然后進(jìn)行扣減,如果剩余數(shù)量大于0則說(shuō)明令牌獲取成功線程可以執(zhí)行后續(xù)邏輯,反之說(shuō)明當(dāng)前令牌數(shù)不夠,外部邏輯會(huì)將該線程掛到等待隊(duì)列中,等待令牌足夠后將其喚醒:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
//讀取可用的state
int available = getState();
//計(jì)算剩余的state
int remaining = available - acquires;
//如果小于0說(shuō)明令牌數(shù)不足直接返回出去,讓外部將線程掛起,反之通過(guò)cas修改剩余數(shù),返回大于0的結(jié)果讓持有令牌的線程執(zhí)行后續(xù)邏輯
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
3. 基于Semaphore實(shí)現(xiàn)一個(gè)有界容器
利用Semaphore信號(hào)量并發(fā)獲取且資源循環(huán)可復(fù)用的特性,我們可以通過(guò)實(shí)例封閉技術(shù)落地一個(gè)有界的容器,如下代碼所示,只有得到信號(hào)量且添加成功了信號(hào)量才會(huì)成功扣減,如果沒(méi)有拿到信號(hào)量就阻塞無(wú)法添加,除非其他線程釋放自己的資源。
如下圖,筆者利用信號(hào)量實(shí)現(xiàn)一個(gè)列表容器的限流設(shè)置,可以看到當(dāng)前容器還剩一個(gè)空間,所以信號(hào)量數(shù)也是1,當(dāng)線程0獲得信號(hào)量成功后將元素24添加至容器中。隨后的線程1看到信號(hào)量為0,即知曉容器沒(méi)有可用空間就會(huì)被阻塞等待:
一旦線程1刪除一個(gè)元素成功后,就會(huì)歸還一個(gè)令牌,此時(shí)線程1就會(huì)被信號(hào)量喚醒,嘗試獲取令牌并添加元素,這就是我們有界容器實(shí)現(xiàn)的核心思路:
對(duì)應(yīng)的我們給出有界容器的落地代碼示例:
public class BoundedList<E> {
private final List<E> list;
private final Semaphore semaphore;
/*
初始化一個(gè)并發(fā)的有界容器
*/
public BoundedList(int bound) {
this.list = Collections.synchronizedList(new ArrayList<>());
this.semaphore = new Semaphore(bound);
}
public boolean add(E element) {
boolean wasAdded = false;
try {
//獲取令牌,成功后才會(huì)添加容器
semaphore.acquire();
wasAdded = list.add(element);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//添加失敗則釋放令牌,讓其他線程可以嘗試到該有界容器中添加
if (!wasAdded)
semaphore.release();
return wasAdded;
}
}
public void remove(E element) {
boolean remove = false;
try {
remove = list.remove(element);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//只有明確元素移除成功,才會(huì)釋放令牌
if (remove)
semaphore.release();
}
}
@Override
public String toString() {
return JSONUtil.toJsonStr(list);
}
}
對(duì)應(yīng)測(cè)試代碼如下,大體思路為:
- 嘗試讓線程0填滿容器使線程1阻塞
- 隨后線程0移除一個(gè)元素
- 線程1被喚醒,并成功獲取令牌,將元素5成功添加
BoundedList<Integer> list = new BoundedList<>(5);
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
//添加5個(gè)元素填滿容器
Console.log("線程1添加5個(gè)元素");
for (int i = 0; i < 5; i++) {
list.add(i);
}
ThreadUtil.sleep(5000);
//移除元素2,讓線程2添加元素5成功
Console.log("線程1移除元素2");
list.remove(2);
countDownLatch.countDown();
}).start();
new Thread(() -> {
ThreadUtil.sleep(1000);
Console.log("線程2添加元素5");
list.add(5);
Console.log("線程2添加元素5成功");
countDownLatch.countDown();
}).start();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Console.log("線程1和線程2執(zhí)行完畢,有界容器元素:{}", list);
輸出結(jié)果如下,符合我們對(duì)有界容器預(yù)期:
線程1添加5個(gè)元素
線程2添加元素5
線程1移除元素2
線程2添加元素5成功
線程1和線程2執(zhí)行完畢,有界容器元素:[0,1,3,4,5]
4. Semaphore使用注意事項(xiàng)
- 獲取和釋放的時(shí)候都可以指定數(shù)量,但是要保持一致。
- 公平性設(shè)置為true會(huì)更加合理
- 并不必須由獲取許可證的線程釋放許可證。可以是A獲取,B釋放。
三、Condition
1. 詳解Condition
Condition即條件對(duì)象,不是很常用或者直接用到的對(duì)象,常用于線程等待喚醒操作,例如A線程需要等待某個(gè)條件的時(shí)候,我們可以通過(guò)condition.await()方法,A線程就會(huì)進(jìn)入阻塞狀態(tài)。
線程B執(zhí)行condition.signal()方法,則JVM就會(huì)從被阻塞線程中找到等待該condition的線程。線程A收到可執(zhí)行信號(hào)的時(shí)候,他的線程狀態(tài)就會(huì)變成Runnable可執(zhí)行狀態(tài)。
對(duì)此我們給出代碼示例,可以看到我們從ReentrantLock 中拿到一個(gè)Condition 對(duì)象,讓創(chuàng)建的線程進(jìn)入等待狀態(tài),隨后讓主線程調(diào)用condition 的signal將其喚醒:
private ReentrantLock lock = new ReentrantLock();
//條件對(duì)象,操控線程的等待和通知
private Condition condition = lock.newCondition();
public void waitCondition() throws InterruptedException {
lock.lock();
try {
log.info("等待達(dá)到條件后通知");
condition.await();
log.info("收到通知,開(kāi)始執(zhí)行業(yè)務(wù)邏輯");
} finally {
lock.unlock();
log.info("執(zhí)行完成,釋放鎖");
}
}
public void notifyCondition() throws InterruptedException {
lock.lock();
try {
log.info("達(dá)到條件發(fā)起通知");
condition.signal();
log.info("發(fā)起通知結(jié)束");
} finally {
lock.unlock();
log.info("發(fā)起通知執(zhí)行完成,釋放鎖");
}
}
public static void main(String[] args) throws InterruptedException {
Main obj = new Main();
new Thread(() -> {
try {
obj.waitCondition();
//讓出CPU時(shí)間片,交給主線程發(fā)起通知
Thread.sleep(3000);
} catch (InterruptedException e) {
log.error("等待條件通知設(shè)置失敗,失敗原因 [{}]", e.getMessage(), e);
}
}).start();
//休眠3s喚醒等待線程
Thread.sleep(3000);
obj.notifyCondition();
}
對(duì)應(yīng)的我們也給出輸出結(jié)果:
2. 基于條件對(duì)象完成生產(chǎn)者、消費(fèi)者模式
我們假設(shè)用一個(gè)隊(duì)列存放一波生產(chǎn)者生產(chǎn)的資源,當(dāng)資源滿了通知消費(fèi)者消費(fèi)。當(dāng)消費(fèi)者消費(fèi)空了,通知生產(chǎn)者生產(chǎn)。
所以這時(shí)候使用condition控制流程最合適(這也是阻塞的隊(duì)列內(nèi)部的實(shí)現(xiàn)),所以我們要定義兩個(gè)信號(hào),分別為:
- 當(dāng)資源被耗盡,我們就使用資源未滿條件(notFull): 調(diào)用signal通知生產(chǎn)者消費(fèi),消費(fèi)者調(diào)用await進(jìn)入等待。
- 當(dāng)資源被填滿,使用資源為空條件(notEmpty):將生產(chǎn)者用await方法掛起,消費(fèi)者用signal喚醒消費(fèi)告知非空。
很明顯生產(chǎn)者和消費(fèi)者本質(zhì)上就是基于這兩個(gè)標(biāo)識(shí)分別標(biāo)志自己的等待時(shí)機(jī)和通知時(shí)機(jī),以生產(chǎn)者為例,即每生產(chǎn)一個(gè)資源后就可以調(diào)用notEmpty通知消費(fèi)者消費(fèi),當(dāng)生產(chǎn)者速度過(guò)快,則用await等待未滿notFull條件阻塞:
首先我們給出生產(chǎn)者和消費(fèi)者條件和資源隊(duì)列聲明,基于上述條件我們給出一個(gè)經(jīng)典的生產(chǎn)者和消費(fèi)者模式的示例,我們首先給出生產(chǎn)者代碼,可以看到資源滿的時(shí)候調(diào)用notFull.await();將自己掛起等待未滿,生產(chǎn)資源后調(diào)用 notEmpty.signal();通知消費(fèi)者消費(fèi)。
對(duì)應(yīng)消費(fèi)者示例代碼也是一樣,當(dāng)資源消費(fèi)完全,調(diào)用notEmpty.await();等待不空,一旦消費(fèi)定量資源調(diào)用notFull.signal();通知生產(chǎn)者生產(chǎn)。
最終代碼示例如下:
@Slf4j
public class ProducerMode {
//鎖
private static ReentrantLock lock = new ReentrantLock();
// 資源未滿
private Condition notFull = lock.newCondition();
//資源為空
private Condition notEmpty = lock.newCondition();
private Queue<Integer> queue = new PriorityQueue<>(10);
private int queueMaxSize = 10;
/**
* 生產(chǎn)者
*/
private class Producer extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
if (queueMaxSize == queue.size()) {
log.info("當(dāng)前隊(duì)列已滿,通知消費(fèi)者消費(fèi)");
//等待不滿條件觸發(fā)
notFull.await();
}
queue.offer(1);
log.info("生產(chǎn)者補(bǔ)貨,當(dāng)前隊(duì)列有 【{}】", queue.size());
//通知消費(fèi)者隊(duì)列不空,可以消費(fèi)
notEmpty.signal();
} catch (Exception e) {
log.error("生產(chǎn)者報(bào)錯(cuò),失敗原因 [{}]", e.getMessage(), e);
} finally {
lock.unlock();
}
}
}
}
/**
* 消費(fèi)者
*/
private class Consumer extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
if (0 == queue.size()) {
log.info("當(dāng)前隊(duì)列已空,通知生產(chǎn)者補(bǔ)貨");
//等待不空條件達(dá)到
notEmpty.await();
}
queue.poll();
//通知消費(fèi)者不滿了
notFull.signal();
log.info("消費(fèi)者完成消費(fèi),當(dāng)前隊(duì)列還剩余 【{}】個(gè)元素", queue.size());
} catch (Exception e) {
log.error("生產(chǎn)者報(bào)錯(cuò),失敗原因 [{}]", e.getMessage(), e);
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerMode mode = new ProducerMode();
Producer producer = mode.new Producer();
ProducerMode.Consumer consumer = mode.new Consumer();
producer.start();
consumer.start();
}
}
對(duì)應(yīng)的我們給出輸出結(jié)果:
四、CyclicBarrier
1. CyclicBarrier 原理和使用示例
CyclicBarrier 也就是循環(huán)柵欄對(duì)象,不是很常用,它主要用于等待線程數(shù)就緒后執(zhí)行公共邏輯的業(yè)務(wù)場(chǎng)景。 例如我們希望每湊齊5個(gè)線程后執(zhí)行后續(xù)邏輯,我們就可以說(shuō)明CyclicBarrier 數(shù)值為5,然后每個(gè)線程到期后調(diào)用await等待其他線程就緒。
一旦到齊5個(gè),CyclicBarrier 就會(huì)通知這些線程開(kāi)始工作,對(duì)應(yīng)的代碼如下所示:
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
System.out.println("線程 " + Thread.currentThread().getName() + " 開(kāi)始執(zhí)行任務(wù)");
try {
// 模擬執(zhí)行任務(wù)
Thread.sleep(1000);
System.out.println("線程 " + Thread.currentThread().getName() + " 到達(dá)屏障");
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有線程都到達(dá)屏障,一起繼續(xù)執(zhí)行");
}).start();
}
}
對(duì)應(yīng)的我們給出相應(yīng)輸出示例:
2. CyclicBarrier 下的多核并發(fā)運(yùn)算技巧
利用循環(huán)柵欄的特點(diǎn),我們可以很好基于計(jì)算機(jī)核心數(shù)完成所有的耗時(shí)運(yùn)算,等待所有計(jì)算完成之后,通過(guò)柵欄來(lái)匯聚計(jì)算結(jié)果打印輸出:
對(duì)應(yīng)我們給出主線程的實(shí)現(xiàn),可以看到該處理器會(huì)得到一個(gè)與核心數(shù)一致的列表,并將列表中的每個(gè)子列表交由worker線程處理,每當(dāng)worker完成列表中一個(gè)元素運(yùn)算后,就會(huì)觸發(fā)柵欄的方法打印結(jié)果:
public class ArraySquareCalculator {
private final List<List<Integer>> taskList;
private final Worker[] workers;
private final CyclicBarrier barrier;
public ArraySquareCalculator(List<List<Integer>> taskList) {
if (taskList == null || taskList.isEmpty()) {
throw new RuntimeException("任務(wù)列表不能為空");
}
if (taskList.size() != Runtime.getRuntime().availableProcessors()) {
throw new RuntimeException("任務(wù)列表數(shù)量必須等于CPU數(shù)量");
}
this.taskList = taskList;
barrier = new CyclicBarrier(taskList.size(), () -> {
Console.log("所有線程都到達(dá)屏障,執(zhí)行結(jié)束");
Console.log("執(zhí)行結(jié)果:{}", JSONUtil.toJsonStr(taskList));
});
workers = new Worker[taskList.size()];
for (int i = 0; i < taskList.size(); i++) {
workers[i] = new Worker(i, taskList, barrier);
}
}
//啟動(dòng)核心數(shù)對(duì)應(yīng)的工作線程執(zhí)行運(yùn)算
public synchronized void start() {
for (Worker worker : workers) {
new Thread(worker).start();
}
}
}
對(duì)應(yīng)的我們也給出worker子線程代碼,可以看到核心數(shù)對(duì)應(yīng)的子線程worker完成各自負(fù)責(zé)列表的元素運(yùn)算后,就會(huì)通過(guò)柵欄提交給主線程告知完成:
public class Worker implements Runnable {
private final int elementIdx;
private final List<List<Integer>> list;
private final CyclicBarrier barrier;
Worker(int elementIdx, List<List<Integer>> list, CyclicBarrier cyclicBarrier) {
this.elementIdx = elementIdx;
this.list = list;
this.barrier = cyclicBarrier;
}
@SneakyThrows
@Override
public void run() {
//每個(gè)核心對(duì)應(yīng)的線程處理各自索引列表
List<Integer> workList = list.get(elementIdx);
for (int i = 0; i < workList.size(); i++) {
//完成負(fù)責(zé)列表元素計(jì)算后,通過(guò)屏障等待所有線程完成
workList.set(i, workList.get(i) << 1);
barrier.await();
}
}
}
對(duì)應(yīng)的我們也給出測(cè)試代碼:
//創(chuàng)建一個(gè)與核心數(shù)一樣的列表
int size = Runtime.getRuntime().availableProcessors();
List<List<Integer>> list = new ArrayList<>();
//添加元素到列表中
for (int i = 0; i < size; i++) {
ArrayList<Integer> arrayList = new ArrayList<>();
for (int j = 1; j <= 3; j++) {
arrayList.add(j);
}
list.add(arrayList);
}
//啟動(dòng)并行運(yùn)算處理器
ArraySquareCalculator calculator = new ArraySquareCalculator(list);
calculator.start();
輸出結(jié)果如下,與預(yù)期一致:
3. CyclicBarrier如何控制并發(fā)
以上面并行核心線程運(yùn)算邏輯為例,本質(zhì)上await方法調(diào)用后底層就會(huì)完成count扣減,當(dāng)count為0后就會(huì)觸發(fā)一次主線程邏輯調(diào)用,也就是我們的打印輸出,即通過(guò)count來(lái)完成線程之間的循環(huán)并發(fā)流程阻塞和通知:
對(duì)應(yīng)的我們也給出await的源碼,可以看到其內(nèi)部是通過(guò)調(diào)用dowait執(zhí)行上述所說(shuō)邏輯:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
查看dowait即可印證我們的邏輯:
- 所有線程調(diào)用await執(zhí)行count扣減
- count為0調(diào)用barrierCommand也就是我們初始化時(shí)設(shè)置的打印輸出方法
- 完成barrierCommand任務(wù)執(zhí)行后調(diào)用nextGeneration將count重置為初始化時(shí)的數(shù)值,對(duì)應(yīng)的我們的代碼就是CPU核心數(shù)
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//......
int index = --count;
//count扣減為0 步入執(zhí)行邏輯
if (index == 0) { // tripped
boolean ranAction = false;
try {
//調(diào)用barrierCommand執(zhí)行歸并邏輯運(yùn)算
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//將count重置為初始值
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//......
} finally {
lock.unlock();
}
}
4. CyclicBarrier 與CountDownLatch區(qū)別(重點(diǎn))
CountDownLatch用戶事件即主要是業(yè)務(wù)流程上的控制并不是針對(duì)線程,CyclicBarrier 循環(huán)柵欄作用于線程,如上代碼必須等待線程到齊后觸發(fā)。
循環(huán)柵欄可重復(fù)使用,CountDownLatch則不能。