一 阻塞队列
1.1 阻塞队列介绍
阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列:
put方法:当队列装满时,添加的线程则被阻塞,直到队列不满,则可用。
take方法:当队列为空时,消费的线程则被阻塞,直到队列不空时,则可用。
阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。
根据容量的大小,分为有界和无界两种。
总结:当队列满了无法添加元素,或者是队列空了无法移除元素时
1.2 阻塞的put与take方法逻辑
1.2.1 put方法
1.2.1.1 put方法常见接口
1) offer(E e):如果队列没满,返回true,如果队列已满,返回false(不阻塞)
2)offer(E e, long timeout, TimeUnit unit):可以设置阻塞时间,如果队列已满,则进行阻塞。超过阻塞时间,则返回false
3)put(E e):队列没满的时候是正常的插入,如果队列已满,则阻塞,直至队列空出位置
1.2.1.2 put方法逻辑
- put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入
- 但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间
- 如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中
1.2.2 take方法
1.2.2.1 take方法逻辑
take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的,可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。
1.3 常见接口API*
- 抛出异常:add、remove、element
- 返回结果但不抛出异常:offer、poll、peek
- 阻塞:put、take
方法 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞特定时间 |
入队 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
出队 | remove() | poll() | take() | poll(time, unit) |
获取队首元素 | element() | peek() | 不支持 | 不支持 |
1.4 常见阻塞队列类型
BlockingQueue 接口的实现类都被放在了 juc 包中。
1.5 阻塞队列应用场景
1.因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问题。
2.队列它还能起到一个隔离的作用。银行转账的程序,账户和金额等信息放到队列中就可以,而不需要去关心银行这个类如何实现具体的转账业务。
二 详解阻塞队列
2.1 ArrayBlockingQueue
ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全,利用了Lock锁的Condition通知机制进行阻塞控制。
使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。
在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;
特点:有界队列,两个指针都是从队首向队尾移动,保证队列的先进先出原则。存取相互排斥。
api定义:
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1"); //向队列中添加元素
2.2 LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表实现的阻塞队列。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素
LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行
特点: 无界阻塞度列,可以指定容量,默认为Integer.maxValue,先进先出,存取互不干扰。
队尾入队,队首出队。
api定义:
//指定队列的大小创建有界队列
BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);
//无界队列
BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();
2.3 ArrayBlockingQueue和LinkedBlockingQueue区别
LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:
ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能 。
在线程池中用的就是LinkedBlockingQueue,利用其读写分离。
https://zhuanlan.zhihu.com/p/510361516?utm_id=0
2.4 SynchrounusBlockingQueue
SynchronousQueue: 是一个没有数据缓存区域的BlockingQueue,容量为0,底层基于链表,它不会为队列元素维护存储空间,它只是多个线程之间数据交换的媒介。
SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,生产者线程对其的插入操作put必须等待消费者的移除操作take(导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取)。
使用场景:
SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
synchronousQueue队列的api
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();
2.5 DelayQueue
DelayQueue 一个支持延时,使用优先级队列实现的无界阻塞队列, 内部采用优先队列 PriorityQueue存储元素,同时元素必须实现 Delayed接口;Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力。
延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
DelayQueue的api
DelayQueue<OrderInfo> queue = new DelayQueue<OrderInfo>();
三 阻塞队列的选取
3.1 如何选择适合的阻塞队列
3.1.1 线程池使用的队列
1.FiexedThreadPool,singleThreadExecutor 选取的是LinkedBlockingQueue
2.cacheThreadPool 选取的是synchronousQueue
3.ScheduledThreadPool 选取的是延迟队列 DelayQueue
3.1.2 选取队列参考因素
a)功能:有优先级排序的需求,则选择PriorityBlockingQueue队列。
b)容量:有的是容量固定的,如 ArrayBlockingQueue;有的默认是容量无限的,如LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE
c)扩容:根据是否需要扩容来选取合适的队列,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容.
d)性能:如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue
文章评论