阻塞隊列BlockingQueue,看完就會了
在Java并發編程中,生產者-消費者問題是一個常見的需求。java.util.concurrent.BlockingQueue接口為解決這類問題提供了強大便捷的支持。
不僅提供了在多線程環境下安全添加和獲取元素的方法,還通過阻塞機制確保了生產者和消費者之間的協調。
接下來,我們一起看看BlockingQueue的特性、方法以及如何使用它構建高效的多線程應用程序。
一、BlockingQueue類型
(一)無界隊列
無界隊列可以在理論上無限增長,在Java中創建無界BlockingQueue非常簡便:
BlockingQueue<String> unboundedQueue = new LinkedBlockingDeque<>();
此隊列的容量默認是Integer.MAX_VALUE,雖然有默認邊界,但在實際應用中,若生產者持續快速生產元素,而消費者無法及時消費,可能導致內存占用不斷增加,最終引發OOM。所以說,雖然默認有界,實際相當于無界。
(二)有界隊列
有界隊列則具有明確的最大容量限制,創建方式如下:
BlockingQueue<Integer> boundedQueue = new LinkedBlockingDeque<>(10);
這里創建了一個容量為10的BlockingQueue。
當生產者嘗試向已滿的有界隊列添加元素時,添加方法(比如put()),操作可能會阻塞,直到隊列中有可用空間。這種特性使得有界隊列在某些場景下,能自動實現限流,避免系統資源過度消耗。
二、BlockingQueue的方法
(一)添加元素
add(E e)
嘗試將指定元素添加到隊列中。若添加成功,返回true;若隊列已滿,則拋出IllegalStateException異常。比如:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
boolean success = queue.add(10);
if (success) {
System.out.println("元素添加成功");
}
} catch (IllegalStateException e) {
System.out.println("隊列已滿,添加元素失敗");
}
put(E e)
將元素插入隊列,如果隊列已滿,則阻塞當前線程,直到有可用空間。比如:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
queue.put(20);
System.out.println("元素已放入隊列");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("線程被中斷,添加元素失敗");
}
offer(E e)
嘗試將元素添加到隊列中。若添加成功,返回true;若隊列已滿,則返回false。比如:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
boolean result = queue.offer(30);
if (result) {
System.out.println("元素添加成功");
} else {
System.out.println("隊列已滿,添加元素失敗");
}
offer(E e, long timeout, TimeUnit unit)
在指定的超時時間內嘗試將元素插入隊列。若在超時時間內成功插入,返回true;否則返回false。比如:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
boolean success = queue.offer(40, 2, TimeUnit.SECONDS);
if (success) {
System.out.println("元素在超時時間內添加成功");
} else {
System.out.println("在超時時間內未能添加元素,隊列可能已滿");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("線程被中斷,添加元素失敗");
}
(二)檢索元素
take()
從隊列中獲取并移除頭部元素。如果隊列為空,當前線程將被阻塞,直到有元素可用。
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
// 假設隊列中已有元素
try {
Integer element = queue.take();
System.out.println("取出的元素為: " + element);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("線程被中斷,獲取元素失敗");
}
poll(long timeout, TimeUnit unit)
檢索并移除隊列頭部元素。若在指定的超時時間內有元素可用,則返回該元素;若超時時間內仍無元素可用,則返回null。
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
try {
Integer element = queue.poll(1, TimeUnit.SECONDS);
if (element!= null) {
System.out.println("在超時時間內取出的元素為: " + element);
} else {
System.out.println("在超時時間內未獲取到元素,隊列可能為空");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("線程被中斷,獲取元素失敗");
}
三、多線程生產者-消費者
我們將模擬一個簡單的生產-消費場景,假設有一個消息隊列,多個生產者線程不斷向隊列中生產消息(這里簡化為隨機整數),多個消費者線程從隊列中獲取消息并進行處理(這里簡化為打印消息和線程名)。為了能夠有效結束,增加poisonPill參數。
首先是生產者:
public class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int poisonPill;
private final int poisonPillPerProducer;
public Producer(BlockingQueue<Integer> queue, int poisonPill, int poisonPillPerProducer) {
this.queue = queue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
@Override
public void run() {
try {
produceMessages();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void produceMessages() throws InterruptedException {
Random random = new Random();
for (int i = 0; i < 100; i++) {
int message = random.nextInt(100);
queue.put(message);
}
for (int j = 0; j < poisonPillPerProducer; j++) {
queue.put(poisonPill);
}
}
}
生產者構造函數接受一個BlockingQueue用于與消費者通信,還接受poisonPill和每個生產者應發送poisonPill值的數量。在produceMessages方法中,先生產100個隨機消息放入隊列,然后發送指定數量的毒丸。
接著是消費者:
public class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int poisonPill;
public Consumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
@Override
public void run() {
try {
consumeMessages();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void consumeMessages() throws InterruptedException {
while (true) {
Integer message = queue.take();
if (message.equals(poisonPill)) {
return;
}
System.out.println(Thread.currentThread().getName() + " 消費消息: " + message);
}
}
}
消費者構造函數接受BlockingQueue和poisonPill值。在consumeMessages方法中,不斷從隊列獲取消息,如果是等于poisonPill則結束消費,否則打印消息和線程名。
最后是主程序類來啟動生產者和消費者線程:
final int BOUND = 10;
final int N_PRODUCERS = 3;
final int N_CONSUMERS = 2;
final int poisonPill = Integer.MAX_VALUE;
final int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
final int mod = N_CONSUMERS % N_PRODUCERS;
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
for (int i = 0; i < N_PRODUCERS; i++) {
new Thread(new Producer(queue, poisonPill, poisonPillPerProducer)).start();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new Consumer(queue, poisonPill)).start();
}
new Thread(new Producer(queue, poisonPill, poisonPillPerProducer + mod)).start();
在主程序中,定義了隊列容量、生產者和消費者數量,以及poisonPill值相關參數,創建了數量為10的有界BlockingQueue,啟動了指定數量的生產者和消費者線程。
當運行上述程序時,生產者線程會不斷向隊列中放入隨機整數,消費者線程會從隊列中取出并打印這些整數,同時每個消費者接收到poisonPill后會結束執行。
運行結果如下(因為用了隨機數,每次效果不同):
Thread-3 消費消息: 47
Thread-4 消費消息: 3
Thread-3 消費消息: 35
Thread-4 消費消息: 83
Thread-4 消費消息: 68
Thread-4 消費消息: 40
Thread-4 消費消息: 73
Thread-4 消費消息: 56
Thread-4 消費消息: 56
...
隨著生產和消費的進行,最終所有消費者線程在接收到poisonPill后停止。