成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

阻塞隊列BlockingQueue,看完就會了

開發 前端
當生產者嘗試向已滿的有界隊列添加元素時,添加方法(比如put()),操作可能會阻塞,直到隊列中有可用空間。這種特性使得有界隊列在某些場景下,能自動實現限流,避免系統資源過度消耗。

在Java并發編程中,生產者-消費者問題是一個常見的需求。java.util.concurrent.BlockingQueue接口為解決這類問題提供了強大便捷的支持。

不僅提供了在多線程環境下安全添加和獲取元素的方法,還通過阻塞機制確保了生產者和消費者之間的協調。

接下來,我們一起看看BlockingQueue的特性、方法以及如何使用它構建高效的多線程應用程序。

一、BlockingQueue類型

(一)無界隊列

無界隊列可以在理論上無限增長,在Java中創建無界BlockingQueue非常簡便:

BlockingQueue<String> unboundedQueue = new LinkedBlockingDeque<>();

此隊列的容量默認是Integer.MAX_VALUE,雖然有默認邊界,但在實際應用中,若生產者持續快速生產元素,而消費者無法及時消費,可能導致內存占用不斷增加,最終引發OOM。所以說,雖然默認有界,實際相當于無界。

(二)有界隊列

有界隊列則具有明確的最大容量限制,創建方式如下:

BlockingQueue<Integer> boundedQueue = new LinkedBlockingDeque<>(10);

這里創建了一個容量為10的BlockingQueue。

當生產者嘗試向已滿的有界隊列添加元素時,添加方法(比如put()),操作可能會阻塞,直到隊列中有可用空間。這種特性使得有界隊列在某些場景下,能自動實現限流,避免系統資源過度消耗。

二、BlockingQueue的方法

(一)添加元素

add(E e)

嘗試將指定元素添加到隊列中。若添加成功,返回true;若隊列已滿,則拋出IllegalStateException異常。比如:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
    boolean success = queue.add(10);
    if (success) {
        System.out.println("元素添加成功");
    }
} catch (IllegalStateException e) {
    System.out.println("隊列已滿,添加元素失敗");
}

put(E e)

將元素插入隊列,如果隊列已滿,則阻塞當前線程,直到有可用空間。比如:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
    queue.put(20);
    System.out.println("元素已放入隊列");
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.out.println("線程被中斷,添加元素失敗");
}

offer(E e)

嘗試將元素添加到隊列中。若添加成功,返回true;若隊列已滿,則返回false。比如:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
boolean result = queue.offer(30);
if (result) {
    System.out.println("元素添加成功");
} else {
    System.out.println("隊列已滿,添加元素失敗");
}

offer(E e, long timeout, TimeUnit unit)

在指定的超時時間內嘗試將元素插入隊列。若在超時時間內成功插入,返回true;否則返回false。比如:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
    boolean success = queue.offer(40, 2, TimeUnit.SECONDS);
    if (success) {
        System.out.println("元素在超時時間內添加成功");
    } else {
        System.out.println("在超時時間內未能添加元素,隊列可能已滿");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.out.println("線程被中斷,添加元素失敗");
}

(二)檢索元素

take()

從隊列中獲取并移除頭部元素。如果隊列為空,當前線程將被阻塞,直到有元素可用。

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
// 假設隊列中已有元素
try {
    Integer element = queue.take();
    System.out.println("取出的元素為: " + element);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.out.println("線程被中斷,獲取元素失敗");
}

poll(long timeout, TimeUnit unit)

檢索并移除隊列頭部元素。若在指定的超時時間內有元素可用,則返回該元素;若超時時間內仍無元素可用,則返回null。

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
    Integer element = queue.poll(1, TimeUnit.SECONDS);
    if (element!= null) {
        System.out.println("在超時時間內取出的元素為: " + element);
    } else {
        System.out.println("在超時時間內未獲取到元素,隊列可能為空");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.out.println("線程被中斷,獲取元素失敗");
}

三、多線程生產者-消費者

我們將模擬一個簡單的生產-消費場景,假設有一個消息隊列,多個生產者線程不斷向隊列中生產消息(這里簡化為隨機整數),多個消費者線程從隊列中獲取消息并進行處理(這里簡化為打印消息和線程名)。為了能夠有效結束,增加poisonPill參數。

首先是生產者:

public class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public Producer(BlockingQueue<Integer> queue, int poisonPill, int poisonPillPerProducer) {
        this.queue = queue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }

    @Override
    public void run() {
        try {
            produceMessages();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void produceMessages() throws InterruptedException {
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            int message = random.nextInt(100);
            queue.put(message);
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            queue.put(poisonPill);
        }
    }
}

