成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

詳解 Java 并發(fā)流程控制工具

開(kāi)發(fā)
本文將針對(duì)JUC包下幾個(gè)常見(jiàn)的工具類進(jìn)行深入剖析和演示,通過(guò)針對(duì)本文的閱讀,讀者將會(huì)對(duì)JUC包下的工具有一個(gè)全面的了解和運(yùn)用。

本文將針對(duì)JUC包下幾個(gè)常見(jiàn)的工具類進(jìn)行深入剖析和演示,通過(guò)針對(duì)本文的閱讀,讀者將會(huì)對(duì)JUC包下的工具有一個(gè)全面的了解和運(yùn)用。

一、CountDownLatch(倒計(jì)時(shí)門閂)

1. CountDownLatch簡(jiǎn)介

在并發(fā)編程的禪意中,CountDownLatch本質(zhì)上就是一種閉鎖,而閉鎖的語(yǔ)義則是等待所有其他活動(dòng)都完成了,才會(huì)繼續(xù)執(zhí)行后續(xù)的操作。

筆者一般稱CountDownLatch為倒計(jì)時(shí)門閂,它主要用于需要某些條件下才能喚醒的需求場(chǎng)景,例如我們線程1必須等到線程2做完某些事,那么就可以設(shè)置一個(gè)CountDownLatch并將數(shù)值設(shè)置為1,一旦線程2完成業(yè)務(wù)邏輯后,將數(shù)值修改為0,此時(shí)線程1就會(huì)被喚醒:

2. 基于CountDownLatch實(shí)現(xiàn)等待多線程就緒

通過(guò)上述的描述可能有點(diǎn)抽象,我們直接通過(guò)幾個(gè)例子演示一下,我們現(xiàn)在有這樣一個(gè)需求,希望等待5個(gè)線程完成之后,打印輸出一句工作完成:

對(duì)應(yīng)的代碼示例如下,可以看到我們創(chuàng)建了數(shù)值為5的CountDownLatch ,一旦線程池里的線程完成工作后就調(diào)用countDown進(jìn)行扣減,一旦數(shù)值變?yōu)?,主線程await就會(huì)放行,執(zhí)行后續(xù)輸出:

