掃二維碼與項(xiàng)目經(jīng)理溝通
我們在微信上24小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
前言

PriorityBlockingQueue 優(yōu)先級隊(duì)列,線程安全(添加、讀取都進(jìn)行了加鎖)、無界、讀阻塞的隊(duì)列,底層采用的堆結(jié)構(gòu)實(shí)現(xiàn)(二叉樹),默認(rèn)是小根堆,最小的或者最大的元素會一直置頂,每次獲取都取最頂端的數(shù)據(jù)
隊(duì)列創(chuàng)建
小根堆
- PriorityBlockingQueue
concurrentLinkedQueue = new PriorityBlockingQueue ();
大根堆
- PriorityBlockingQueue
concurrentLinkedQueue = new PriorityBlockingQueue (10, new Comparator () { - @Override
- public int compare(Integer o1, Integer o2) {
- return o2 - o1;
- }
- });
應(yīng)用場景
有任務(wù)要執(zhí)行,可以對任務(wù)加一個(gè)優(yōu)先級的權(quán)重,這樣隊(duì)列會識別出來,對該任務(wù)優(yōu)先進(jìn)行出隊(duì)。
我們來看一個(gè)具體例子,例子中定義了一個(gè)將要放入“優(yōu)先阻塞隊(duì)列”的任務(wù)類,并且定義了一個(gè)任務(wù)工場類和一個(gè)任務(wù)執(zhí)行類,在任務(wù)工場類中產(chǎn)生了各種不同優(yōu)先級的任務(wù),將其添加到隊(duì)列中,在任務(wù)執(zhí)行類中,任務(wù)被一個(gè)個(gè)取出并執(zhí)行。
- package com.niuh.queue.priority;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Queue;
- import java.util.Random;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.PriorityBlockingQueue;
- import java.util.concurrent.TimeUnit;
- /**
- *
- * PriorityBlockingQueue使用示例
- *
- */
- public class PriorityBlockingQueueDemo {
- public static void main(String[] args) throws Exception {
- Random random = new Random(47);
- ExecutorService exec = Executors.newCachedThreadPool();
- PriorityBlockingQueue
queue = new PriorityBlockingQueue<>(); - exec.execute(new PrioritizedTaskProducer(queue, exec)); // 這里需要注意,往PriorityBlockingQueue中添加任務(wù)和取出任務(wù)的
- exec.execute(new PrioritizedTaskConsumer(queue)); // 步驟是同時(shí)進(jìn)行的,因而輸出結(jié)果并不一定是有序的
- }
- }
- class PrioritizedTask implements Runnable, Comparable
{ - private Random random = new Random(47);
- private static int counter = 0;
- private final int id = counter++;
- private final int priority;
- protected static List
sequence = new ArrayList<>(); - public PrioritizedTask(int priority) {
- this.priority = priority;
- sequence.add(this);
- }
- @Override
- public int compareTo(PrioritizedTask o) {
- return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0); // 定義優(yōu)先級計(jì)算方式
- }
- @Override
- public void run() {
- try {
- TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
- } catch (InterruptedException e) {
- }
- System.out.println(this);
- }
- @Override
- public String toString() {
- return String.format("[%1$-3d]", priority) + " Task " + id;
- }
- public String summary() {
- return "(" + id + ": " + priority + ")";
- }
- public static class EndSentinel extends PrioritizedTask {
- private ExecutorService exec;
- public EndSentinel(ExecutorService exec) {
- super(-1);
- this.exec = exec;
- }
- @Override
- public void run() {
- int count = 0;
- for (PrioritizedTask pt : sequence) {
- System.out.print(pt.summary());
- if (++count % 5 == 0) {
- System.out.println();
- }
- }
- System.out.println();
- System.out.println(this + " Calling shutdownNow()");
- exec.shutdownNow();
- }
- }
- }
- class PrioritizedTaskProducer implements Runnable {
- private Random random = new Random(47);
- private Queue
queue; - private ExecutorService exec;
- public PrioritizedTaskProducer(Queue
queue, ExecutorService exec) { - this.queue = queue;
- this.exec = exec;
- }
- @Override
- public void run() {
- for (int i = 0; i < 20; i++) {
- queue.add(new PrioritizedTask(random.nextInt(10))); // 往PriorityBlockingQueue中添加隨機(jī)優(yōu)先級的任務(wù)
- Thread.yield();
- }
- try {
- for (int i = 0; i < 10; i++) {
- TimeUnit.MILLISECONDS.sleep(250);
- queue.add(new PrioritizedTask(10)); // 往PriorityBlockingQueue中添加優(yōu)先級為10的任務(wù)
- }
- for (int i = 0; i < 10; i++) {
- queue.add(new PrioritizedTask(i));// 往PriorityBlockingQueue中添加優(yōu)先級為1-10的任務(wù)
- }
- queue.add(new PrioritizedTask.EndSentinel(exec));
- } catch (InterruptedException e) {
- }
- System.out.println("Finished PrioritizedTaskProducer");
- }
- }
- class PrioritizedTaskConsumer implements Runnable {
- private PriorityBlockingQueue
queue; - public PrioritizedTaskConsumer(PriorityBlockingQueue
queue) { - this.queue = queue;
- }
- @Override
- public void run() {
- try {
- while (!Thread.interrupted()) {
- queue.take().run(); // 任務(wù)的消費(fèi)者,從PriorityBlockingQueue中取出任務(wù)執(zhí)行
- }
- } catch (InterruptedException e) {
- }
- System.out.println("Finished PrioritizedTaskConsumer");
- }
- }
工作原理
PriorityBlockingQueue 是 JDK1.5 的時(shí)候出來的一個(gè)阻塞隊(duì)列。但是該隊(duì)列入隊(duì)的時(shí)候是不會阻塞的,永遠(yuǎn)會加到隊(duì)尾。下面我們介紹下它的幾個(gè)特點(diǎn):
注意:
源碼分析
定義
PriorityBlockingQueue的類繼承關(guān)系如下:
其包含的方法定義如下:
成員屬性
從下面的字段我們可以知道,該隊(duì)列可以排序,使用顯示鎖來保證操作的原子性,在空隊(duì)列時(shí),出隊(duì)線程會堵塞等。
- /**
- * 默認(rèn)數(shù)組長度
- */
- private static final int DEFAULT_INITIAL_CAPACITY = 11;
- /**
- * 最大達(dá)容量,分配時(shí)超出可能會出現(xiàn) OutOfMemoryError 異常
- */
- private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
- /**
- * 隊(duì)列,存儲我們的元素
- */
- private transient Object[] queue;
- /**
- * 隊(duì)列長度
- */
- private transient int size;
- /**
- * 比較器,入隊(duì)進(jìn)行權(quán)重的比較
- */
- private transient Comparator super E> comparator;
- /**
- * 顯示鎖
- */
- private final ReentrantLock lock;
- /**
- * 空隊(duì)列時(shí)進(jìn)行線程阻塞的 Condition 對象
- */
- private final Condition notEmpty;
構(gòu)造函數(shù)
- /**
- * 默認(rèn)構(gòu)造,使用長度為 11 的數(shù)組,比較器為空
- */
- public PriorityBlockingQueue() {
- this(DEFAULT_INITIAL_CAPACITY, null);
- }
- /**
- * 自定義數(shù)據(jù)長度構(gòu)造,比較器為空
- */
- public PriorityBlockingQueue(int initialCapacity) {
- this(initialCapacity, null);
- }
- /**
- * 自定義數(shù)組長度,可以自定義比較器
- */
- public PriorityBlockingQueue(int initialCapacity,
- Comparator super E> comparator) {
- if (initialCapacity < 1)
- throw new IllegalArgumentException();
- this.lock = new ReentrantLock();
- this.notEmpty = lock.newCondition();
- this.comparator = comparator;
- this.queue = new Object[initialCapacity];
- }
- /**
- * 構(gòu)造函數(shù),帶有初始內(nèi)容的隊(duì)列
- */
- public PriorityBlockingQueue(Collection extends E> c) {
- this.lock = new ReentrantLock();
- this.notEmpty = lock.newCondition();
- boolean heapify = true; // true if not known to be in heap order
- boolean screen = true; // true if must screen for nulls
- if (c instanceof SortedSet>) {
- SortedSet extends E> ss = (SortedSet extends E>) c;
- this.comparator = (Comparator super E>) ss.comparator();
- heapify = false;
- }
- else if (c instanceof PriorityBlockingQueue>) {
- PriorityBlockingQueue extends E> pq =
- (PriorityBlockingQueue extends E>) c;
- this.comparator = (Comparator super E>) pq.comparator();
- screen = false;
- if (pq.getClass() == PriorityBlockingQueue.class) // exact match
- heapify = false;
- }
- Object[] a = c.toArray();
- int n = a.length;
- // If c.toArray incorrectly doesn't return Object[], copy it.
- if (a.getClass() != Object[].class)
- a = Arrays.copyOf(a, n, Object[].class);
- if (screen && (n == 1 || this.comparator != null)) {
- for (int i = 0; i < n; ++i)
- if (a[i] == null)
- throw new NullPointerException();
- }
- this.queue = a;
- this.size = n;
- if (heapify)
- heapify();
- }
入隊(duì)方法
入隊(duì)方法,下面可以看到 put 方法最終會調(diào)用 offer 方法,所以我們只看 offer 方法即可。
offer(E e)
- public void put(E e) {
- offer(e); // never need to block
- }
- public boolean offer(E e) {
- //判斷是否為空
- if (e == null)
- throw new NullPointerException();
- //顯示鎖
- final ReentrantLock lock = this.lock;
- lock.lock();
- //定義臨時(shí)對象
- int n, cap;
- Object[] array;
- //判斷數(shù)組是否滿了
- while ((n = size) >= (cap = (array = queue).length))
- //數(shù)組擴(kuò)容
- tryGrow(array, cap);
- try {
- //拿到比較器
- Comparator super E> cmp = comparator;
- //判斷是否有自定義比較器
- if (cmp == null)
- //堆上浮
- siftUpComparable(n, e, array);
- else
- //使用自定義比較器進(jìn)行堆上浮
- siftUpUsingComparator(n, e, array, cmp);
- //隊(duì)列長度 +1
- size = n + 1;
- //喚醒休眠的出隊(duì)線程
- notEmpty.signal();
- } finally {
- //釋放鎖
- lock.unlock();
- }
- return true;
- }
siftUpComparable(int k, T x, Object[] array)
上浮調(diào)整比較器方法的實(shí)現(xiàn)
- private static
void siftUpComparable(int k, T x, Object[] array) { - Comparable super T> key = (Comparable super T>) x;
- while (k > 0) {
- //無符號向左移,目的是找到放入位置的父節(jié)點(diǎn)
- int parent = (k - 1) >>> 1;
- //拿到父節(jié)點(diǎn)的值
- Object e = array[parent];
- //比較是否大于該元素,不大于就沒比較交換
- if (key.compareTo((T) e) >= 0)
- break;
- //以下都是元素位置交換
- array[k] = e;
- k = parent;
- }
- array[k] = key;
- }
根據(jù)上面的代碼,可以看出這是完全二叉樹在進(jìn)行上浮調(diào)整。調(diào)整入隊(duì)的元素,找出最小的,將元素排列有序化。簡單理解就是:父節(jié)點(diǎn)元素值一定要比它的子節(jié)點(diǎn)得小,如果父節(jié)點(diǎn)大于子節(jié)點(diǎn)了,那就兩者位置進(jìn)行交換。
入隊(duì)圖解
例子:85 添加到二叉堆中(大頂堆)
- package com.niuh.queue.priority;
- import java.util.Comparator;
- import java.util.concurrent.PriorityBlockingQueue;
- /**
- *
- * PriorityBlockingQueue 簡單演示 demo
- *
- */
- public class TestPriorityBlockingQueue {
- public static void main(String[] args) throws InterruptedException {
- // 大頂堆
- PriorityBlockingQueue
concurrentLinkedQueue = new PriorityBlockingQueue (10, new Comparator () { - @Override
- public int compare(Integer o1, Integer o2) {
- return o2 - o1;
- }
- });
- concurrentLinkedQueue.offer(90);
- concurrentLinkedQueue.offer(80);
- concurrentLinkedQueue.offer(70);
- concurrentLinkedQueue.offer(60);
- concurrentLinkedQueue.offer(40);
- concurrentLinkedQueue.offer(30);
- concurrentLinkedQueue.offer(20);
- concurrentLinkedQueue.offer(10);
- concurrentLinkedQueue.offer(50);
- concurrentLinkedQueue.offer(85);
- //輸出元素排列
- concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" "));
- //取出元素
- Integer take = concurrentLinkedQueue.take();
- System.out.println();
- concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" "));
- }
- }
操作的細(xì)節(jié)分為兩步:
85 按照上面講的先插入到堆的尾部,也就是一維數(shù)組的尾部,一維數(shù)組的尾部的話就上圖的位置,因?yàn)檫@是一個(gè)完全二叉樹,所以它的尾部就是50后面這個(gè)結(jié)點(diǎn)。插進(jìn)來之后這個(gè)時(shí)候就破壞了堆,它的每一個(gè)結(jié)點(diǎn)都要大于它的兒子的這種屬性了,接下來要做的事情就是要把 85 依次地向上浮動,怎么浮動?就是 85 大于它的父親結(jié)點(diǎn),那么就和父親結(jié)點(diǎn)進(jìn)行交換,直到走到根如果大于根的話,就和根也進(jìn)行交換。
85 再繼續(xù)往前走之后,它要和 80 再進(jìn)行比較,同理可得:也就是說這個(gè)結(jié)點(diǎn)每次和它的父親比,如果它大于它的父親的話就交換,直到它不再大于它的父親。
出隊(duì)方法
入隊(duì)列的方法說完后,我們來說說出隊(duì)列的方法。PriorityBlockingQueue提供了多種出隊(duì)操作的實(shí)現(xiàn)來滿足不同情況下的需求,如下:
poll 和 peek 與上面類似,這里不做說明
take()
出隊(duì)方法,該方法會阻塞
- public E take() throws InterruptedException {
- //顯示鎖
- final ReentrantLock lock = this.lock;
- //可中斷鎖
- lock.lockInterruptibly();
- //結(jié)果接收對象
- E result;
- try {
- //判斷隊(duì)列是否為空
- while ( (result = dequeue()) == null)
- //線程阻塞
- notEmpty.await();
- } finally {
- lock.unlock();
- }
- return result;
- }
dequeue()
我們再來看看具體出隊(duì)方法的實(shí)現(xiàn),dequeue方法
- private E dequeue() {
- //長度減少 1
- int n = size - 1;
- //判斷隊(duì)列中是否有元素
- if (n < 0)
- return null;
- else {
- //隊(duì)列對象
- Object[] array = queue;
- //取出第一個(gè)元素
- E result = (E) array[0];
- //拿出最后一個(gè)元素
- E x = (E) array[n];
- //置空
- array[n] = null;
- Comparator super E> cmp = comparator;
- if (cmp == null)
- //下沉調(diào)整
- siftDownComparable(0, x, array, n);
- else
- siftDownUsingComparator(0, x, array, n, cmp);
- //成功則減少隊(duì)列中的元素?cái)?shù)量
- size = n;
- return result;
- }
總體就是找到父節(jié)點(diǎn)與兩個(gè)子節(jié)點(diǎn)中最小的一個(gè)節(jié)點(diǎn),然后進(jìn)行交換位置,不斷重復(fù),由上而下的交換。
siftDownComparable(int k, T x, Object[] array, int n)
再來看看下沉比較器方法的實(shí)現(xiàn)
- private static
void siftDownComparable(int k, T x, Object[] array, - int n) {
- //判斷隊(duì)列長度
- if (n > 0) {
- Comparable super T> key = (Comparable super T>)x;
- //找到隊(duì)列最后一個(gè)元素的父節(jié)點(diǎn)的索引。
- int half = n >>> 1; // loop while a non-leaf
- while (k < half) {
- //拿到 k 節(jié)點(diǎn)下的左子節(jié)點(diǎn)
- int child = (k << 1) + 1; // assume left child is least
- //取得子節(jié)點(diǎn)對應(yīng)的值
- Object c = array[child];
- //取得 k 右子節(jié)點(diǎn)的索引
- int right = child + 1;
- //比較右節(jié)點(diǎn)的索引是否小于隊(duì)列長度和左右子節(jié)點(diǎn)的值進(jìn)行比較
- if (right < n &&
- ((Comparable super T>) c).compareTo((T) array[right]) > 0)
- c = array[child = right];
- //比較父節(jié)點(diǎn)值是否大于子節(jié)點(diǎn)
- if (key.compareTo((T) c) <= 0)
- break;
- //下面都是元素替換
- array[k] = c;
- k = child;
- }
- array[k] = key;
- }
- }
出隊(duì)圖解
將堆尾元素替換到頂部(即堆頂被替代刪除掉)
依次從根部向下調(diào)整整個(gè)堆的結(jié)構(gòu)(一直到堆尾即可) HeapifyDown
例子:90 從二叉堆中刪除(大頂堆)
總結(jié)
PriorityBlockingQueue 真的是個(gè)神奇的隊(duì)列,可以實(shí)現(xiàn)優(yōu)先出隊(duì)。最特別的是它只有一個(gè)鎖,入隊(duì)操作永遠(yuǎn)成功,而出隊(duì)只有在空隊(duì)列的時(shí)候才會進(jìn)行線程阻塞??梢哉f有一定的應(yīng)用場景吧,比如:有任務(wù)要執(zhí)行,可以對任務(wù)加一個(gè)優(yōu)先級的權(quán)重,這樣隊(duì)列會識別出來,對該任務(wù)優(yōu)先進(jìn)行出隊(duì)。

我們在微信上24小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流