生產者構造函數接受一個BlockingQueue用于與消費者通信,還接受poisonPill和每個生產者應發送poisonPill值的數量。在produceMessages方法中,先生產100個隨機消息放入隊列,然后發送指定數量的毒丸。

接著是消費者:

public class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final int poisonPill;

    public Consumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    @Override
    public void run() {
        try {
            consumeMessages();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void consumeMessages() throws InterruptedException {
        while (true) {
            Integer message = queue.take();
            if (message.equals(poisonPill)) {
                return;
            }
            System.out.println(Thread.currentThread().getName() + " 消費消息: " + message);
        }
    }
}

消費者構造函數接受BlockingQueue和poisonPill值。在consumeMessages方法中,不斷從隊列獲取消息,如果是等于poisonPill則結束消費,否則打印消息和線程名。

最后是主程序類來啟動生產者和消費者線程:

final int BOUND = 10;
final int N_PRODUCERS = 3;
final int N_CONSUMERS = 2;
final int poisonPill = Integer.MAX_VALUE;
final int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
final int mod = N_CONSUMERS % N_PRODUCERS;

final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

for (int i = 0; i < N_PRODUCERS; i++) {
    new Thread(new Producer(queue, poisonPill, poisonPillPerProducer)).start();
}

for (int j = 0; j < N_CONSUMERS; j++) {
    new Thread(new Consumer(queue, poisonPill)).start();
}

new Thread(new Producer(queue, poisonPill, poisonPillPerProducer + mod)).start();

在主程序中,定義了隊列容量、生產者和消費者數量,以及poisonPill值相關參數,創建了數量為10的有界BlockingQueue,啟動了指定數量的生產者和消費者線程。

當運行上述程序時,生產者線程會不斷向隊列中放入隨機整數,消費者線程會從隊列中取出并打印這些整數,同時每個消費者接收到poisonPill后會結束執行。

運行結果如下(因為用了隨機數,每次效果不同):

Thread-3 消費消息: 47
Thread-4 消費消息: 3
Thread-3 消費消息: 35
Thread-4 消費消息: 83
Thread-4 消費消息: 68
Thread-4 消費消息: 40
Thread-4 消費消息: 73
Thread-4 消費消息: 56
Thread-4 消費消息: 56
...

隨著生產和消費的進行,最終所有消費者線程在接收到poisonPill后停止。

責任編輯:武曉燕 來源: 看山的小屋
相關推薦

2022-05-17 08:24:58

查詢日志MySQL

2021-08-13 07:56:13

Python虛擬環境

2017-12-12 13:27:20

主板跳線USB

2020-11-27 09:16:21

BlockingQue

2018-04-27 15:33:59

Python裝飾器

2017-02-09 19:45:07

Linux系統Linux 發行版

2023-06-30 08:27:20

2022-10-21 08:02:40

reduce?初始值循環

2020-06-05 18:09:14

TomcatWeb應用服務器

2024-02-20 08:16:10

阻塞隊列源碼

2020-11-20 06:22:02

LinkedBlock

2020-11-24 09:04:55

PriorityBlo

2020-11-25 14:28:56

DelayedWork

2020-11-19 07:41:51

ArrayBlocki

2023-12-06 07:28:47

阻塞IO異步IO

2022-07-14 08:22:48

Computedvue3

2017-04-12 10:02:21

Java阻塞隊列原理分析

2024-12-26 07:49:57

Java隊列線程

2013-09-22 17:11:48

路由器選購路由器

2012-06-14 10:34:40

Java阻塞搜索實例
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 羞羞视频网站在线观看 | 日本久久精品视频 | 99精品国产一区二区青青牛奶 | 亚洲视频不卡 | av片毛片| 国产精品视频免费看 | 天天躁天天操 | 午夜男人视频 | 亚洲天堂中文字幕 | 97人人草 | 色呦呦网站 | 免费在线看黄视频 | 亚洲 欧美 在线 一区 | 一级欧美一级日韩片免费观看 | 国产精品无码久久久久 | 一区二区三区国产视频 | 欧美精品首页 | 91麻豆精品国产91久久久久久久久 | 午夜激情国产 | 日韩最新网址 | 亚洲视频在线观看 | 欧美日韩国产一区二区三区 | 乱码av午夜噜噜噜噜动漫 | 激情av免费看 | 国产免费看 | 欧美精品欧美精品系列 | av喷水| 欧美视频免费在线 | 欧美高清视频一区 | 日韩a级片 | 青青久草 | 亚洲性综合网 | 在线看免费 | 久久性色 | 精品国产乱码久久久久久丨区2区 | 久久成人一区 | 亚洲 中文 欧美 | 精品欧美一区二区中文字幕视频 | 国产激情91久久精品导航 | 在线播放国产一区二区三区 | 久久久久国产精品 |