int workerSize = 5;
        CountDownLatch workCount = new CountDownLatch(workerSize);
        ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);

        for (int i = 0; i < workerSize; i++) {
            final int workerNum = i;
            //5個(gè)工人輸出完成工作后,扣減倒計(jì)時(shí)門閂數(shù)
            threadPool.submit(() -> {
                log.info("worker[{}]完成手頭的工作", workerNum);
                workCount.countDown();
            });
        }

        try {
            //阻塞當(dāng)前線程(主線程)往后走,只有倒計(jì)時(shí)門閂變?yōu)?之后才能繼續(xù)后續(xù)邏輯
            log.info("等待worker工作完成");
            workCount.await();
        } catch (InterruptedException e) {
            log.info("倒計(jì)時(shí)門閂阻塞失敗,失敗原因[{}]", e.getMessage(), e);
        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("所有工人都完成手頭的工作了");

對(duì)應(yīng)的我們也給出輸出結(jié)果,可以看到主線程在線程池線程完成后才輸出:

3. 基于CountDownLatch實(shí)現(xiàn)運(yùn)動(dòng)員賽跑

實(shí)際上CountDownLatch可以讓多個(gè)線程進(jìn)行等待,我們不妨用線程模擬一下所有運(yùn)動(dòng)員就緒后,等待槍響后起跑的場(chǎng)景:

代碼如下,每當(dāng)運(yùn)動(dòng)員即線程池的線程準(zhǔn)備就緒,則調(diào)用await等待槍響,一旦所有運(yùn)動(dòng)員就緒之后,主線程調(diào)用countDown模擬槍響,然后運(yùn)動(dòng)員起跑:

public static void main(String[] args) {
        log.info("百米跑比賽開(kāi)始");

        int playerNum = 3;
        CountDownLatch gun = new CountDownLatch(1);
        ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
        
        for (int i = 0; i < playerNum; i++) {
            final int playNo = i;
            
            threadPool.submit(() -> {
                log.info("[{}]號(hào)運(yùn)動(dòng)員已就緒", playNo);
                try {
                    gun.await();
                } catch (InterruptedException e) {
                    log.info("[{}]號(hào)運(yùn)動(dòng)員線程阻塞失敗,失敗原因[{}]", playNo, e.getMessage(), e);
                }
                log.info("[{}]號(hào)運(yùn)動(dòng)員已經(jīng)到達(dá)重點(diǎn)", playNo);
            });
        }

        //按下槍 所有運(yùn)動(dòng)員起跑
        gun.countDown();

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("百米賽跑已結(jié)束");
    }

對(duì)應(yīng)的我們也給出相應(yīng)的輸出結(jié)果:

4. 從源碼角度分析CountDownLatch工作流程

我們以等待所有工人完成工作的例子進(jìn)行解析,實(shí)際上在CountDownLatch是通過(guò)state和一個(gè)抽象隊(duì)列即aqs完成多線程之間的流程調(diào)度,主線程調(diào)用await方法等待其他worker線程,如果其它worker線程沒(méi)有完成工作,那么CountDownLatch就會(huì)將其存入抽象隊(duì)列中。

一旦其他線程將state設(shè)置為0時(shí),await對(duì)應(yīng)的線程就會(huì)從抽象隊(duì)列中釋放并喚醒:

對(duì)應(yīng)我們給出countDown的實(shí)現(xiàn),可以看到該方法底層就是將aqs隊(duì)列中的state進(jìn)行扣減:

public void countDown() {
        sync.releaseShared(1);
    }

//releaseShared內(nèi)部核心邏輯就是將state扣減1
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                //扣減state并通過(guò)cas修改賦值
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

而countDown本質(zhì)上就是查看這個(gè)state,如果state被扣減為0,則調(diào)用aqs底層doReleaseShared方法將隊(duì)列中等待線程喚醒:

public void countDown() {
        sync.releaseShared(1);
    }


public final boolean releaseShared(int arg) {
  //查看是否扣減為0
        if (tryReleaseShared(arg)) {
        //如果是0則將當(dāng)前等待線程喚醒
            doReleaseShared();
            return true;
        }
        return false;
    }

上文講解countDown涉及一些關(guān)于AQS的實(shí)用理解和設(shè)計(jì),關(guān)于更多AQS的知識(shí)點(diǎn),感興趣的讀者可以閱讀一下筆者的這篇文章:《AQS 源碼解析:原理與實(shí)踐》。

二、Semaphore(信號(hào)量)

1. 詳解Semaphore

信號(hào)量多用于限流的場(chǎng)景,例如我們希望單位時(shí)間內(nèi)只能有一個(gè)線程工作,我們就可以使用信號(hào)量,只有拿到線程的信號(hào)量才能工作,工作完成后釋放信號(hào)量,其余線程才能爭(zhēng)搶這個(gè)信號(hào)量并進(jìn)行進(jìn)一步的操作。

對(duì)應(yīng)我們給出下面這段代碼,可以看到筆者聲明信號(hào)量數(shù)值為6,每當(dāng)線程拿到3個(gè)信號(hào)量之后就會(huì)執(zhí)行業(yè)務(wù)操作,完成后調(diào)用release釋放3個(gè)令牌,讓其他線程繼續(xù)爭(zhēng)搶:

//設(shè)置可復(fù)用的信號(hào)量,令牌數(shù)為3
        Semaphore semaphore = new Semaphore(6, true);
        //創(chuàng)建5個(gè)線程
        int workSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(workSize);


        for (int i = 0; i < workSize; i++) {
            executorService.submit(() -> {
                try {
                    //拿3個(gè)令牌
                    semaphore.acquire(3);

                    log.info("進(jìn)行業(yè)務(wù)邏輯處理.......");
                    ThreadUtil.sleep(1000);

                    //釋放3個(gè)令牌
                    semaphore.release(3);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
        while (!executorService.isTerminated()) {

        }

對(duì)應(yīng)輸出結(jié)果如下,可以看到每個(gè)線程拿到令牌后都會(huì)休眠1秒,從輸出結(jié)果來(lái)看每秒只有兩個(gè)線程才工作,符合我們的限流需求:

2. 詳解Semaphore工作原理

Semaphore底層也是用到的aqs隊(duì)列,線程進(jìn)行資源獲取時(shí)也是通過(guò)查看state是否足夠,在明確足夠的情況下進(jìn)行state扣減,然后進(jìn)行工作。如果線程發(fā)現(xiàn)state數(shù)量不夠,那么就會(huì)被Semaphore存入aqs底層的抽象隊(duì)列中,直到state數(shù)量足夠后被喚醒:

對(duì)此我們給出Semaphore底層的acquire的邏輯可以看到,它會(huì)讀取state數(shù)值然后進(jìn)行扣減,如果剩余數(shù)量大于0則說(shuō)明令牌獲取成功線程可以執(zhí)行后續(xù)邏輯,反之說(shuō)明當(dāng)前令牌數(shù)不夠,外部邏輯會(huì)將該線程掛到等待隊(duì)列中,等待令牌足夠后將其喚醒:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                //讀取可用的state    
                int available = getState();
                //計(jì)算剩余的state
                int remaining = available - acquires;
                //如果小于0說(shuō)明令牌數(shù)不足直接返回出去,讓外部將線程掛起,反之通過(guò)cas修改剩余數(shù),返回大于0的結(jié)果讓持有令牌的線程執(zhí)行后續(xù)邏輯
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

3. 基于Semaphore實(shí)現(xiàn)一個(gè)有界容器

利用Semaphore信號(hào)量并發(fā)獲取且資源循環(huán)可復(fù)用的特性,我們可以通過(guò)實(shí)例封閉技術(shù)落地一個(gè)有界的容器,如下代碼所示,只有得到信號(hào)量且添加成功了信號(hào)量才會(huì)成功扣減,如果沒(méi)有拿到信號(hào)量就阻塞無(wú)法添加,除非其他線程釋放自己的資源。

如下圖,筆者利用信號(hào)量實(shí)現(xiàn)一個(gè)列表容器的限流設(shè)置,可以看到當(dāng)前容器還剩一個(gè)空間,所以信號(hào)量數(shù)也是1,當(dāng)線程0獲得信號(hào)量成功后將元素24添加至容器中。隨后的線程1看到信號(hào)量為0,即知曉容器沒(méi)有可用空間就會(huì)被阻塞等待:

一旦線程1刪除一個(gè)元素成功后,就會(huì)歸還一個(gè)令牌,此時(shí)線程1就會(huì)被信號(hào)量喚醒,嘗試獲取令牌并添加元素,這就是我們有界容器實(shí)現(xiàn)的核心思路:

對(duì)應(yīng)的我們給出有界容器的落地代碼示例:

public class BoundedList<E> {


    private final List<E> list;
    private final Semaphore semaphore;

    /*
    初始化一個(gè)并發(fā)的有界容器
     */
    public BoundedList(int bound) {
        this.list = Collections.synchronizedList(new ArrayList<>());
        this.semaphore = new Semaphore(bound);
    }


    public boolean add(E element) {
        boolean wasAdded = false;
        try {
            //獲取令牌,成功后才會(huì)添加容器
            semaphore.acquire();
            wasAdded = list.add(element);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            //添加失敗則釋放令牌,讓其他線程可以嘗試到該有界容器中添加
            if (!wasAdded)
                semaphore.release();
            return wasAdded;
        }
    }

    public void remove(E element) {
        boolean remove = false;
        try {

            remove = list.remove(element);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            //只有明確元素移除成功,才會(huì)釋放令牌
            if (remove)
                semaphore.release();
        }
    }

    @Override
    public String toString() {
        return JSONUtil.toJsonStr(list);
    }
}

對(duì)應(yīng)測(cè)試代碼如下,大體思路為:

  • 嘗試讓線程0填滿容器使線程1阻塞
  • 隨后線程0移除一個(gè)元素
  • 線程1被喚醒,并成功獲取令牌,將元素5成功添加
BoundedList<Integer> list = new BoundedList<>(5);
        CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(() -> {
            //添加5個(gè)元素填滿容器
            Console.log("線程1添加5個(gè)元素");
            for (int i = 0; i < 5; i++) {
                list.add(i);
            }

            ThreadUtil.sleep(5000);
            //移除元素2,讓線程2添加元素5成功
            Console.log("線程1移除元素2");
            list.remove(2);

            countDownLatch.countDown();

        }).start();


        new Thread(() -> {
            ThreadUtil.sleep(1000);
            Console.log("線程2添加元素5");
            list.add(5);
            Console.log("線程2添加元素5成功");
            countDownLatch.countDown();
        }).start();


        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Console.log("線程1和線程2執(zhí)行完畢,有界容器元素:{}", list);

輸出結(jié)果如下,符合我們對(duì)有界容器預(yù)期:

線程1添加5個(gè)元素
線程2添加元素5
線程1移除元素2
線程2添加元素5成功
線程1和線程2執(zhí)行完畢,有界容器元素:[0,1,3,4,5]

4. Semaphore使用注意事項(xiàng)

  • 獲取和釋放的時(shí)候都可以指定數(shù)量,但是要保持一致。
  • 公平性設(shè)置為true會(huì)更加合理
  • 并不必須由獲取許可證的線程釋放許可證。可以是A獲取,B釋放。

三、Condition

1. 詳解Condition

Condition即條件對(duì)象,不是很常用或者直接用到的對(duì)象,常用于線程等待喚醒操作,例如A線程需要等待某個(gè)條件的時(shí)候,我們可以通過(guò)condition.await()方法,A線程就會(huì)進(jìn)入阻塞狀態(tài)。

線程B執(zhí)行condition.signal()方法,則JVM就會(huì)從被阻塞線程中找到等待該condition的線程。線程A收到可執(zhí)行信號(hào)的時(shí)候,他的線程狀態(tài)就會(huì)變成Runnable可執(zhí)行狀態(tài)。

對(duì)此我們給出代碼示例,可以看到我們從ReentrantLock 中拿到一個(gè)Condition 對(duì)象,讓創(chuàng)建的線程進(jìn)入等待狀態(tài),隨后讓主線程調(diào)用condition 的signal將其喚醒:

private ReentrantLock lock = new ReentrantLock();
    //條件對(duì)象,操控線程的等待和通知
    private Condition condition = lock.newCondition();

    public void waitCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("等待達(dá)到條件后通知");
            condition.await();
            log.info("收到通知,開(kāi)始執(zhí)行業(yè)務(wù)邏輯");
        } finally {
            lock.unlock();
            log.info("執(zhí)行完成,釋放鎖");
        }
    }


    public void notifyCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("達(dá)到條件發(fā)起通知");
            condition.signal();
            log.info("發(fā)起通知結(jié)束");
        } finally {
            lock.unlock();
            log.info("發(fā)起通知執(zhí)行完成,釋放鎖");
        }
    }


    public static void main(String[] args) throws InterruptedException {
        Main obj = new Main();

        new Thread(() -> {
            try {
                obj.waitCondition();
                //讓出CPU時(shí)間片,交給主線程發(fā)起通知
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                log.error("等待條件通知設(shè)置失敗,失敗原因 [{}]", e.getMessage(), e);
            }
        }).start();

        //休眠3s喚醒等待線程
        Thread.sleep(3000);
        obj.notifyCondition();
    }

對(duì)應(yīng)的我們也給出輸出結(jié)果:

2. 基于條件對(duì)象完成生產(chǎn)者、消費(fèi)者模式

我們假設(shè)用一個(gè)隊(duì)列存放一波生產(chǎn)者生產(chǎn)的資源,當(dāng)資源滿了通知消費(fèi)者消費(fèi)。當(dāng)消費(fèi)者消費(fèi)空了,通知生產(chǎn)者生產(chǎn)。

所以這時(shí)候使用condition控制流程最合適(這也是阻塞的隊(duì)列內(nèi)部的實(shí)現(xiàn)),所以我們要定義兩個(gè)信號(hào),分別為:

  • 當(dāng)資源被耗盡,我們就使用資源未滿條件(notFull): 調(diào)用signal通知生產(chǎn)者消費(fèi),消費(fèi)者調(diào)用await進(jìn)入等待。
  • 當(dāng)資源被填滿,使用資源為空條件(notEmpty):將生產(chǎn)者用await方法掛起,消費(fèi)者用signal喚醒消費(fèi)告知非空。

很明顯生產(chǎn)者和消費(fèi)者本質(zhì)上就是基于這兩個(gè)標(biāo)識(shí)分別標(biāo)志自己的等待時(shí)機(jī)和通知時(shí)機(jī),以生產(chǎn)者為例,即每生產(chǎn)一個(gè)資源后就可以調(diào)用notEmpty通知消費(fèi)者消費(fèi),當(dāng)生產(chǎn)者速度過(guò)快,則用await等待未滿notFull條件阻塞:

首先我們給出生產(chǎn)者和消費(fèi)者條件和資源隊(duì)列聲明,基于上述條件我們給出一個(gè)經(jīng)典的生產(chǎn)者和消費(fèi)者模式的示例,我們首先給出生產(chǎn)者代碼,可以看到資源滿的時(shí)候調(diào)用notFull.await();將自己掛起等待未滿,生產(chǎn)資源后調(diào)用 notEmpty.signal();通知消費(fèi)者消費(fèi)。

對(duì)應(yīng)消費(fèi)者示例代碼也是一樣,當(dāng)資源消費(fèi)完全,調(diào)用notEmpty.await();等待不空,一旦消費(fèi)定量資源調(diào)用notFull.signal();通知生產(chǎn)者生產(chǎn)。

最終代碼示例如下:

@Slf4j
public class ProducerMode {

    //鎖
    private static ReentrantLock lock = new ReentrantLock();
    // 資源未滿
    private Condition notFull = lock.newCondition();
    //資源為空
    private Condition notEmpty = lock.newCondition();

    private Queue<Integer> queue = new PriorityQueue<>(10);
    private int queueMaxSize = 10;

    /**
     * 生產(chǎn)者
     */
    private class Producer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (queueMaxSize == queue.size()) {
                        log.info("當(dāng)前隊(duì)列已滿,通知消費(fèi)者消費(fèi)");
                        //等待不滿條件觸發(fā)
                        notFull.await();

                    }

                    queue.offer(1);
                    log.info("生產(chǎn)者補(bǔ)貨,當(dāng)前隊(duì)列有 【{}】", queue.size());
                    //通知消費(fèi)者隊(duì)列不空,可以消費(fèi)
                    notEmpty.signal();
                } catch (Exception e) {
                    log.error("生產(chǎn)者報(bào)錯(cuò),失敗原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }



    }

    /**
     * 消費(fèi)者
     */
    private class Consumer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (0 == queue.size()) {
                        log.info("當(dāng)前隊(duì)列已空,通知生產(chǎn)者補(bǔ)貨");
                        //等待不空條件達(dá)到
                        notEmpty.await();

                    }

                    queue.poll();
                    //通知消費(fèi)者不滿了
                    notFull.signal();
                    log.info("消費(fèi)者完成消費(fèi),當(dāng)前隊(duì)列還剩余 【{}】個(gè)元素", queue.size());
                } catch (Exception e) {
                    log.error("生產(chǎn)者報(bào)錯(cuò),失敗原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }
    }


    public static void main(String[] args) {
        ProducerMode mode = new ProducerMode();
        Producer producer = mode.new Producer();
        ProducerMode.Consumer consumer = mode.new Consumer();
        producer.start();
        consumer.start();
    }
}

對(duì)應(yīng)的我們給出輸出結(jié)果:

四、CyclicBarrier

1. CyclicBarrier 原理和使用示例

CyclicBarrier 也就是循環(huán)柵欄對(duì)象,不是很常用,它主要用于等待線程數(shù)就緒后執(zhí)行公共邏輯的業(yè)務(wù)場(chǎng)景。 例如我們希望每湊齊5個(gè)線程后執(zhí)行后續(xù)邏輯,我們就可以說(shuō)明CyclicBarrier 數(shù)值為5,然后每個(gè)線程到期后調(diào)用await等待其他線程就緒。

一旦到齊5個(gè),CyclicBarrier 就會(huì)通知這些線程開(kāi)始工作,對(duì)應(yīng)的代碼如下所示:

public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                System.out.println("線程 " + Thread.currentThread().getName() + " 開(kāi)始執(zhí)行任務(wù)");
                try {
                    // 模擬執(zhí)行任務(wù)
                    Thread.sleep(1000);
                    System.out.println("線程 " + Thread.currentThread().getName() + " 到達(dá)屏障");
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

                System.out.println("所有線程都到達(dá)屏障,一起繼續(xù)執(zhí)行");
            }).start();
        }
    }

