成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

10分鐘搞定 Java 并發(fā)隊列好嗎?好的

開發(fā) 后端
在【并發(fā)系列】中,主要講解了 執(zhí)行者與線程池,同步工具,鎖 , 在分析源碼時,或多或少的提及到了「隊列」,隊列在 JUC 中也是多種多樣存在,所以本文就以「遠看」視角,幫助大家快速了解與區(qū)分這些看似「雜亂」的隊列。

前言

如果按照用途與特性進行粗略的劃分,JUC 包中包含的工具大體可以分為 6 類:

  1. 執(zhí)行者與線程池
  2. 并發(fā)隊列
  3. 同步工具
  4. 并發(fā)集合
  5. 原子變量

在【并發(fā)系列】中,主要講解了 執(zhí)行者與線程池,同步工具,鎖 , 在分析源碼時,或多或少的提及到了「隊列」,隊列在 JUC 中也是多種多樣存在,所以本文就以「遠看」視角,幫助大家快速了解與區(qū)分這些看似「雜亂」的隊列

并發(fā)隊列

Java 并發(fā)隊列按照實現(xiàn)方式來進行劃分可以分為 2 種:

  1. 阻塞隊列
  2. 非阻塞隊列

如果你已經(jīng)看完并發(fā)系列鎖的實現(xiàn),你已經(jīng)能夠知道他們實現(xiàn)的區(qū)別:

前者就是基于鎖實現(xiàn)的,后者則是基于 CAS 非阻塞算法實現(xiàn)的

常見的隊列有下面這幾種:

 

瞬間懵逼?看到這個沒有人性的圖想直接走人?客觀先別急,一會就柳暗花明了

當下你也許有個問題:

為什么會有這么多種隊列的存在?

鎖有應對各種情形的鎖,隊列也自然有應對各種情形的隊列了, 是不是也有點單一職責原則的意思呢?

所以我們要了解這些隊列到底是怎么設計的?以及用在了哪些地方?

先來看下圖

 

如果你在 IDE 中打開以上非阻塞隊列和阻塞隊列,查看其實現(xiàn)方法,你就會發(fā)現(xiàn),阻塞隊列較非阻塞隊列 額外支持兩種操作:

阻塞的插入

當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿

阻塞的移除

當隊列為空時,獲取元素的線程會阻塞,直到隊列變?yōu)榉强?/p>

綜合說明入隊/出隊操作,看似雜亂的方法,用一個表格就能概括了

 

拋出異常

  • 當隊列滿時,此時如果再向隊列中插入元素,會拋出 IllegalStateException (這很好理解)
  • 當隊列空時,此時如果再從隊列中獲取元素,會拋出 NoSuchElementException (這也很好理解)

返回特殊值

  • 當向隊列插入元素時,會返回元素是否插入成功,成功則返回 true
  • 當從隊列移除元素時,如果沒有則返回 null

一直阻塞

  • 當隊列滿時,如果生產者線程向隊列 put 元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出
  • 當隊列為空時,如果消費者線程 從隊列里面 take 元素,隊列會阻塞消費者線程,直到隊列不為空

關于阻塞,我們其實早在 并發(fā)編程之等待通知機制 就已經(jīng)充分說明過了,你還記得下面這張圖嗎?原理其實是一樣一樣滴

 

超時退出

和鎖一樣,因為有阻塞,為了靈活使用,就一定支持超時退出,阻塞時間達到超時時間,就會直接返回

至于為啥插入和移除這么多種單詞表示形式,我也不知道,為了方便記憶,只需要記住阻塞的方法形式即可:

單詞 put 和 take 字母 t 首位相連,一個放,一個拿

到這里你應該對 Java 并發(fā)隊列有了個初步的認識了,原來看似雜亂的方法貌似也有了規(guī)律。接下來就到了瘋狂串知識點的時刻了,借助前序章節(jié)的知識,分分鐘就理解全部隊列了

 

ArrayBlockingQueue

之前也說過,JDK中的命名還是很講究滴,一看這名字,底層就是數(shù)組實現(xiàn)了,是否有界,那就看在構造的時候是否需要指定 capacity 值了

填鴨式的說明也容易忘,這些都是哪看到的呢?在所有隊列的 Java docs 的第一段,一句話就概括了該隊列的主要特性,所以強烈建議大家自己在看源碼時,簡單瞄一眼 docs 開頭,心中就有多半個數(shù)了

 

