Queue容器

Queue主要可以分为两类,一类是不阻塞的,一类是阻塞的。非阻塞队列主要有PriorityQueue 和 ConcurrentLinkedQueue。

实现一个线程安全的队列主要有两种方式:阻塞队列(加锁,线程会阻塞;CAS,Locksupport.park)以及非阻塞队列(CAS,线程不会阻塞)

使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。

非阻塞队列:PriorityQueue(线程不安全) ConcurrentLinkedQueue(线程安全)

双端队列

比如LinkedList以及ArrayDeque就是双端队列。其中,ArrayQueue是一个用数组实现的双端队列,可以在数组两端进行元素的插入以及删除,所以,这个数组必须是循环数组。LinkedList是基于双向链表的,容量没有限制,可在链表两端进行插入以及删除元素。

ArrayDeque底层是一个数组。

ArrayDeque是一个循环队列。它的实现比较高效,它的思路是这样:引入两个游标,head 和 tail,如果向队列里,插入一个元素,就把 tail 向后移动。如果从队列中删除一个元素,就把head向后移动。

非阻塞队列

非阻塞队列主要讲一下PriorityQueue以及ConcurrentLinkedQueue。

PriorityQueue

PriorityQueue又叫做优先级队列,保存队列元素的顺序不是按照及加入队列的顺序,而是按照队列元素的大小进行重新排序。

  • PriorityQueue内部实现是一个小顶堆,这样保证每次取出来的一定是最小值,他会要求你定义一个Comparable接口。PriorityQueue

  • PriorityQueue不是线程安全的,在多线程情况下最好使用PriorityBlockingQueue 。

  • 不允许插入 null 元素

ConcurrentLinkedQueue(ToDo)

无阻塞线程安全的队列,使用CAS+自旋的操作来执行,这样线程不会阻塞,所以叫做非阻塞队列。

如果我们要实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环 CAS 的方式来实现

这些方法实际上是通过调用UNSAFE实例的方法,通过CAS处理是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
//更改Node中的数据域item	
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指针域next
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指针域next
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

ConcurrentLinkedQueue

ConcurrentLinkedQueue

阻塞队列

阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是BlockingQueue提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

生产者消费者问题。 put()以及take()方法

一般来说,使用锁和条件队列实现,线程会阻塞;CAS+LockSupport.park(),线程会阻塞。

ArrayBlockingQueue(一把锁)

ArrayBlockingQueue = ArrayQueue+ReentrantLock+Condition。

所以,一方面,ArrayBlockingQueue使用Array做一个循环队列,另一方面,通过ReentrantLock以及Condition来实现等待唤醒操作。

1
2
3
4
5
6
7
8
9
/** The queued items */
final Object[] items;
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

源码中可以看出ArrayBlockingQueue内部是采用数组进行数据存储的属性items),为了保证线程安全,采用的是`ReentrantLock lock`,为了保证可阻塞式的插入删除数据利用的是Condition,当获取数据的消费者线程被阻塞时会将该线程放置到notEmpty等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中。

put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果当前队列已满,将线程移入到notFull等待队列中
while (count == items.length)
notFull.await();
//满足插入数据的要求,直接进行入队操作
enqueue(e);
} finally {
lock.unlock();
}
}

//添加数据,唤醒消费者
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//插入数据
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知消费者线程,当前队列中有数据可供消费
notEmpty.signal();
}

take方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列为空,没有数据,将消费者线程移入等待队列中
while (count == 0)
notEmpty.await();
//获取数据
return dequeue();
} finally {
lock.unlock();
}
}

//取出数据,唤醒生产者
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取数据
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//通知被阻塞的生产者线程
notFull.signal();
return x;
}

LinkedBlockingQueue(两把锁)

  • 底层采用链表来实现。

  • LinkedBlockingQueue在插入数据和删除数据时分别是由两个不同的locktakeLockputLock)来控制线程安全的,因此,也由这两个lock生成了两个对应的condition(notEmptynotFull)来实现可阻塞的插入和删除数据。

  • 通过takeLock和putLock两个锁来控制生产和消费,互不干扰,不会相互因为独占锁而阻塞。

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    transient Node<E> head;

    /**
    * Tail of linked list.
    * Invariant: last.next == null
    */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

PriorityBlockingQueue(无界队列)

PriorityBlockingQueue是一个底层由数组实现的无界队列,并带有排序功能,同样采用ReentrantLock来控制并发。由于是无界的,所以插入元素时不会阻塞,没有队列满的状态,只有队列为空的状态。通过这两点特征其实可以猜测它应该是有一个独占锁(底层数组)和一个Condition(只通知消费)来实现的。

DelayQueue

DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

SynchronousQueue

CAS+park.

没有容量的队列。每进行以此put,必须要进行一次take。

LinkedTransferQueue

CAS+park

LinkedTransferQueue是一个无界的阻塞队列,底层由链表实现。

LinkedBlockingDeque

LinkedBlockingDeque是一个有界的双端队列底层采用一个双向的链表来实现,在LinkedBlockingQeque的Node实现多了指向前一个节点的变量prev。并发控制上和ArrayBlockingQueue类似,采用单个ReentrantLock来控制并发,这里是因为双端队列头尾都可以消费和生产,所以使用了一个共享锁。

总结

无界队列:PriorityBlockingQueue(有序,数组)、DelayQueue(底层实现为PriorityBlockingQueue,适用于定时任务)和LinkedTransferQueue(链表)。

有界队列:ArrayBlockingQueue(数组)、LinkedBlockingQueue(链表)以及LinkedBlockingDeque(双向链表)。

没有容量:SynchronousQueue

阻塞队列