對(duì)應(yīng)的我們給出相應(yīng)輸出示例:

2. CyclicBarrier 下的多核并發(fā)運(yùn)算技巧

利用循環(huán)柵欄的特點(diǎn),我們可以很好基于計(jì)算機(jī)核心數(shù)完成所有的耗時(shí)運(yùn)算,等待所有計(jì)算完成之后,通過(guò)柵欄來(lái)匯聚計(jì)算結(jié)果打印輸出:

對(duì)應(yīng)我們給出主線程的實(shí)現(xiàn),可以看到該處理器會(huì)得到一個(gè)與核心數(shù)一致的列表,并將列表中的每個(gè)子列表交由worker線程處理,每當(dāng)worker完成列表中一個(gè)元素運(yùn)算后,就會(huì)觸發(fā)柵欄的方法打印結(jié)果:

public class ArraySquareCalculator {


    private final List<List<Integer>> taskList;

    private final Worker[] workers;


    private final CyclicBarrier barrier;

    public ArraySquareCalculator(List<List<Integer>> taskList) {

        if (taskList == null || taskList.isEmpty()) {
            throw new RuntimeException("任務(wù)列表不能為空");
        }
        if (taskList.size() != Runtime.getRuntime().availableProcessors()) {
            throw new RuntimeException("任務(wù)列表數(shù)量必須等于CPU數(shù)量");
        }

        this.taskList = taskList;

        barrier = new CyclicBarrier(taskList.size(), () -> {
            Console.log("所有線程都到達(dá)屏障,執(zhí)行結(jié)束");
            Console.log("執(zhí)行結(jié)果:{}", JSONUtil.toJsonStr(taskList));
        });

        workers = new Worker[taskList.size()];
        for (int i = 0; i < taskList.size(); i++) {
            workers[i] = new Worker(i, taskList, barrier);
        }


    }


