av激情亚洲男人的天堂国语,日韩欧美精品一中文字幕,无码av一区二区三区无码,国产又色又爽又刺激的a片,国产又色又爽又刺激的a片

并發(fā)編程中一種經(jīng)典的分而治之的思想!!

作者個人研發(fā)的在高并發(fā)場景下,提供的簡單、穩(wěn)定、可擴(kuò)展的延遲消息隊列框架,具有精準(zhǔn)的定時任務(wù)和延遲隊列處理功能。自開源半年多以來,已成功為十幾家中小型企業(yè)提供了精準(zhǔn)定時調(diào)度方案,經(jīng)受住了生產(chǎn)環(huán)境的考驗。為使更多童鞋受益,現(xiàn)給出開源框架地址:

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:主機(jī)域名、網(wǎng)站空間、營銷軟件、網(wǎng)站建設(shè)、鹿寨網(wǎng)站維護(hù)、網(wǎng)站推廣。

https://github.com/sunshinelyz/mykit-delay

寫在前面

在JDK中,提供了這樣一種功能:它能夠?qū)?fù)雜的邏輯拆分成一個個簡單的邏輯來并行執(zhí)行,待每個并行執(zhí)行的邏輯執(zhí)行完成后,再將各個結(jié)果進(jìn)行匯總,得出最終的結(jié)果數(shù)據(jù)。有點像Hadoop中的MapReduce。

ForkJoin是由JDK1.7之后提供的多線程并發(fā)處理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是將一個復(fù)雜的計算,按照設(shè)定的閾值分解成多個計算,然后將各個計算結(jié)果進(jìn)行匯總。相應(yīng)的,F(xiàn)orkJoin將復(fù)雜的計算當(dāng)做一個任務(wù),而分解的多個計算則是當(dāng)做一個個子任務(wù)來并行執(zhí)行。

Java并發(fā)編程的發(fā)展

對于Java語言來說,生來就支持多線程并發(fā)編程,在并發(fā)編程領(lǐng)域也是在不斷發(fā)展的。Java在其發(fā)展過程中對并發(fā)編程的支持越來越完善也正好印證了這一點。

  • Java 1 支持thread,synchronized。
  • Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。
  • Java 7 加入了fork-join庫。
  • Java 8 加入了 parallel streams。

并發(fā)與并行

并發(fā)和并行在本質(zhì)上還是有所區(qū)別的。

并發(fā)

并發(fā)指的是在同一時刻,只有一個線程能夠獲取到CPU執(zhí)行任務(wù),而多個線程被快速的輪換執(zhí)行,這就使得在宏觀上具有多個線程同時執(zhí)行的效果,并發(fā)不是真正的同時執(zhí)行,并發(fā)可以使用下圖表示。

并行

并行指的是無論何時,多個線程都是在多個CPU核心上同時執(zhí)行的,是真正的同時執(zhí)行。

分治法

基本思想

把一個規(guī)模大的問題劃分為規(guī)模較小的子問題,然后分而治之,最后合并子問題的解得到原問題的解。

步驟

①分割原問題;

②求解子問題;

③合并子問題的解為原問題的解。

我們可以使用如下偽代碼來表示這個步驟。

 
 
 
 
  1. if(任務(wù)很?。﹞ 
  2.     直接計算得到結(jié)果 
  3. }else{ 
  4.     分拆成N個子任務(wù) 
  5.     調(diào)用子任務(wù)的fork()進(jìn)行計算 
  6.     調(diào)用子任務(wù)的join()合并計算結(jié)果 

在分治法中,子問題一般是相互獨立的,因此,經(jīng)常通過遞歸調(diào)用算法來求解子問題。

典型應(yīng)用

  • 二分搜索
  • 大整數(shù)乘法
  • Strassen矩陣乘法
  • 棋盤覆蓋
  • 合并排序
  • 快速排序
  • 線性時間選擇
  • 漢諾塔

ForkJoin并行處理框架

ForkJoin框架概述

Java 1.7 引入了一種新的并發(fā)框架—— Fork/Join Framework,主要用于實現(xiàn)“分而治之”的算法,特別是分治之后遞歸調(diào)用的函數(shù)。

ForkJoin框架的本質(zhì)是一個用于并行執(zhí)行任務(wù)的框架, 能夠把一個大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)的計算結(jié)果。在Java中,F(xiàn)orkJoin框架與ThreadPool共存,并不是要替換ThreadPool

