掃二維碼與項(xiàng)目經(jīng)理溝通
我們在微信上24小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
1. 為什么要寫這篇文章

創(chuàng)新互聯(lián)是一家專注于成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)與策劃設(shè)計(jì),青原網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)10年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:青原等地區(qū)。青原做網(wǎng)站價(jià)格咨詢:18982081108
幾年前 NoSQL 開始流行的時(shí)候,像其他團(tuán)隊(duì)一樣,我們的團(tuán)隊(duì)也熱衷于令人興奮的新東西,并且計(jì)劃替換一個(gè)應(yīng)用程序的數(shù)據(jù)庫。但是,當(dāng)深入實(shí)現(xiàn)細(xì)節(jié)時(shí),我們想起了一位智者曾經(jīng)說過的話:“細(xì)節(jié)決定成敗”。最終我們意識到 NoSQL 不是解決所有問題的銀彈,而 NoSQL vs RDMS 的答案是:“視情況而定”。類似地,去年RxJava 和 Spring Reactor 這樣的并發(fā)庫加入了讓人充滿激情的語句,如異步非阻塞方法等。為了避免再犯同樣的錯(cuò)誤,我們嘗試評估諸如 ExecutorService、 RxJava、Disruptor 和 Akka 這些并發(fā)框架彼此之間的差異,以及如何確定各自框架的正確用法。
本文中用到的術(shù)語在這里有更詳細(xì)的描述。
2. 分析并發(fā)框架的示例用例
3. 快速更新線程配置
在開始比較并發(fā)框架的之前,讓我們快速復(fù)習(xí)一下如何配置最優(yōu)線程數(shù)以提高并行任務(wù)的性能。這個(gè)理論適用于所有框架,并且在所有框架中使用相同的線程配置來度量性能。
參考: http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/
4. 性能測試結(jié)果
性能測試配置 GCP -> 處理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架構(gòu):x86_64;CPU 內(nèi)核:8個(gè)(注意:這些結(jié)果僅對該配置有意義,并不表示一個(gè)框架比另一個(gè)框架更好)。
5. 使用執(zhí)行器服務(wù)并行化 IO 任務(wù)
5.1 何時(shí)使用?
如果一個(gè)應(yīng)用程序部署在多個(gè)節(jié)點(diǎn)上,并且每個(gè)節(jié)點(diǎn)的 req/sec 小于可用的核心數(shù)量,那么 ExecutorService 可用于并行化任務(wù),更快地執(zhí)行代碼。
5.2 什么時(shí)候適用?
如果一個(gè)應(yīng)用程序部署在多個(gè)節(jié)點(diǎn)上,并且每個(gè)節(jié)點(diǎn)的 req/sec 遠(yuǎn)遠(yuǎn)高于可用的核心數(shù)量,那么使用 ExecutorService 進(jìn)一步并行化只會(huì)使情況變得更糟。
當(dāng)外部服務(wù)延遲增加到 400ms 時(shí),性能測試結(jié)果如下(請求速率 @50 req/sec,8核)。
5.3 所有任務(wù)按順序執(zhí)行示例
- // I/O 任務(wù):調(diào)用外部服務(wù)
- String posts = JsonService.getPosts();
- String comments = JsonService.getComments();
- String albums = JsonService.getAlbums();
- String photos = JsonService.getPhotos();
- // 合并來自外部服務(wù)的響應(yīng)
- // (內(nèi)存中的任務(wù)將作為此操作的一部分執(zhí)行)
- int userId = new Random().nextInt(10) + 1;
- String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
- String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
- // 構(gòu)建最終響應(yīng)并將其發(fā)送回客戶端
- String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
- return response;
5.4 I/O 任務(wù)與 ExecutorService 并行執(zhí)行代碼示例
- // 添加 I/O 任務(wù)
- List
> ioCallableTasks = new ArrayList<>(); - ioCallableTasks.add(JsonService::getPosts);
- ioCallableTasks.add(JsonService::getComments);
- ioCallableTasks.add(JsonService::getAlbums);
- ioCallableTasks.add(JsonService::getPhotos);
- // 調(diào)用所有并行任務(wù)
- ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
- List
> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks); - // 獲取 I/O 操作(阻塞調(diào)用)結(jié)果
- String posts = futuresOfIOTasks.get(0).get();
- String comments = futuresOfIOTasks.get(1).get();
- String albums = futuresOfIOTasks.get(2).get();
- String photos = futuresOfIOTasks.get(3).get();
- // 合并響應(yīng)(內(nèi)存中的任務(wù)是此操作的一部分)
- String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
- String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
- // 構(gòu)建最終響應(yīng)并將其發(fā)送回客戶端
- return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
6. 使用執(zhí)行器服務(wù)并行化 IO 任務(wù)(CompletableFuture)
與上述情況類似:處理傳入請求的 HTTP 線程被阻塞,而 CompletableFuture 用于處理并行任務(wù)。
6.1 何時(shí)使用?
如果沒有 AsyncResponse,性能與 ExecutorService 相同。如果多個(gè) API 調(diào)用必須異步并且鏈接起來,那么這種方法更好(類似 Node 中的 Promises)。
- ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
- // I/O 任務(wù)
- CompletableFuture
postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); - CompletableFuture
commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, - ioExecutorService);
- CompletableFuture
albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, - ioExecutorService);
- CompletableFuture
photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, - ioExecutorService);
- CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();
- // 從 I/O 任務(wù)(阻塞調(diào)用)獲得響應(yīng)
- String posts = postsFuture.get();
- String comments = commentsFuture.get();
- String albums = albumsFuture.get();
- String photos = photosFuture.get();
- // 合并響應(yīng)(內(nèi)存中的任務(wù)將是此操作的一部分)
- String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
- String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
- // 構(gòu)建最終響應(yīng)并將其發(fā)送回客戶端
- return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
7. 使用 ExecutorService 并行處理所有任務(wù)
使用 ExecutorService 并行處理所有任務(wù),并使用 @suspended AsyncResponse response 以非阻塞方式發(fā)送響應(yīng)。
圖片來自 http://tutorials.jenkov.com/java-nio/nio-vs-io.html
7.1 何時(shí)使用?
如果用例類似于服務(wù)器端聊天應(yīng)用程序,在客戶端響應(yīng)之前,線程不需要保持連接,那么異步、非阻塞方法比同步通信更受歡迎。在這些用例中,系統(tǒng)資源可以通過異步、非阻塞方法得到更好的利用,而不僅僅是等待。
- // 為異步執(zhí)行提交并行任務(wù)
- ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
- CompletableFuture
postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); - CompletableFuture
commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, - ioExecutorService);
- CompletableFuture
albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, - ioExecutorService);
- CompletableFuture
photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, - ioExecutorService);
- // 當(dāng) /posts API 返回響應(yīng)時(shí),它將與來自 /comments API 的響應(yīng)結(jié)合在一起
- // 作為這個(gè)操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù)
- CompletableFuture
postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture, - (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
- ioExecutorService);
- // 當(dāng) /albums API 返回響應(yīng)時(shí),它將與來自 /photos API 的響應(yīng)結(jié)合在一起
- // 作為這個(gè)操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù)
- CompletableFuture
albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture, - (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
- ioExecutorService);
- // 構(gòu)建最終響應(yīng)并恢復(fù) http 連接,把響應(yīng)發(fā)送回客戶端
- postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
- LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
- String response = s1 + s2;
- asyncHttpResponse.resume(response);
- }, ioExecutorService);
8. RxJava
8.1 何時(shí)使用?
如果編碼的場景適合異步非阻塞方式,那么可以*** RxJava 或任何響應(yīng)式開發(fā)庫。還具有諸如 back-pressure 之類的附加功能,可以在生產(chǎn)者和消費(fèi)者之間平衡負(fù)載。
- int userId = new Random().nextInt(10) + 1;
- ExecutorService executor = CustomThreads.getExecutorService(8);
- // I/O 任務(wù)
- Observable
postsObservable = Observable.just(userId).map(o -> JsonService.getPosts()) - .subscribeOn(Schedulers.from(executor));
- Observable
commentsObservable = Observable.just(userId).map(o -> JsonService.getComments()) - .subscribeOn(Schedulers.from(executor));
- Observable
albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums()) - .subscribeOn(Schedulers.from(executor));
- Observable
photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos()) - .subscribeOn(Schedulers.from(executor));
- // 合并來自 /posts 和 /comments API 的響應(yīng)
- // 作為這個(gè)操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù)
- Observable
postsAndCommentsObservable = Observable - .zip(postsObservable, commentsObservable,
- (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
- .subscribeOn(Schedulers.from(executor));
- // 合并來自 /albums 和 /photos API 的響應(yīng)
- // 作為這個(gè)操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù)
- Observable
albumsAndPhotosObservable = Observable - .zip(albumsObservable, photosObservable,
- (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
- .subscribeOn(Schedulers.from(executor));
- // 構(gòu)建最終響應(yīng)
- Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
- .subscribeOn(Schedulers.from(executor))
- .subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));
9. Disruptor
[Queue vs RingBuffer]
圖片1:http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
圖片2:https://www.baeldung.com/lmax-disruptor-concurrency
9.1 何時(shí)使用?
Disruptor 框架在下列場合性能更好:與事件驅(qū)動(dòng)的體系結(jié)構(gòu)一起使用,或主要關(guān)注內(nèi)存任務(wù)的單個(gè)生產(chǎn)者和多個(gè)消費(fèi)者。
- static {
- int userId = new Random().nextInt(10) + 1;
- // 示例 Event-Handler; count down latch 用于使線程與 http 線程同步
- EventHandler
postsApiHandler = (event, sequence, endOfBatch) -> { - event.posts = JsonService.getPosts();
- event.countDownLatch.countDown();
- };
- // 配置 Disputor 用于處理事件
- DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler)
- .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2)
- .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2)
- .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);
- DISRUPTOR.start();
- }
- // 對于每個(gè)請求,在 RingBuffer 中發(fā)布一個(gè)事件:
- Event event = null;
- RingBuffer
ringBuffer = DISRUPTOR.getRingBuffer(); - long sequence = ringBuffer.next();
- CountDownLatch countDownLatch = new CountDownLatch(6);
- try {
- event = ringBuffer.get(sequence);
- event.countDownLatch = countDownLatch;
- event.startTime = System.currentTimeMillis();
- } finally {
- ringBuffer.publish(sequence);
- }
- try {
- event.countDownLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
10. Akka
圖片來自:https://blog.codecentric.de/en/2015/08/introduction-to-akka-actors/
10.1 示例代碼
- // 來自 controller :
- Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());
- // handler :
- public Receive createReceive() {
- return receiveBuilder().match(Request.class, request -> {
- Event event = request.event; // Ideally, immutable data structures should be used here.
- request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());
- request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());
- request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());
- request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());
- }).match(Event.class, e -> {
- if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {
- int userId = new Random().nextInt(10) + 1;
- String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,
- e.comments);
- String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,
- e.photos);
- String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
- e.response = response;
- e.countDownLatch.countDown();
- }
- }).build();
- }
11. 總結(jié)

我們在微信上24小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流