并發編程中一種經典的分而治之的思想?。?/h1>
作者個人研發的在高并發場景下,提供的簡單、穩定、可擴展的延遲消息隊列框架,具有精準的定時任務和延遲隊列處理功能。自開源半年多以來,已成功為十幾家中小型企業提供了精準定時調度方案,經受住了生產環境的考驗。為使更多童鞋受益,現給出開源框架地址:
https://github.com/sunshinelyz/mykit-delay
寫在前面
在JDK中,提供了這樣一種功能:它能夠將復雜的邏輯拆分成一個個簡單的邏輯來并行執行,待每個并行執行的邏輯執行完成后,再將各個結果進行匯總,得出最終的結果數據。有點像Hadoop中的MapReduce。
ForkJoin是由JDK1.7之后提供的多線程并發處理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是將一個復雜的計算,按照設定的閾值分解成多個計算,然后將各個計算結果進行匯總。相應的,ForkJoin將復雜的計算當做一個任務,而分解的多個計算則是當做一個個子任務來并行執行。
Java并發編程的發展
對于Java語言來說,生來就支持多線程并發編程,在并發編程領域也是在不斷發展的。Java在其發展過程中對并發編程的支持越來越完善也正好印證了這一點。
- Java 1 支持thread,synchronized。
- Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。
- Java 7 加入了fork-join庫。
- Java 8 加入了 parallel streams。
并發與并行
并發和并行在本質上還是有所區別的。
并發
并發指的是在同一時刻,只有一個線程能夠獲取到CPU執行任務,而多個線程被快速的輪換執行,這就使得在宏觀上具有多個線程同時執行的效果,并發不是真正的同時執行,并發可以使用下圖表示。
并行
并行指的是無論何時,多個線程都是在多個CPU核心上同時執行的,是真正的同時執行。
分治法
基本思想
把一個規模大的問題劃分為規模較小的子問題,然后分而治之,最后合并子問題的解得到原問題的解。
步驟
①分割原問題;
②求解子問題;
③合并子問題的解為原問題的解。
我們可以使用如下偽代碼來表示這個步驟。
- if(任務很?。﹞
- 直接計算得到結果
- }else{
- 分拆成N個子任務
- 調用子任務的fork()進行計算
- 調用子任務的join()合并計算結果
- }
在分治法中,子問題一般是相互獨立的,因此,經常通過遞歸調用算法來求解子問題。
典型應用
- 二分搜索
- 大整數乘法
- Strassen矩陣乘法
- 棋盤覆蓋
- 合并排序
- 快速排序
- 線性時間選擇
- 漢諾塔
ForkJoin并行處理框架
ForkJoin框架概述
Java 1.7 引入了一種新的并發框架—— Fork/Join Framework,主要用于實現“分而治之”的算法,特別是分治之后遞歸調用的函數。
ForkJoin框架的本質是一個用于并行執行任務的框架, 能夠把一個大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務的計算結果。在Java中,ForkJoin框架與ThreadPool共存,并不是要替換ThreadPool
其實,在Java 8中引入的并行流計算,內部就是采用的ForkJoinPool來實現的。例如,下面使用并行流實現打印數組元組的程序。
- public class SumArray {
- public static void main(String[] args){
- List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
- numberList.parallelStream().forEach(System.out::println);
- }
- }
這段代碼的背后就使用到了ForkJoinPool。
說到這里,可能有讀者會問:可以使用線程池的ThreadPoolExecutor來實現啊?為什么要使用ForkJoinPool啊?ForkJoinPool是個什么鬼啊?! 接下來,我們就來回答這個問題。
ForkJoin框架原理
ForkJoin框架是從jdk1.7中引入的新特性,它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService接口。它使用了一個無限隊列來保存需要執行的任務,而線程的數量則是通過構造函數傳入,如果沒有向構造函數中傳入指定的線程數量,那么當前計算機可用的CPU數量會被設置為線程數量作為默認值。
ForkJoinPool主要使用 分治法(Divide-and-Conquer Algorithm) 來解決問題。典型的應用比如快速排序算法。這里的要點在于,ForkJoinPool能夠使用相對較少的線程來處理大量的任務。比如要對1000萬個數據進行排序,那么會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合并任務。以此類推,對于500萬的數據也會做出同樣的分割處理,到最后會設置一個閾值來規定當數據規模到多少時,停止這樣的分割處理。比如,當元素的數量小于10時,會停止分割,轉而使用插入排序對它們進行排序。那么到最后,所有的任務加起來會有大概200萬+個。問題的關鍵在于,對于一個任務而言,只有當它所有的子任務完成之后,它才能夠被執行。
所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法向任務隊列中再添加一個任務并在等待該任務完成之后再繼續執行。而使用ForkJoinPool就能夠解決這個問題,它就能夠讓其中的線程創建新的任務,并掛起當前的任務,此時線程就能夠從隊列中選擇子任務執行。
那么使用ThreadPoolExecutor或者ForkJoinPool,性能上會有什么差異呢?
首先,使用ForkJoinPool能夠使用數量有限的線程來完成非常多的具有父子關系的任務,比如使用4個線程來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關系的任務時,也需要200萬個線程,很顯然這是不可行的,也是很不合理的!!
工作竊取算法
假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
工作竊取算法的優點:充分利用線程進行并行計算,并減少了線程間的競爭。
工作竊取算法的缺點:在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。并且該算法會消耗更多的系統資源,比如創建多個線程和多個雙端隊列。
Fork/Join框架局限性:
對于Fork/Join框架而言,當一個任務正在等待它使用Join操作創建的子任務結束時,執行這個任務的工作線程查找其他未被執行的任務,并開始執行這些未被執行的任務,通過這種方式,線程充分利用它們的運行時間來提高應用程序的性能。為了實現這個目標,Fork/Join框架執行的任務有一些局限性。
(1)任務只能使用Fork和Join操作來進行同步機制,如果使用了其他同步機制,則在同步操作時,工作線程就不能執行其他任務了。比如,在Fork/Join框架中,使任務進行了睡眠,那么,在睡眠期間內,正在執行這個任務的工作線程將不會執行其他任務了。(2)在Fork/Join框架中,所拆分的任務不應該去執行IO操作,比如:讀寫數據文件。(3)任務不能拋出檢查異常,必須通過必要的代碼來出來這些異常。
ForkJoin框架的實現
ForkJoin框架中一些重要的類如下所示。
ForkJoinPool 框架中涉及的主要類如下所示。
1.ForkJoinPool類
實現了ForkJoin框架中的線程池,由類圖可以看出,ForkJoinPool類實現了線程池的Executor接口。
我們也可以從下圖中看出ForkJoinPool的類圖關系。
其中,可以使用Executors.newWorkStealPool()方法創建ForkJoinPool。
ForkJoinPool中提供了如下提交任務的方法。
- public void execute(ForkJoinTask<?> task)
- public void execute(Runnable task)
- public <T> T invoke(ForkJoinTask<T> task)
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
- public <T> ForkJoinTask<T> submit(Callable<T> task)
- public <T> ForkJoinTask<T> submit(Runnable task, T result)
- public ForkJoinTask<?> submit(Runnable task)
2.ForkJoinWorkerThread類
實現ForkJoin框架中的線程。
3.ForkJoinTask類
ForkJoinTask封裝了數據及其相應的計算,并且支持細粒度的數據并行。ForkJoinTask比線程要輕量,ForkJoinPool中少量工作線程能夠運行大量的ForkJoinTask。
ForkJoinTask類中主要包括兩個方法fork()和join(),分別實現任務的分拆與合并。
fork()方法類似于Thread.start(),但是它并不立即執行任務,而是將任務放入工作隊列中。跟Thread.join()方法不同,ForkJoinTask的join()方法并不簡單的阻塞線程,而是利用工作線程運行其他任務,當一個工作線程中調用join(),它將處理其他任務,直到注意到目標子任務已經完成。
我們可以使用下圖來表示這個過程。
ForkJoinTask有3個子類:
- RecursiveAction:無返回值的任務。
- RecursiveTask:有返回值的任務。
- CountedCompleter:完成任務后將觸發其他任務。
4.RecursiveTask類
有返回結果的ForkJoinTask實現Callable。
5.RecursiveAction類
無返回結果的ForkJoinTask實現Runnable。
6.CountedCompleter類
在任務完成執行后會觸發執行一個自定義的鉤子函數。
ForkJoin示例程序
- package io.binghe.concurrency.example.aqs;
- import lombok.extern.slf4j.Slf4j;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.Future;
- import java.util.concurrent.RecursiveTask;
- @Slf4j
- public class ForkJoinTaskExample extends RecursiveTask<Integer> {
- public static final int threshold = 2;
- private int start;
- private int end;
- public ForkJoinTaskExample(int start, int end) {
- this.start = start;
- this.end = end;
- }
- @Override
- protected Integer compute() {
- int sum = 0;
- //如果任務足夠小就計算任務
- boolean canCompute = (end - start) <= threshold;
- if (canCompute) {
- for (int i = start; i <= end; i++) {
- sum += i;
- }
- } else {
- // 如果任務大于閾值,就分裂成兩個子任務計算
- int middle = (start + end) / 2;
- ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
- ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
- // 執行子任務
- leftTask.fork();
- rightTask.fork();
- // 等待任務執行結束合并其結果
- int leftResult = leftTask.join();
- int rightResult = rightTask.join();
- // 合并子任務
- sum = leftResult + rightResult;
- }
- return sum;
- }
- public static void main(String[] args) {
- ForkJoinPool forkjoinPool = new ForkJoinPool();
- //生成一個計算任務,計算1+2+3+4
- ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
- //執行一個任務
- Future<Integer> result = forkjoinPool.submit(task);
- try {
- log.info("result:{}", result.get());
- } catch (Exception e) {
- log.error("exception", e);
- }
- }
- }
本文轉載自微信公眾號「冰河技術」,可以通過以下二維碼關注。轉載本文請聯系冰河技術公眾號。