    //啟動(dòng)核心數(shù)對(duì)應(yīng)的工作線程執(zhí)行運(yùn)算
    public synchronized void start() {
        for (Worker worker : workers) {
            new Thread(worker).start();
        }
    }


    
}

對(duì)應(yīng)的我們也給出worker子線程代碼,可以看到核心數(shù)對(duì)應(yīng)的子線程worker完成各自負(fù)責(zé)列表的元素運(yùn)算后,就會(huì)通過(guò)柵欄提交給主線程告知完成:

public class Worker implements Runnable {


    private final int elementIdx;
    private final List<List<Integer>> list;
    private final CyclicBarrier barrier;

    Worker(int elementIdx, List<List<Integer>> list, CyclicBarrier cyclicBarrier) {
        this.elementIdx = elementIdx;
        this.list = list;
        this.barrier = cyclicBarrier;
    }

    @SneakyThrows
    @Override
    public void run() {
        //每個(gè)核心對(duì)應(yīng)的線程處理各自索引列表
        List<Integer> workList = list.get(elementIdx);
        for (int i = 0; i < workList.size(); i++) {
            //完成負(fù)責(zé)列表元素計(jì)算后,通過(guò)屏障等待所有線程完成
            workList.set(i, workList.get(i) << 1);
            barrier.await();
        }


    }
}

