成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一篇學(xué)會NioEventLoopGroup源碼解析

網(wǎng)絡(luò) 通信技術(shù)
這里我們會創(chuàng)建一個線程執(zhí)行器 ThreadPerTaskExecutor,使用默認(rèn)的線程工廠DefaultThreadFactory,線程執(zhí)行器會將一個任務(wù)包裝為一個 FastThreadLocalThread對象,然后調(diào)用start方法開啟一個新的線程執(zhí)行任務(wù)!

[[408806]]

本文轉(zhuǎn)載自微信公眾號「源碼學(xué)徒」,作者皇甫嗷嗷叫 。轉(zhuǎn)載本文請聯(lián)系源碼學(xué)徒公眾號。

NioEventLoopGroup的初始化源碼

一、尋找源碼的過程

我們前面說到過,NioEventLoopGroup我們可以近乎把它看作是一個線程池,該線程池會執(zhí)行一個一個的任務(wù),我們常用的NioEventLoopGroup大概有兩種,NioEventLoopGroup(int nThreads),NioEventLoopGroup(),即一個是指定線程數(shù)量的,一個是默認(rèn)指定線程數(shù)量的!這里我們以無參構(gòu)造為入口進(jìn)行分析!

  1. EventLoopGroup work = new NioEventLoopGroup(); 
  1. public NioEventLoopGroup() { 
  2.     this(0); 

當(dāng)我們使用默認(rèn)的數(shù)量的時候,他會傳遞一個0,我們繼續(xù)往下跟!

  1. public NioEventLoopGroup(int nThreads) { 
  2.     this(nThreads, (Executor) null); 

注意這里傳遞的參數(shù)是:0,null

  1. public NioEventLoopGroup(int nThreads, Executor executor) { 
  2.     //每個 group維護(hù)一個 SelectorProvider 主要用它獲取selector選擇器 
  3.     this(nThreads, executor, SelectorProvider.provider()); 

這里面多傳遞了一個 SelectorProvider.provider(),該方法是JDK NIO提供的API主要可以獲取NIO選擇器或者Channel,如下圖:

我們回歸主線繼續(xù)跟:

  1. public NioEventLoopGroup( 
  2.             int nThreads, Executor executor, final SelectorProvider selectorProvider) { 
  3.     this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); 

這里多傳遞了一個 DefaultSelectStrategy選擇策略,這在后面講解NioEventLoop會具體講解,不做闡述!

  1. public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, 
  2.                              final SelectStrategyFactory selectStrategyFactory) { 
  3.     super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); 

我們會發(fā)現(xiàn)這里還會默認(rèn)傳遞一個拒絕策略RejectedExecutionHandlers.reject(),這個拒絕策略是干嘛的呢?

  1. @Override 
  2. public void rejected(Runnable task, SingleThreadEventExecutor executor) { 
  3.     throw new RejectedExecutionException(); 

我們得到一個結(jié)論,當(dāng)某些條件觸發(fā)這個拒絕策略,那么他會拋出一個RejectedExecutionException異常,具體什么時候觸發(fā),后續(xù)也會詳細(xì)說明,這里只需要記住就OK了!

我們繼續(xù)回到主線, 這里我們開始調(diào)用父類,還記得上一節(jié)課我們分析的NioEventLoopGroup的父類是誰嗎?沒錯是:MultithreadEventLoopGroup, 我們會進(jìn)入到MultithreadEventLoopGroup里面:

  1. protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { 
  2.     //線程數(shù)量為0時  使用默認(rèn)的cpu * 2 
  3.     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); 

nThreads還記得是幾嗎?是0對不對,這里有個判斷,當(dāng)你的線程數(shù)量為0的時候,會使用DEFAULT_EVENT_LOOP_THREADS當(dāng)作線程池的數(shù)量,DEFAULT_EVENT_LOOP_THREADS是多少呢?

  1. DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); 

默認(rèn)是CPU的兩倍,所以我們現(xiàn)在得到一個結(jié)論,當(dāng)我們使用默認(rèn)的NioEventLoopGroup的時候,系統(tǒng)會默認(rèn)使用系統(tǒng)CPU核數(shù)*2當(dāng)作線程池的數(shù)量!

我們上一步傳遞過來的selectorProvider、拒絕策略、selectStrategyFactory被封裝為數(shù)組,并放在args[0],args[1], args[2]的位置!

我們繼續(xù)回到主線,這里又再次調(diào)用到父類,MultithreadEventExecutorGroup:

  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { 
  2.     this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); 

注意,這里又再次多傳遞了一個參數(shù):DefaultEventExecutorChooserFactory一個選擇器工廠,這里會返回一個選擇器,他是DefaultEventExecutorChooserFactory類型的,具體分析后面會分析!我們繼續(xù)回到主線:

  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { 
  2.     ..........后續(xù)源碼補(bǔ)充.......... 

到這里我們終于看到了一大段代碼,這里是EventLoopGroup的主要邏輯,我們逐行分析:

二、構(gòu)建線程執(zhí)行器

1. 源碼解析

  1. //newDefaultThreadFactory  構(gòu)建線程工廠 
  2. if (executor == null) { 
  3.     //創(chuàng)建并保存線程執(zhí)行器  執(zhí)行器  執(zhí)行任務(wù)的  默認(rèn)是  DefaultThreadFactory 線程池 
  4.     executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); 

這里會判斷我們傳入的執(zhí)行器是否為空,否則就新建一個,我們還記得executor是什么值嗎?是null,對不對,所以它會進(jìn)入到這里的邏輯我們進(jìn)入到newDefaultThreadFactory源碼里面看一下:

newDefaultThreadFactory()主要邏輯

  1. protected ThreadFactory newDefaultThreadFactory() { 
  2.     return new DefaultThreadFactory(getClass()); 

可以看到,這里向執(zhí)行器里面?zhèn)魅肓艘粋€ DefaultThreadFactory一個默認(rèn)的線程工廠!

ThreadPerTaskExecutor主要邏輯:

  1. /** 
  2.  * io.netty.util.concurrent.DefaultThreadFactory#newThread(java.lang.Runnable) 
  3.  * 
  4.  * 執(zhí)行任務(wù)  每次執(zhí)行任務(wù)都會創(chuàng)建一個線程實(shí)體對象 
  5.  * @param command 線程 
  6.  */ 
  7. @Override 
  8. public void execute(Runnable command) { 
  9.     //執(zhí)行任務(wù) 
  10.     threadFactory.newThread(command).start(); 

我們發(fā)現(xiàn),這里調(diào)用了一個我們傳入的線程工廠,創(chuàng)建了一個新的線程并調(diào)用start方法啟動了起來,那么他是如何創(chuàng)建的呢? 我們進(jìn)入到newThread源碼里面查看,由于我們默認(rèn)使用的線程工廠是 DefaultThreadFactory, 所以,我們會進(jìn)入到 DefaultThreadFactory#newThread

  1. @Override 
  2. public Thread newThread(Runnable r) { 
  3.     //創(chuàng)建一個線程 每次執(zhí)行任務(wù)的時候都會創(chuàng)建一個線程實(shí)體 
  4.     Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); 
  5.     try { 
  6.         if (t.isDaemon() != daemon) { 
  7.             t.setDaemon(daemon); 
  8.         } 
  9.  
  10.         if (t.getPriority() != priority) { 
  11.             t.setPriority(priority); 
  12.         } 
  13.     } catch (Exception ignored) { 
  14.         // Doesn't matter even if failed to set
  15.     } 
  16.     return t; 

這里沒有太多的操作,只是會將一個 Runnable封裝為一個 Thread進(jìn)行返回,我們重點(diǎn)關(guān)注一下這個Thread,它和我們傳統(tǒng)使用的Thread是一樣的嗎? 我們跟進(jìn)到 newThread方法看一下:

  1. protected Thread newThread(Runnable r, String name) { 
  2.     //Netty自己封裝的線程 
  3.     return new FastThreadLocalThread(threadGroup, r, name); 

邏輯很簡單,就是將一個Thread包裝為Netty自定義的 FastThreadLocalThread,至于為什么,我們暫時不往下多做解釋,后續(xù)章節(jié)會很詳細(xì)的解釋它!

2. 線程執(zhí)行器總結(jié)

這里我們會創(chuàng)建一個線程執(zhí)行器 ThreadPerTaskExecutor,使用默認(rèn)的線程工廠DefaultThreadFactory,線程執(zhí)行器會將一個任務(wù)包裝為一個 FastThreadLocalThread對象,然后調(diào)用start方法開啟一個新的線程執(zhí)行任務(wù)!

三、創(chuàng)建對應(yīng)數(shù)量的執(zhí)行器

  1. //創(chuàng)建執(zhí)行器數(shù)組  數(shù)量和預(yù)設(shè)線程數(shù)量一致 
  2. children = new EventExecutor[nThreads]; 
  3.  
  4. for (int i = 0; i < nThreads; i ++) { 
  5.     boolean success = false
  6.     try { 
  7.         //創(chuàng)建執(zhí)行器  開始創(chuàng)建執(zhí)行器   這里的執(zhí)行機(jī)估計(jì)就會EventLoop  是NioEventLoop 
  8.         children[i] = newChild(executor, args); 
  9.         success = true
  10.     } catch (Exception e) { 
  11.         .....省略不必要代碼 
  12.     } finally { 
  13.         .....省略不必要代碼 
  14.     } 

1. 源碼解析

  1. children = new EventExecutor[nThreads]; 

首先他會創(chuàng)建一個空的EventExecutor執(zhí)行器數(shù)組,然后遍歷填充!

還記得 nThreads是幾嗎? 默認(rèn)是CPU*2的大小,所以這里會創(chuàng)建 CPU * 2數(shù)量的執(zhí)行器! 我們發(fā)現(xiàn),for循環(huán)中填充的主要邏輯是newChild,所以,我們進(jìn)入到 newChild方法, 這里提示一點(diǎn),我們創(chuàng)建的Group對象是一個什么對象? 是NioEventLoopGroup對象對不對,所以我們這里會進(jìn)入到 NioEventLoopGroup#newChild方法:

  1. @Override 
  2. protected EventLoop newChild(Executor executor, Object... args) throws Exception { 
  3.     EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null
  4.     return new NioEventLoop(this, executor, (SelectorProvider) args[0], 
  5.                             ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); 

我們傳遞過來 的args長度為3,前面做過解析 :

args[0]為 selectorProvider、args[1]為拒絕策略、args[2]為selectStrategyFactory

所以 queueFactory為null, 然后我們再重點(diǎn)關(guān)注 NioEventLoop對象,可以看出,newChild方法返回的是 NioEventLoop,那么我們初步就可以確定,EventExecutor數(shù)組里面存在的是NioEventLoop對象!至此,我們就不深究了,NioEventLoop的初始化源碼分析我會放到下一節(jié)課分析,這里我們可以確定一件事, EventExecutor數(shù)組里面存在的是NioEventLoop對象!我們繼續(xù)回到主線:

2. 執(zhí)行器數(shù)組總結(jié)

for循環(huán)完畢之后,此時的EventExecutor[nThreads];數(shù)組就被填充滿了,里面的每一個元素都是NioEventLoop對象,每一個NioEventLoop對象都包含一個 ThreadPerTaskExecutor線程執(zhí)行器對象!

四、創(chuàng)建一個執(zhí)行器選擇器

1. 源碼解析

  1. chooser = chooserFactory.newChooser(children); 

還記得 chooserFactory是什么類型的嗎? 是DefaultEventExecutorChooserFactory類型的,忘了的可以往上翻一下尋找源碼的過程中的代碼或者調(diào)試一下!

我們進(jìn)入到 DefaultEventExecutorChooserFactory#newChooser 源碼邏輯中,并傳入剛剛我們循環(huán)填充好的數(shù)組:

  1. @Override 
  2. public EventExecutorChooser newChooser(EventExecutor[] executors) { 
  3.     //判斷2的冪 isPowerOfTwo 
  4.     if (isPowerOfTwo(executors.length)) { 
  5.         return new PowerOfTwoEventExecutorChooser(executors); 
  6.     } else { 
  7.         //簡單的 
  8.         return new GenericEventExecutorChooser(executors); 
  9.     } 

可以看到,這里似乎有兩種情況,返回的是不同的策略對象,當(dāng)你的數(shù)組長度是2的冪等次方的時候,返回的是 PowerOfTwoEventExecutorChooser對象,否則返回 GenericEventExecutorChooser對象,我們就兩種情況全部分析一下:

I、PowerOfTwoEventExecutorChooser

  1. PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { 
  2.     this.executors = executors; 
  3.  
  4. @Override 
  5. public EventExecutor next() { 
  6.     //2的冪等性  實(shí)現(xiàn)這個  也能實(shí)現(xiàn)循環(huán)取數(shù)的 
  7.     //executors   就是NioEventLoop數(shù)組  按照2次冪求本次獲取的EventLoop是個啥 
  8.     return executors[idx.getAndIncrement() & executors.length - 1]; 

這段代碼的主要邏輯是,取一個自增的CAS類,與數(shù)組長度做&運(yùn)算,最終會出現(xiàn)循環(huán)取數(shù)的結(jié)果:

從上面的圖片可以基本看出來, 該功能可以實(shí)現(xiàn)一個循環(huán)取數(shù)的功能,每次達(dá)到數(shù)組的尾部部都會重新回到頭部重新獲取!

代碼案例:

  1. public static void main(String[] args) { 
  2.     String[] strings = {"第一個""第二個""第三個""第四個"}; 
  3.     AtomicInteger idx = new AtomicInteger(); 
  4.     for (int i = 0; i < 9; i++) { 
  5.         System.out.println(strings[idx.getAndIncrement() & strings.length -1]); 
  6.     } 
  7.  

結(jié)果集

  1. 第一個 
  2. 第二個 
  3. 第三個 
  4. 第四個 
  5. 第一個 
  6. 第二個 
  7. 第三個 
  8. 第四個 
  9. 第一個 

II、GenericEventExecutorChooser

當(dāng)你的線程數(shù)量不是2的冪次方的時候,會走一個通用的選擇器,具體實(shí)現(xiàn)源碼如下:

  1. GenericEventExecutorChooser(EventExecutor[] executors) { 
  2.     this.executors = executors; 
  3.  
  4. @Override 
  5. public EventExecutor next() { 
  6.     //自增  取模   以達(dá)到循環(huán)的目的 
  7.     //假設(shè)executors 長度為5  那么 不斷的循環(huán)就會不斷的得到 0 1 2 3 4  0 1 2 3 4。。。 
  8.     return executors[Math.abs(idx.getAndIncrement() % executors.length)]; 

這個代碼就不用了我做演示了吧,他的功能和上面那個功能是一樣的能 能夠達(dá)到一個循環(huán)取數(shù)的功能

思考

為什么Netty要分為兩個策略類來實(shí)現(xiàn)呢,直接用第二種不行嗎?

Netty官方對性能的要求達(dá)到了極致,大家要知道位運(yùn)算速度要高于直接取模運(yùn)算的,所以Netty官方即使是這一點(diǎn)也做了一個優(yōu)化!

2. 執(zhí)行器選擇器總結(jié)

我們通過上述可以了解到,這里會通過一個選擇器工廠創(chuàng)建一個選擇器,并保存在NioEvenetLoopGroup中,調(diào)用該選擇器的next方法會返回一個NioEventLoop對象,其中的獲取方式是不斷的循環(huán),依次獲取NioEventLoop對象,這也是一個NioEventLoop對SocketChannel為一對多的基礎(chǔ)!這都是后話!

NioEventLoopGroup源碼總結(jié)

 

  1. 創(chuàng)建一個線程執(zhí)行器,當(dāng)調(diào)用該線程執(zhí)行器的execute方法的時候,會講一個Runable對象包裝為Thread對象,再將Thread對象包裝為FastThreadLocalThread對象,然后啟動起來! 簡單來說,每調(diào)用一次execute方法,都去創(chuàng)建并啟動一條新線程執(zhí)行任務(wù)!
  2. 創(chuàng)建一個執(zhí)行器數(shù)組,數(shù)組長度與我們傳遞的數(shù)量有關(guān),默認(rèn)為CPU*2個數(shù)量,然后再循環(huán)填充這個空數(shù)組,數(shù)組里面的元素是一個NioEventLoop對象,每一個NioEventLoop對會持有一個線程執(zhí)行器的引用!
  3. 創(chuàng)建一個執(zhí)行器選擇器,調(diào)用該執(zhí)行器選擇器的next方法可以返回一個NioEventLoop對象,內(nèi)部是進(jìn)行循環(huán)取數(shù)的,每一個NioEventLoop都可能會被多次獲取!

 

責(zé)任編輯:武曉燕 來源: 源碼學(xué)徒
相關(guān)推薦

2021-07-12 22:50:29

Caffeine數(shù)據(jù)結(jié)構(gòu)

2022-06-09 08:41:17

Go網(wǎng)絡(luò)庫Gnet

2022-01-02 08:43:46

Python

2021-07-02 09:45:29

MySQL InnoDB數(shù)據(jù)

2021-07-06 08:59:18

抽象工廠模式

2023-01-03 08:31:54

Spring讀取器配置

2021-07-05 22:11:38

MySQL體系架構(gòu)

2023-11-28 08:29:31

Rust內(nèi)存布局

2022-08-26 09:29:01

Kubernetes策略Master

2021-05-11 08:54:59

建造者模式設(shè)計(jì)

2022-08-23 08:00:59

磁盤性能網(wǎng)絡(luò)

2022-02-07 11:01:23

ZooKeeper

2021-12-07 08:50:40

字母區(qū)間字符串

2021-07-29 07:55:20

React實(shí)踐代碼

2021-09-06 06:31:40

理解動態(tài)規(guī)劃

2021-09-14 07:26:26

組合問題循環(huán)

2022-06-30 22:53:18

數(shù)據(jù)結(jié)構(gòu)算法

2022-03-14 08:16:00

Java程序開發(fā)

2021-08-01 07:19:16

語言OpenrestyNginx

2021-09-13 09:00:03

istio安裝部署
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 成人精品国产 | 成人免费大片黄在线播放 | 荷兰欧美一级毛片 | 久久久涩 | 91精品久久久久久久久99蜜臂 | 国产黄色大片 | 在线观看视频福利 | 成人一区二区三区在线 | 蜜臀网 | 亚洲高清在线免费观看 | 午夜成人免费电影 | 91精品国产综合久久福利软件 | 中文字幕电影在线观看 | 91精品国产综合久久婷婷香蕉 | 久久高清国产视频 | 欧美乱做爰xxxⅹ久久久 | 在线观看中文视频 | 亚洲在线视频 | 在线观看亚洲一区二区 | 中文字幕视频在线免费 | 欧美一区二区三区的 | 性一区| 亚洲视频在线看 | 日韩视频精品在线 | 91精品在线播放 | av中文字幕在线 | 亚洲欧美在线视频 | 精品日韩一区 | 精品成人| 午夜网站视频 | 夜夜av| 久久精品亚洲 | 日韩免费在线 | 国产一区久久久 | 国产高清一区二区三区 | 中文字幕在线免费观看 | 中文字幕av一区 | 久久久久久国产 | 久久69精品久久久久久久电影好 | 欧美激情精品久久久久 | 天天夜夜操 |