首先先介绍一下 Queue、AbstractQueue等接口和类。
Queue
该接口针对添加元素、移除元素、获取第一个元素各有两个方法:
- 添加元素:
- add(E e) 当队列为满时会抛出 IllegalStateException
offer(E e) 当队列为满时会返回 false
- add(E e) 当队列为满时会抛出 IllegalStateException
- 移除元素
- remove(E e) 当队列为空时会抛出 NoSuchElementException
poll(E e) 当队列为空时会返回 null
- remove(E e) 当队列为空时会抛出 NoSuchElementException
- 获取第一个元素:
- element( ) 当队列为空时会抛出 NoSuchElementException
peek( ) 当队列为空时会返回 null
- element( ) 当队列为空时会抛出 NoSuchElementException
BlockingQueue
该接口实现了 Queue 接口,针对添加元素、移除元素各有两个方法
添加元素:
一直等待:
1
void put(E e) throws InterruptedException;
超时等待:
1
2boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;移除元素:
一直等待:
1
E take() throws InterruptedException;
超时等待:
1
E poll(long timeout, TimeUnit unit) throws InterruptedException;
AbstractQueue
add(E e)
1 | public boolean add(E e) { |
remove()
1 | public E remove() { |
element()
1 | public E element() { |
ArrayBlockingQueue
简介
继承体系为:
1 | public class ArrayBlockingQueue<E> extends AbstractQueue<E> |
拥有的变量如下:
1 | /** The queued items */ |
线程安全实现
线程安全主要是通过可重入锁 ReentrantLock 来实现的。
add(E e)
1 | public boolean add(E e) { |
offer(E e)
1 | public boolean offer(E e) { |
enqueue(E e)
1 | private void enqueue(E x) { |
poll()
1 | public E poll() { |
dequeue()
1 | private E dequeue() { |
阻塞
阻塞是基于 Condition 接口实现的。Condition 拥有类似的操作:await/signal。Condition 和一个 Lock 相关,由lock.newCondition() 来创建。只有当前线程获取了这把锁,才能调用 Condition 的 await 方法来等待通知,否则会抛出异常。
put(E e)
1 | public void put(E e) throws InterruptedException { |
- 实现阻塞的关键就是就是这个 notFull 的 Condition,当队列已满,await 方法会阻塞当前线程,并且释放Lock,等待其他线程调用 notFull 的 signal 来唤醒这个阻塞的线程。那么这个操作必然会在拿走元素的操作中出现,这样一旦有元素被拿走,阻塞的线程就会被唤醒。
- 发出 signal 的线程肯定拥有这把锁的,因此 await 方法所在的线程肯定是拿不到这把锁的,await 方法不能立刻返回,需要尝试获取锁直到拥有了锁才可以从 await 方法中返回。
take()
同样对于take方法会有一个 notEmpty 的 Condition。
1 | public E take() throws InterruptedException { |
总结
- 线程安全是基于可重入锁 ReentrantLock 实现的。
- 阻塞是基于 Condition 的 await/signal 实现的。
LinkedBlockingQueue
简介
LinkedBlockingQueue 是使用一个链表来实现,拥有一个内部类 Node 其中 Node 结构如下:
1 | /** |
拥有成员变量如下,其中包括一个 head 节点(item 为 null,next 节点才是first 节点)和一个 last 节点(next 节点为 null),然后还包括两个锁:takeLock 和 putLock
1 | /** The capacity bound, or Integer.MAX_VALUE if none */ |
方法解析
offer(E e)
源码如下,另外还有一个重载方法 offer(E e, long timeout, TimeUnit unit),在队列已满的时候,进行超时阻塞。
1 | public boolean offer(E e) { |
enqueue(Node e)
1 | private void enqueue(Node<E> node) { |
poll()
和 offer(E e) 方法类似,除此之外也提供了一个重载方法 poll(long timeout, TimeUnit unit),在队列为空的时候,进行超时阻塞。
1 | public E poll() { |
dequeue()
1 | private E dequeue() { |
put(E e)
1 | public void put(E e) throws InterruptedException { |
take()
1 | public E take() throws InterruptedException { |
remove(Object o)
因为需要操作整个链表,因此需要同时拥有两个锁才能操作。
1 | public boolean remove(Object o) { |
contains(Object o)
因为需要操作整个链表,因此需要同时拥有两个锁才能操作。
1 | public boolean contains(Object o) { |
总结
- 底层是基于链表实现的。
- 拥有两个锁:putLock 和 takeLock。分别对应添加和移除元素操作。
- 与 putLock 和 takeLock 相对应的两个 condition 分别为 notFull 和 notEmpty。
- offer(E e) 方法在队列已满的时候返回 false,put(E e) 方法在队列已满的时候阻塞。
- poll() 方法在队列为空的时候返回 null,take() 方法在队列为空的时候阻塞。
- remove(Object o) 和 contains(Object o) ,因为需要操作整个链表,因此需要同时拥有两个锁才能操作。