對(duì)應(yīng)的我們也給出測(cè)試代碼:

//創(chuàng)建一個(gè)與核心數(shù)一樣的列表
        int size = Runtime.getRuntime().availableProcessors();
        List<List<Integer>> list = new ArrayList<>();
        //添加元素到列表中
        for (int i = 0; i < size; i++) {
            ArrayList<Integer> arrayList = new ArrayList<>();
            for (int j = 1; j <= 3; j++) {
                arrayList.add(j);
            }
            list.add(arrayList);

        }
        //啟動(dòng)并行運(yùn)算處理器
        ArraySquareCalculator calculator = new ArraySquareCalculator(list);
        calculator.start();

輸出結(jié)果如下,與預(yù)期一致:

3. CyclicBarrier如何控制并發(fā)

以上面并行核心線程運(yùn)算邏輯為例,本質(zhì)上await方法調(diào)用后底層就會(huì)完成count扣減,當(dāng)count為0后就會(huì)觸發(fā)一次主線程邏輯調(diào)用,也就是我們的打印輸出,即通過(guò)count來(lái)完成線程之間的循環(huán)并發(fā)流程阻塞和通知:

對(duì)應(yīng)的我們也給出await的源碼,可以看到其內(nèi)部是通過(guò)調(diào)用dowait執(zhí)行上述所說(shuō)邏輯:

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