其實,在Java 8中引入的并行流計算,內(nèi)部就是采用的ForkJoinPool來實現(xiàn)的。例如,下面使用并行流實現(xiàn)打印數(shù)組元組的程序。

 
 
 
 
  1. public class SumArray { 
  2.     public static void main(String[] args){ 
  3.         List numberList = Arrays.asList(1,2,3,4,5,6,7,8,9); 
  4.         numberList.parallelStream().forEach(System.out::println); 
  5.     } 

這段代碼的背后就使用到了ForkJoinPool。

說到這里,可能有讀者會問:可以使用線程池的ThreadPoolExecutor來實現(xiàn)啊?為什么要使用ForkJoinPool啊?ForkJoinPool是個什么鬼啊?! 接下來,我們就來回答這個問題。

ForkJoin框架原理

ForkJoin框架是從jdk1.7中引入的新特性,它同ThreadPoolExecutor一樣,也實現(xiàn)了Executor和ExecutorService接口。它使用了一個無限隊列來保存需要執(zhí)行的任務(wù),而線程的數(shù)量則是通過構(gòu)造函數(shù)傳入,如果沒有向構(gòu)造函數(shù)中傳入指定的線程數(shù)量,那么當(dāng)前計算機(jī)可用的CPU數(shù)量會被設(shè)置為線程數(shù)量作為默認(rèn)值。

ForkJoinPool主要使用 分治法(Divide-and-Conquer Algorithm) 來解決問題。典型的應(yīng)用比如快速排序算法。這里的要點在于,F(xiàn)orkJoinPool能夠使用相對較少的線程來處理大量的任務(wù)。比如要對1000萬個數(shù)據(jù)進(jìn)行排序,那么會將這個任務(wù)分割成兩個500萬的排序任務(wù)和一個針對這兩組500萬數(shù)據(jù)的合并任務(wù)。以此類推,對于500萬的數(shù)據(jù)也會做出同樣的分割處理,到最后會設(shè)置一個閾值來規(guī)定當(dāng)數(shù)據(jù)規(guī)模到多少時,停止這樣的分割處理。比如,當(dāng)元素的數(shù)量小于10時,會停止分割,轉(zhuǎn)而使用插入排序?qū)λ鼈冞M(jìn)行排序。那么到最后,所有的任務(wù)加起來會有大概200萬+個。問題的關(guān)鍵在于,對于一個任務(wù)而言,只有當(dāng)它所有的子任務(wù)完成之后,它才能夠被執(zhí)行。

所以當(dāng)使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法向任務(wù)隊列中再添加一個任務(wù)并在等待該任務(wù)完成之后再繼續(xù)執(zhí)行。而使用ForkJoinPool就能夠解決這個問題,它就能夠讓其中的線程創(chuàng)建新的任務(wù),并掛起當(dāng)前的任務(wù),此時線程就能夠從隊列中選擇子任務(wù)執(zhí)行。

那么使用ThreadPoolExecutor或者ForkJoinPool,性能上會有什么差異呢?

首先,使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關(guān)系的任務(wù),比如使用4個線程來完成超過200萬個任務(wù)。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優(yōu)先執(zhí)行子任務(wù),需要完成200萬個具有父子關(guān)系的任務(wù)時,也需要200萬個線程,很顯然這是不可行的,也是很不合理的!!

工作竊取算法

假如我們需要做一個比較大的任務(wù),我們可以把這個任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,于是把這些子任務(wù)分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務(wù),線程和隊列一一對應(yīng),比如A線程負(fù)責(zé)處理A隊列里的任務(wù)。但是有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會使用雙端隊列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊列的尾部拿任務(wù)執(zhí)行。

工作竊取算法的優(yōu)點:充分利用線程進(jìn)行并行計算,并減少了線程間的競爭。

工作竊取算法的缺點:在某些情況下還是存在競爭,比如雙端隊列里只有一個任務(wù)時。并且該算法會消耗更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列。

Fork/Join框架局限性:

對于Fork/Join框架而言,當(dāng)一個任務(wù)正在等待它使用Join操作創(chuàng)建的子任務(wù)結(jié)束時,執(zhí)行這個任務(wù)的工作線程查找其他未被執(zhí)行的任務(wù),并開始執(zhí)行這些未被執(zhí)行的任務(wù),通過這種方式,線程充分利用它們的運行時間來提高應(yīng)用程序的性能。為了實現(xiàn)這個目標(biāo),F(xiàn)ork/Join框架執(zhí)行的任務(wù)有一些局限性。

(1)任務(wù)只能使用Fork和Join操作來進(jìn)行同步機(jī)制,如果使用了其他同步機(jī)制,則在同步操作時,工作線程就不能執(zhí)行其他任務(wù)了。比如,在Fork/Join框架中,使任務(wù)進(jìn)行了睡眠,那么,在睡眠期間內(nèi),正在執(zhí)行這個任務(wù)的工作線程將不會執(zhí)行其他任務(wù)了。(2)在Fork/Join框架中,所拆分的任務(wù)不應(yīng)該去執(zhí)行IO操作,比如:讀寫數(shù)據(jù)文件。(3)任務(wù)不能拋出檢查異常,必須通過必要的代碼來出來這些異常。

ForkJoin框架的實現(xiàn)

ForkJoin框架中一些重要的類如下所示。

ForkJoinPool 框架中涉及的主要類如下所示。

1.ForkJoinPool類

實現(xiàn)了ForkJoin框架中的線程池,由類圖可以看出,F(xiàn)orkJoinPool類實現(xiàn)了線程池的Executor接口。

