Queue主要可以分为两类,一类是不阻塞的,一类是阻塞的。非阻塞队列主要有PriorityQueue 和 ConcurrentLinkedQueue。
实现一个线程安全的队列主要有两种方式:阻塞队列(加锁,线程会阻塞;CAS,Locksupport.park)以及非阻塞队列(CAS,线程不会阻塞)。
使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。
非阻塞队列:PriorityQueue(线程不安全) ConcurrentLinkedQueue(线程安全)
双端队列
比如LinkedList以及ArrayDeque就是双端队列。其中,ArrayQueue是一个用数组实现的双端队列,可以在数组两端进行元素的插入以及删除,所以,这个数组必须是循环数组。LinkedList是基于双向链表的,容量没有限制,可在链表两端进行插入以及删除元素。
ArrayDeque底层是一个数组。
ArrayDeque是一个循环队列。它的实现比较高效,它的思路是这样:引入两个游标,head 和 tail,如果向队列里,插入一个元素,就把 tail 向后移动。如果从队列中删除一个元素,就把head向后移动。
非阻塞队列
非阻塞队列主要讲一下PriorityQueue以及ConcurrentLinkedQueue。
PriorityQueue
PriorityQueue又叫做优先级队列,保存队列元素的顺序不是按照及加入队列的顺序,而是按照队列元素的大小进行重新排序。
PriorityQueue内部实现是一个小顶堆,这样保证每次取出来的一定是最小值,他会要求你定义一个Comparable接口。PriorityQueue
PriorityQueue不是线程安全的,在多线程情况下最好使用PriorityBlockingQueue 。
不允许插入 null 元素
ConcurrentLinkedQueue(ToDo)
无阻塞线程安全的队列,使用CAS+自旋的操作来执行,这样线程不会阻塞,所以叫做非阻塞队列。
如果我们要实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环 CAS 的方式来实现。
这些方法实际上是通过调用UNSAFE实例的方法,通过CAS处理是线程安全的。
1 | //更改Node中的数据域item |
阻塞队列
阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是BlockingQueue提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
生产者消费者问题。 put()以及take()方法
一般来说,使用锁和条件队列实现,线程会阻塞;CAS+LockSupport.park(),线程会阻塞。
ArrayBlockingQueue(一把锁)
ArrayBlockingQueue = ArrayQueue+ReentrantLock+Condition。
所以,一方面,ArrayBlockingQueue使用Array做一个循环队列,另一方面,通过ReentrantLock以及Condition来实现等待唤醒操作。
1 | /** The queued items */ |
源码中可以看出ArrayBlockingQueue内部是采用数组进行数据存储的(属性items),为了保证线程安全,采用的是`ReentrantLock lock`,为了保证可阻塞式的插入删除数据利用的是Condition,当获取数据的消费者线程被阻塞时会将该线程放置到notEmpty等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中。
put方法
1 | public void put(E e) throws InterruptedException { |
take方法
1 | public E take() throws InterruptedException { |
LinkedBlockingQueue(两把锁)
底层采用链表来实现。
LinkedBlockingQueue在插入数据和删除数据时分别是由两个不同的lock(
takeLock和putLock)来控制线程安全的,因此,也由这两个lock生成了两个对应的condition(notEmpty和notFull)来实现可阻塞的插入和删除数据。通过takeLock和putLock两个锁来控制生产和消费,互不干扰,不会相互因为独占锁而阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
PriorityBlockingQueue(无界队列)
PriorityBlockingQueue是一个底层由数组实现的无界队列,并带有排序功能,同样采用ReentrantLock来控制并发。由于是无界的,所以插入元素时不会阻塞,没有队列满的状态,只有队列为空的状态。通过这两点特征其实可以猜测它应该是有一个独占锁(底层数组)和一个Condition(只通知消费)来实现的。
DelayQueue
DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
SynchronousQueue
CAS+park.
没有容量的队列。每进行以此put,必须要进行一次take。
LinkedTransferQueue
CAS+park
LinkedTransferQueue是一个无界的阻塞队列,底层由链表实现。
LinkedBlockingDeque
LinkedBlockingDeque是一个有界的双端队列,底层采用一个双向的链表来实现,在LinkedBlockingQeque的Node实现多了指向前一个节点的变量prev。并发控制上和ArrayBlockingQueue类似,采用单个ReentrantLock来控制并发,这里是因为双端队列头尾都可以消费和生产,所以使用了一个共享锁。
总结
无界队列:PriorityBlockingQueue(有序,数组)、DelayQueue(底层实现为PriorityBlockingQueue,适用于定时任务)和LinkedTransferQueue(链表)。
有界队列:ArrayBlockingQueue(数组)、LinkedBlockingQueue(链表)以及LinkedBlockingDeque(双向链表)。
没有容量:SynchronousQueue