查看dowait即可印證我們的邏輯:

  • 所有線程調(diào)用await執(zhí)行count扣減
  • count為0調(diào)用barrierCommand也就是我們初始化時(shí)設(shè)置的打印輸出方法
  • 完成barrierCommand任務(wù)執(zhí)行后調(diào)用nextGeneration將count重置為初始化時(shí)的數(shù)值,對(duì)應(yīng)的我們的代碼就是CPU核心數(shù)
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

          //......

            int index = --count;
            //count扣減為0 步入執(zhí)行邏輯
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                 //調(diào)用barrierCommand執(zhí)行歸并邏輯運(yùn)算
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //將count重置為初始值
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
//......
        } finally {
            lock.unlock();
        }
    }

4. CyclicBarrier 與CountDownLatch區(qū)別(重點(diǎn))

CountDownLatch用戶事件即主要是業(yè)務(wù)流程上的控制并不是針對(duì)線程,CyclicBarrier 循環(huán)柵欄作用于線程,如上代碼必須等待線程到齊后觸發(fā)。

循環(huán)柵欄可重復(fù)使用,CountDownLatch則不能。

責(zé)任編輯:趙寧寧 來(lái)源: 寫代碼的SharkChili
相關(guān)推薦

2025-07-04 09:05:35

2025-02-07 14:42:59

