BlockingQueue

首先先介绍一下 Queue、AbstractQueue等接口和类。

Queue

该接口针对添加元素、移除元素、获取第一个元素各有两个方法:

  1. 添加元素:
    • add(E e) 当队列为满时会抛出 IllegalStateException
      offer(E e) 当队列为满时会返回 false
  2. 移除元素
    • remove(E e) 当队列为空时会抛出 NoSuchElementException
      poll(E e) 当队列为空时会返回 null
  3. 获取第一个元素:
    • element( ) 当队列为空时会抛出 NoSuchElementException
      peek( ) 当队列为空时会返回 null

BlockingQueue

该接口实现了 Queue 接口,针对添加元素、移除元素各有两个方法

  1. 添加元素:

    一直等待:

    1
    void put(E e) throws InterruptedException;

    超时等待:

    1
    2
    boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException;
  2. 移除元素:

    一直等待:

    1
    E take() throws InterruptedException;

    超时等待:

    1
    E poll(long timeout, TimeUnit unit) throws InterruptedException;

AbstractQueue

add(E e)

1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

remove()

1
2
3
4
5
6
7
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

element()

1
2
3
4
5
6
7
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

ArrayBlockingQueue

简介

继承体系为:

1
2
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable

拥有的变量如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;

线程安全实现

线程安全主要是通过可重入锁 ReentrantLock 来实现的。

add(E e)

1
2
3
public boolean add(E e) {
return super.add(e);
}

offer(E e)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 如果队列已满返回 false
if (count == items.length)
return false;
else {
// 添加元素
enqueue(e);
return true;
}
} finally {
// 释放锁
lock.unlock();
}
}

enqueue(E e)

1
2
3
4
5
6
7
8
9
10
11
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 类似于notify()
notEmpty.signal();
}

poll()

1
2
3
4
5
6
7
8
9
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

dequeue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

阻塞

阻塞是基于 Condition 接口实现的。Condition 拥有类似的操作:await/signal。Condition 和一个 Lock 相关,由lock.newCondition() 来创建。只有当前线程获取了这把锁,才能调用 Condition 的 await 方法来等待通知,否则会抛出异常。

put(E e)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 如果队列已满,则阻塞等待队列移除元素
notFull.await();
// 该方法中包含移除元素发生阻塞时需要 notEmpty.signal()
enqueue(e);
} finally {
lock.unlock();
}
}
  • 实现阻塞的关键就是就是这个 notFull 的 Condition,当队列已满,await 方法会阻塞当前线程,并且释放Lock,等待其他线程调用 notFull 的 signal 来唤醒这个阻塞的线程。那么这个操作必然会在拿走元素的操作中出现,这样一旦有元素被拿走,阻塞的线程就会被唤醒。
  • 发出 signal 的线程肯定拥有这把锁的,因此 await 方法所在的线程肯定是拿不到这把锁的,await 方法不能立刻返回,需要尝试获取锁直到拥有了锁才可以从 await 方法中返回。

take()

同样对于take方法会有一个 notEmpty 的 Condition。

1
2
3
4
5
6
7
8
9
10
11
12
13
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 如果队列为空,则阻塞等待队列添加一个元素
notEmpty.await();
// 该方法中包含移除元素发生阻塞时需要 notFull.signal()
return dequeue();
} finally {
lock.unlock();
}
}

总结

  • 线程安全是基于可重入锁 ReentrantLock 实现的。
  • 阻塞是基于 Condition 的 await/signal 实现的。

LinkedBlockingQueue

简介

LinkedBlockingQueue 是使用一个链表来实现,拥有一个内部类 Node 其中 Node 结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Linked list node class
*/
static class Node<E> {
E item;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;

Node(E x) { item = x; }
}

拥有成员变量如下,其中包括一个 head 节点(item 为 null,next 节点才是first 节点)和一个 last 节点(next 节点为 null),然后还包括两个锁:takeLock 和 putLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

方法解析

offer(E e)

源码如下,另外还有一个重载方法 offer(E e, long timeout, TimeUnit unit),在队列已满的时候,进行超时阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 队列已满则返回 false
if (count.get() == capacity)
return false;
int c = -1;
// 根据待添加元素 e 创建一个 node 节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
// 将 last 节点的 next 设置为 node 节点 ,然后将 node 节点设置为 last 节点
enqueue(node);
c = count.getAndIncrement();
// 如果添加该元素后队列还未满,则调用 notFull.signal() 方法,即唤醒添加元素时由于队列已满导致阻塞的线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
// 一个获取 takeLock、notEmpty.signal()、释放 takeLock 操作过程。也就是通知其他线程可以移除元素了。
signalNotEmpty();
return c >= 0;
}

enqueue(Node e)

1
2
3
4
5
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

poll()

和 offer(E e) 方法类似,除此之外也提供了一个重载方法 poll(long timeout, TimeUnit unit),在队列为空的时候,进行超时阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E poll() {
final AtomicInteger count = this.count;
// 队列为空则返回 null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
// 获取 first 节点的 item,然后设置为 head 节点(first.item = null)
x = dequeue();
c = count.getAndDecrement();
// 如果移除元素后队列不为空,则唤醒移除元素时由于队列为空导致阻塞的线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 一个获取 putLock、notFull.signal()、释放 putLock 操作过程。也就是通知其他线程可以添加元素了。
if (c == capacity)
signalNotFull();
return x;
}

dequeue()

1
2
3
4
5
6
7
8
9
10
11
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

put(E e)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

take()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

remove(Object o)

因为需要操作整个链表,因此需要同时拥有两个锁才能操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean remove(Object o) {
if (o == null) return false;
// putLock.lock(); takeLock.lock();
fullyLock();
try {
// 遍历查找
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}

contains(Object o)

因为需要操作整个链表,因此需要同时拥有两个锁才能操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean contains(Object o) {
if (o == null) return false;
// putLock.lock(); takeLock.lock();
fullyLock();
try {
// 遍历查找
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}

总结

  • 底层是基于链表实现的。
  • 拥有两个锁:putLock 和 takeLock。分别对应添加和移除元素操作。
  • 与 putLock 和 takeLock 相对应的两个 condition 分别为 notFull 和 notEmpty。
  • offer(E e) 方法在队列已满的时候返回 false,put(E e) 方法在队列已满的时候阻塞。
  • poll() 方法在队列为空的时候返回 null,take() 方法在队列为空的时候阻塞。
  • remove(Object o) 和 contains(Object o) ,因为需要操作整个链表,因此需要同时拥有两个锁才能操作。