在講 Java AQS隊列同步器以及ReentrantLock的應用 時我們介紹了公平鎖與非公平鎖的概念,ArrayBlockingQueue 也有同樣的概念,看它的構造方法,就有 ReentrantLock 來輔助實現(xiàn)

  1. public ArrayBlockingQueue(int capacity, boolean fair) { 
  2.     if (capacity <= 0) 
  3.         throw new IllegalArgumentException(); 
  4.     this.items = new Object[capacity]; 
  5.     lock = new ReentrantLock(fair); 
  6.     notEmpty = lock.newCondition(); 
  7.     notFull =  lock.newCondition(); 

默認情況下,依舊是不保證線程公平訪問隊列(公平與否是指阻塞的線程能否按照阻塞的先后順序訪問隊列,先阻塞線訪問,后阻塞后訪問)

到這我也要臨時問一個說過多次的面試送分題了:

為什么默認采用非公平鎖的方式?它較公平鎖方式有什么好處,又可能帶來哪些問題?

知道了以上內容,結合上面表格中的方法,ArrayBlockingQueue 就可以輕松過關了

 

和數(shù)組相對的自然是鏈表了

LinkedBlockingQueue

 

LinkedBlockingQueue 也算是一個有界阻塞隊列 ,從下面的構造函數(shù)中你也可以看出,該隊列的默認和最大長度為 Integer.MAX_VALUE ,這也就 docs 說 optionally-bounded 的原因了

  1. public LinkedBlockingQueue() { 
  2.     this(Integer.MAX_VALUE); 
  3.  
  4. public LinkedBlockingQueue(int capacity) { 
  5.   if (capacity <= 0) throw new IllegalArgumentException(); 
  6.   this.capacity = capacity; 
  7.   last = head = new Node<E>(null); 

正如 Java 集合一樣,鏈表形式的隊列,其存取效率要比數(shù)組形式的隊列高。但是在一些并發(fā)程序中,數(shù)組形式的隊列由于具有一定的可預測性,因此可以在某些場景中獲得更高的效率

看到 LinkedBlockingQueue 是不是也有些熟悉呢?為什么要使用線程池? 就已經(jīng)和它多次照面了

創(chuàng)建單個線程池

  1. public static ExecutorService newSingleThreadExecutor() { 
  2.     return new FinalizableDelegatedExecutorService 
  3.         (new ThreadPoolExecutor(1, 1, 
  4.                                 0L, TimeUnit.MILLISECONDS, 
  5.                                 new LinkedBlockingQueue<Runnable>())); 

創(chuàng)建固定個數(shù)線程池

  1. public static ExecutorService newFixedThreadPool(int nThreads) { 
  2.     return new ThreadPoolExecutor(nThreads, nThreads, 
  3.                                   0L, TimeUnit.MILLISECONDS, 
  4.                                   new LinkedBlockingQueue<Runnable>()); 

面試送分題又來了

使用 Executors 創(chuàng)建線程池很簡單,為什么大廠嚴格要求禁用這種創(chuàng)建方式呢?

PriorityBlockingQueue

PriorityBlockingQueue 是一個支持優(yōu)先級的無界的阻塞隊列,默認情況下采用自然順序升序排列,當然也有非默認情況自定義優(yōu)先級,需要排序,那自然要用到 Comparator 來定義排序規(guī)則了

 

可以定義優(yōu)先級,自然也就有相應的限制,以及使用的注意事項

  • 按照上圖說明,隊列中不允許存在 null 值,也不允許存在不能排序的元素
  • 對于排序值相同的元素,其序列是不保證的,但你可以繼續(xù)自定義其他可以區(qū)分出來優(yōu)先級的值,如果你有嚴格的優(yōu)先級區(qū)分,建議有更完善的比較規(guī)則,就像 Java docs 這樣
  1. class FIFOEntry<E extends Comparable<? super E>> 
  2.     implements Comparable<FIFOEntry<E>> { 
  3.   static final AtomicLong seq = new AtomicLong(0); 
  4.   final long seqNum; 
  5.   final E entry; 
  6.   public FIFOEntry(E entry) { 
  7.     seqNum = seq.getAndIncrement(); 
  8.     this.entry = entry; 
  9.   } 
  10.   public E getEntry() { return entry; } 
  11.   public int compareTo(FIFOEntry<E> other) { 
  12.     int res = entry.compareTo(other.entry); 
  13.     if (res == 0 && other.entry != this.entry) 
  14.       res = (seqNum < other.seqNum ? -1 : 1); 
  15.     return res; 
  16.   } 
  • 隊列容量是沒有上限的,但是如果插入的元素超過負載,有可能會引起OutOfMemory異常(這是肯定的),這也是為什么我們通常所說,隊列無界,心中有界
  • PriorityBlockingQueue 也有 put 方法,這是一個阻塞的方法,因為它是無界的,自然不會阻塞,所以就有了下面比較聰明的做法
  1. public void put(E e) { 
  2.     offer(e); // never need to block  請自行對照上面表格 
  • 可以給定初始容量,這個容量會按照一定的算法自動擴充
  1. // Default array capacity. 
  2. private static final int DEFAULT_INITIAL_CAPACITY = 11; 
  3.  
  4. public PriorityBlockingQueue() { 
  5.     this(DEFAULT_INITIAL_CAPACITY, null); 

這里默認的容量是 11,由于也是基于數(shù)組,那面試送分題又來了

你通常是怎樣定義容器/集合初始容量的?有哪些依據(jù)?

DelayQueue

DelayQueue 是一個支持延時獲取元素的無界阻塞隊列

  • 是否延時肯定是和某個時間(通常和當前時間) 進行比較
  • 比較過后還要進行排序,所以也是存在一定的優(yōu)先級

看到這也許覺得這有點和 PriorityBlockingQueue 很像,沒錯,DelayQueue 的內部也是使用 PriorityQueue

 

上圖綠色框線也告訴你,DelayQueue 隊列的元素必須要實現(xiàn) Depayed 接口:

 

所以從上圖可以看出使用 DelayQueue 非常簡單,只需要兩步:

實現(xiàn) getDelay() 方法,返回元素要延時多長時間

  1. public long getDelay(TimeUnit unit) { 
  2.    // 最好采用納秒形式,這樣更精確 
  3.     return unit.convert(time - now(), NANOSECONDS); 

實現(xiàn) compareTo() 方法,比較元素順序

  1. public int compareTo(Delayed other) { 
  2.     if (other == this) // compare zero if same object 
  3.         return 0; 
  4.     if (other instanceof ScheduledFutureTask) { 
  5.         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 
  6.         long diff = time - x.time
  7.         if (diff < 0) 
  8.             return -1; 
  9.         else if (diff > 0) 
  10.             return 1; 
  11.         else if (sequenceNumber < x.sequenceNumber) 
  12.             return -1; 
  13.         else 
  14.             return 1; 
  15.     } 
  16.     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 
  17.     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 

上面的代碼哪來的呢?如果你打開 ScheduledThreadPoolExecutor 里的 ScheduledFutureTask,你就看到了 (ScheduledThreadPoolExecutor 內部就是應用 DelayQueue)

所以綜合來說,下面兩種情況非常適合使用 DelayQueue

  • 緩存系統(tǒng)的設計:用 DelayQueue 保存緩存元素的有效期,使用一個線程循環(huán)查詢 DelayQueue,如果能從 DelayQueue 中獲取元素,說明緩存有效期到了
  • 定時任務調度:用 DelayQueue 保存當天會執(zhí)行的任務以及時間,如果能從 DelayQueue 中獲取元素,任務就可以開始執(zhí)行了。比如 TimerQueue 就是這樣實現(xiàn)的

SynchronousQueue

 

這是一個不存儲元素的阻塞隊列,不存儲元素還叫隊列?

 

沒錯,SynchronousQueue 直譯過來叫同步隊列,如果在隊列里面呆久了應該就算是“異步”了吧

所以使用它,每個put() 操作必須要等待一個 take() 操作,反之亦然,否則不能繼續(xù)添加元素

實際中怎么用呢?假如你需要兩個線程之間同步共享變量,如果不用 SynchronousQueue 你可能會選擇用 CountDownLatch 來完成,就像這樣:

  1. ExecutorService executor = Executors.newFixedThreadPool(2); 
  2. AtomicInteger sharedState = new AtomicInteger(); 
  3. CountDownLatch countDownLatch = new CountDownLatch(1); 
  4.  
  5.  
  6.  
  7. Runnable producer = () -> { 
  8.     Integer producedElement = ThreadLocalRandom 
  9.       .current() 
  10.       .nextInt(); 
  11.     sharedState.set(producedElement); 
  12.     countDownLatch.countDown(); 
  13. }; 
  14.  
  15.  
  16.  
  17. Runnable consumer = () -> { 
  18.     try { 
  19.         countDownLatch.await(); 
  20.         Integer consumedElement = sharedState.get(); 
  21.     } catch (InterruptedException ex) { 
  22.         ex.printStackTrace(); 
  23.     } 
  24. }; 

這點小事就用計數(shù)器來實現(xiàn),顯然很不合適,用 SynchronousQueue 改造一下,感覺瞬間就不一樣了

  1. ExecutorService executor = Executors.newFixedThreadPool(2); 
  2. SynchronousQueue<Integer> queue = new SynchronousQueue<>(); 
  3.  
  4. Runnable producer = () -> { 
  5.     Integer producedElement = ThreadLocalRandom 
  6.       .current() 
  7.       .nextInt(); 
  8.     try { 
  9.         queue.put(producedElement); 
  10.     } catch (InterruptedException ex) { 
  11.         ex.printStackTrace(); 
  12.     } 
  13. }; 
  14.  
  15. Runnable consumer = () -> { 
  16.     try { 
  17.         Integer consumedElement = queue.take(); 
  18.     } catch (InterruptedException ex) { 
  19.         ex.printStackTrace(); 
  20.     } 
  21. }; 

其實 Executors.newCachedThreadPool() 方法里面使用的就是 SynchronousQueue

  1. public static ExecutorService newCachedThreadPool() { 
  2.     return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
  3.                                   60L, TimeUnit.SECONDS, 
  4.                                   new SynchronousQueue<Runnable>()); 

看到前面 LinkedBlockingQueue 用在 newSingleThreadExecutor 和newFixedThreadPool 上,而newCachedThreadPool 卻用 SynchronousQueue,這是為什么呢?

因為單線程池和固定線程池中,線程數(shù)量是有限的,因此提交的任務需要在LinkedBlockingQueue隊列中等待空余的線程;

而緩存線程池中,線程數(shù)量幾乎無限(上限為Integer.MAX_VALUE),因此提交的任務只需要在SynchronousQueue 隊列中同步移交給空余線程即可, 所以有時也會說SynchronousQueue 的吞吐量要高于 LinkedBlockingQueue 和 ArrayBlockingQueue

LinkedTransferQueue

簡單來說,TransferQueue提供了一個場所,生產者線程使用 transfer 方法傳入一些對象并阻塞,直至這些對象被消費者線程全部取出。

你有沒有覺得,剛剛介紹的 SynchronousQueue 是否很像一個容量為 0 的 TransferQueue。

但 LinkedTransferQueue 相比其他阻塞隊列多了三個方法

transfer(E e)如果當前有消費者正在等待消費元素,transfer 方法就可以直接將生產者傳入的元素立刻 transfer (傳輸) 給消費者;如果沒有消費者等待消費元素,那么 transfer 方法會把元素放到隊列的 tail(尾部)

節(jié)點,一直阻塞,直到該元素被消費者消費才返回

  • tryTransfer(E e)

tryTransfer,很顯然是一種嘗試,如果沒有消費者等待消費元素,則馬上返回 false ,程序不會阻塞

  • tryTransfer(E e, long timeout, TimeUnit unit)

帶有超時限制,嘗試將生產者傳入的元素 transfer 給消費者,如果超時時間到,還沒有消費者消費元素,則返回 false

你瞧,所有阻塞的方法都是一個套路:

  1. 阻塞方式
  2. 帶有 try 的非阻塞方式
  3. 帶有 try 和超時時間的非阻塞方式

看到這你也許感覺 LinkedTransferQueue 沒啥特點,其實它和其他阻塞隊列的差別還挺大的:

BlockingQueue 是如果隊列滿了,線程才會阻塞;但是 TransferQueue 是如果沒有消費元素,則會阻塞 (transfer 方法)

這也就應了 Doug Lea 說的那句話:

LinkedTransferQueue is actually a superset of ConcurrentLinkedQueue, SynchronousQueue (in “fair” mode), and unboundedLinkedBlockingQueues. And it’s made better by allowing you to mix and match those features as well as take advantage of higher-performance i mplementation techniques.

簡單翻譯:LinkedTransferQueue 是ConcurrentLinkedQueue, SynchronousQueue (在公平模式下), 無界的LinkedBlockingQueues等的超集; 允許你混合使用阻塞隊列的多種特性

所以,在合適的場景中,請盡量使用LinkedTransferQueue上面都看的是單向隊列 FIFO,接下來我們看看雙向隊列

LinkedBlockingDeque

LinkedBlockingDeque 是一個由鏈表結構組成的雙向阻塞隊列,凡是后綴為 Deque 的都是雙向隊列意思,后綴的發(fā)音為deck——/dek/, 剛接觸它時我以為是這個冰激凌的發(fā)音

 

所謂雙向隊列值得就是可以從隊列的兩端插入和移除元素。所以:

雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊是,也就會減少一半的競爭

隊列有頭,有尾,因此它又比其他阻塞隊列多了幾個特殊的方法

  • addFirst
  • addLast
  • xxxxFirst
  • xxxxLast
  • ... ...

這么一看,雙向阻塞隊列確實很高效,

那雙向阻塞隊列應用在什么地方了呢?

不知道你是否聽過 “工作竊取”模式,看似不太厚道的一種方法,實則是高效利用線程的好辦法。下一篇文章,我們就來看看 ForkJoinPool 是如何應用 “工作竊取”模式的

總結

 

到這關于 Java 隊列(其實主要介紹了阻塞隊列)就快速的區(qū)分完了,將看似雜亂的方法做了分類整理,方便快速理解其用途,同時也說明了這些隊列的實際用途。相信你帶著更高的視角來閱讀源碼會更加輕松,最后也希望大家認真看兩個隊列的源碼實現(xiàn),在遇到隊列的問題,腦海中的畫面分分鐘就可以搞定了

本文轉載自微信公眾號「 日拱一兵」,可以通過以下二維碼關注。轉載本文請聯(lián)系 日拱一兵公眾號。

 

責任編輯:武曉燕 來源: 日拱一兵
相關推薦

2021-12-01 06:50:50

Docker底層原理

2021-01-07 08:05:20

JenkinsDevOps

2021-01-28 10:36:09

Redis擴縮容架構

2009-11-02 08:44:17

Windows 7快速安裝

2010-03-05 17:28:08

2011-02-21 17:48:35

vsFTPd

2021-07-15 06:43:11

Bash調試腳本

2020-10-29 08:28:42

Java NIO異步非阻塞

2011-05-26 09:03:17

JSONjavascript

2025-03-18 09:20:00

Go語言Golang

2009-11-26 11:19:52

NIS服務器

2013-09-13 14:08:01

2016-04-06 11:14:48

iOS相機自定義

2020-12-18 07:33:20

SpringSchedule組件

2023-11-30 10:21:48

虛擬列表虛擬列表工具庫

2021-02-17 00:08:53

Windows 10Windows微軟

2010-11-03 11:01:05

求職面試

2020-10-13 18:22:58

DevOps工具開發(fā)

2014-08-08 09:30:04

android scrollview

2021-04-23 09:50:41

topLinux命令
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 丝袜美腿一区二区三区 | 超碰97免费在线 | 午夜精品久久久久久久久久久久 | 理伦毛片| 女人精96xxx免费网站p | 国产在视频一区二区三区吞精 | 国产精品久久久久久久岛一牛影视 | www.青青草 | 免费在线一区二区 | 亚洲一区 中文字幕 | 伊人性伊人情综合网 | 羞羞的视频在线看 | 亚洲va欧美va人人爽午夜 | 亚洲精品丝袜日韩 | 欧美精品日韩精品国产精品 | 精品国产一区二区三区性色av | 国产在线观看不卡一区二区三区 | 黄色成人国产 | 亚洲成人综合社区 | 亚洲精品在线免费 | 亚洲成人精品视频 | 久久国产综合 | 国产视频久 | 亚洲成网站 | 精品国产aⅴ | 成人性视频免费网站 | 日本公妇乱淫xxxⅹ 国产在线不卡 | 一级黄色片在线看 | 国产成人99av超碰超爽 | 一区二区三区免费在线观看 | 午夜影院普通用户体验区 | 中文字幕免费视频 | 国产精品一区二区视频 | 天堂色网| 天天综合网天天综合 | 98久久| 男女视频在线看 | 国产精品视频中文字幕 | 久久久久久av | 欧美不卡在线 | 九九热精品视频 |