2017-05-31 17:09:52

LinuxShell命令

2024-06-06 09:09:41

SQL循環(huán)控制命令

2010-05-11 12:53:58

Unix awk

2011-08-23 13:36:11

T-SQL查詢流程控制語(yǔ)句

2009-12-15 09:56:51

Ruby流程控制

2021-05-27 09:30:51

Java流程控制

2021-05-27 05:27:22

流程控制Rust

2009-09-04 10:42:56

C#流程控制語(yǔ)句

2024-11-01 16:05:26

2010-07-19 10:11:58

Perl流程控制語(yǔ)句

2021-02-03 06:15:26

工具postManHttp

2021-08-05 06:54:05

流程控制default

2024-11-05 12:59:42

while 循環(huán)迭代字節(jié)碼

2011-09-08 13:53:31

Node.js

2013-12-13 15:48:52

Lua腳本語(yǔ)言

2015-07-23 15:17:37

JavaScript循環(huán)語(yǔ)句

2010-03-18 16:37:13

Python 程序流程

2011-08-24 16:36:00

T-SQL
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 国产精产国品一二三产区视频 | 国产免费一区二区 | 黄色片在线免费看 | av男人的天堂av | 91精品国产一区二区三区香蕉 | 99视频在线免费观看 | 欧美成人一区二区三区 | 中文一区 | 国产精品中文在线 | 成人午夜在线观看 | 男女羞羞视频在线观看 | 精品久久亚洲 | 日韩三级电影一区二区 | 午夜极品| 国产精品毛片一区二区三区 | 一区二区伦理电影 | 亚洲一区二区不卡在线观看 | 欧美日韩在线一区二区三区 | 青青久久久 | 免费黄色录像片 | 国产一区二区观看 | 欧美日韩电影一区二区 | 国产日韩欧美中文字幕 | 欧美日韩精品一区 | 91看片在线观看 | 午夜在线| 欧美a级成人淫片免费看 | 天天操夜夜艹 | 中文字幕日韩欧美一区二区三区 | 国产一区二区三区四区在线观看 | 国产成人精品久久二区二区 | 精品99在线 | 最新免费av网站 | 国内精品视频 | 欧美日韩在线看 | 亚洲成人免费观看 | 欧美激情综合 | 欧美性生活一区二区三区 | 精品欧美乱码久久久久久1区2区 | 久久99精品久久久久久狂牛 | 精品麻豆剧传媒av国产九九九 |