我們也可以從下圖中看出ForkJoinPool的類圖關(guān)系。

其中,可以使用Executors.newWorkStealPool()方法創(chuàng)建ForkJoinPool。

ForkJoinPool中提供了如下提交任務(wù)的方法。

 
 
 
 
  1. public void execute(ForkJoinTask task) 
  2. public void execute(Runnable task) 
  3. public  T invoke(ForkJoinTask task) 
  4. public  List> invokeAll(Collection> tasks)  
  5. public  ForkJoinTask submit(ForkJoinTask task) 
  6. public  ForkJoinTask submit(Callable task) 
  7. public  ForkJoinTask submit(Runnable task, T result) 
  8. public ForkJoinTask submit(Runnable task) 

2.ForkJoinWorkerThread類

實現(xiàn)ForkJoin框架中的線程。

3.ForkJoinTask類

ForkJoinTask封裝了數(shù)據(jù)及其相應(yīng)的計算,并且支持細(xì)粒度的數(shù)據(jù)并行。ForkJoinTask比線程要輕量,F(xiàn)orkJoinPool中少量工作線程能夠運行大量的ForkJoinTask。

ForkJoinTask類中主要包括兩個方法fork()和join(),分別實現(xiàn)任務(wù)的分拆與合并。

fork()方法類似于Thread.start(),但是它并不立即執(zhí)行任務(wù),而是將任務(wù)放入工作隊列中。跟Thread.join()方法不同,F(xiàn)orkJoinTask的join()方法并不簡單的阻塞線程,而是利用工作線程運行其他任務(wù),當(dāng)一個工作線程中調(diào)用join(),它將處理其他任務(wù),直到注意到目標(biāo)子任務(wù)已經(jīng)完成。

我們可以使用下圖來表示這個過程。

ForkJoinTask有3個子類:

  • RecursiveAction:無返回值的任務(wù)。
  • RecursiveTask:有返回值的任務(wù)。
  • CountedCompleter:完成任務(wù)后將觸發(fā)其他任務(wù)。

4.RecursiveTask類

有返回結(jié)果的ForkJoinTask實現(xiàn)Callable。

5.RecursiveAction類

無返回結(jié)果的ForkJoinTask實現(xiàn)Runnable。

6.CountedCompleter類

在任務(wù)完成執(zhí)行后會觸發(fā)執(zhí)行一個自定義的鉤子函數(shù)。

ForkJoin示例程序

 
 
 
 
  1. package io.binghe.concurrency.example.aqs; 
  2.   
  3. import lombok.extern.slf4j.Slf4j; 
  4. import java.util.concurrent.ForkJoinPool; 
  5. import java.util.concurrent.Future; 
  6. import java.util.concurrent.RecursiveTask; 
  7. @Slf4j 
  8. public class ForkJoinTaskExample extends RecursiveTask { 
  9.     public static final int threshold = 2; 
  10.     private int start; 
  11.     private int end; 
  12.     public ForkJoinTaskExample(int start, int end) { 
  13.         this.start = start; 
  14.         this.end = end; 
  15.     } 
  16.     @Override 
  17.     protected Integer compute() { 
  18.         int sum = 0; 
  19.         //如果任務(wù)足夠小就計算任務(wù) 
  20.         boolean canCompute = (end - start) <= threshold; 
  21.         if (canCompute) { 
  22.             for (int i = start; i <= end; i++) { 
  23.                 sum += i; 
  24.             } 
  25.         } else { 
  26.             // 如果任務(wù)大于閾值,就分裂成兩個子任務(wù)計算 
  27.             int middle = (start + end) / 2; 
  28.             ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); 
  29.             ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); 
  30.   
  31.             // 執(zhí)行子任務(wù) 
  32.             leftTask.fork(); 
  33.             rightTask.fork(); 
  34.   
  35.             // 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果 
  36.             int leftResult = leftTask.join(); 
  37.             int rightResult = rightTask.join(); 
  38.   
  39.             // 合并子任務(wù) 
  40.             sum = leftResult + rightResult; 
  41.         } 
  42.         return sum; 
  43.     } 
  44.     public static void main(String[] args) { 
  45.         ForkJoinPool forkjoinPool = new ForkJoinPool(); 
  46.   
  47.         //生成一個計算任務(wù),計算1+2+3+4 
  48.         ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); 
  49.   
  50.         //執(zhí)行一個任務(wù) 
  51.         Future result = forkjoinPool.submit(task); 
  52.   
  53.         try { 
  54.             log.info("result:{}", result.get()); 
  55.         } catch (Exception e) { 
  56.             log.error("exception", e); 
  57.         } 
  58.     } 

本文轉(zhuǎn)載自微信公眾號「冰河技術(shù)」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系冰河技術(shù)公眾號。


分享名稱:并發(fā)編程中一種經(jīng)典的分而治之的思想??!
網(wǎng)頁URL:http://uogjgqi.cn/article/djdhiij.html
掃二維碼與項目經(jīng)理溝通

我們在微信上24小時期待你的聲音

解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流