队列(Queue)结构应用分析
什么是队列?
队列(Queue)这种结构非常好理解。你可以把它想象成超市中排队结账,排在前面的先结账出队,排在后面的后结账出队。后来的人只能站在末尾,不允许插队。如图所示:
类似超市排队结账,还有地铁,机场,火车站排队进站等这种满足先进者先出结构,就是我们要探讨的典型的“队列”。
队列(Queue)跟栈(Stack)非常相似,也是操作受限的一种逻辑结构,最基本的操作也是两个:入队(enqueue),放一个数据到队列尾部;出队(dequeue),从队列头部取一个元素。如图所示:
队列有哪些应用场景?
队列的应用非常广泛,特别是一些具有某些额外特性的队列,比如循环队列、阻塞队列、并发队列。它们在很多偏底层的系统、框架、中间件的开发中,起着关键性的作用。比如高性能队列 Disruptor,Linux 环形缓存,都用到了循环并发队列;Java concurrent 并发包中BlockingQueue 基于阻塞特性实现公平,非公平等特性。
JAVA中常见队列有哪些?
单端队列:只支持一端入队(enqueue),一端出队(dequeue)。
双端队列:支持队列的两端进行入队和出队操作。
循环队列:可提供更好的性能,降低时间复杂度。
阻塞队列:生产者和消费者应用模型中的一种容器,在队列空或满的时候进行阻塞。
优先级队列:支持按优先级操作的的队列结构(内部对元素进行排序)。
……。
Java中队列规范的定义?
package com.cy.pj.ds.queue;
public interface Queue {
void enqueue(Object element);
Object dequeue();
int size();
boolean isEmpty();
}
基于Java数组如何实现队列?
package com.cy.pj.ds.queue;
/** * 基于数组结构进行队列实现 */
public class SimpleArrayQueue implements Queue {
private Object[] array;
private int size;
public SimpleArrayQueue() {
this(10);
}
public SimpleArrayQueue(int capacity) {
this.array=new Object[capacity];
}
@Override
public void enqueue(Object element) {
//1.队列是否已满,满了则抛出异常
if(size==array.length)
throw new IllegalStateException("Queue is full");
//2.存储数据
array[size]=element;
//3.有效元素个数加1
size++;
}
@Override
public Object dequeue() {
//1.判断队列是否为空
if(size==0)
throw new IllegalStateException("Queue is empty");
//2.获取对头元素
Object temp=array[0];
//3.移动元素
// for(int i=0;i<array.length-1;i++) {
// array[i]=array[i+1];
// }
System.arraycopy(array, 1, array, 0, size-1);
//4.设置size-1位置元素为空
array[size-1]=null;
//5.有效元素个数减一
size--;
return temp;
}
@Override
public int size() {
return size;
}
@Override
public boolean isEmpty() {
return size==0;
}
public static void main(String[] args) {
SimpleArrayQueue queue=new SimpleArrayQueue(3);
queue.enqueue(100);
queue.enqueue(200);
queue.enqueue(300);
System.out.println(queue.size());
System.out.println(queue.dequeue());//FIFO 100
System.out.println(queue.dequeue());//FIFO 200
System.out.println(queue.dequeue());//FIFO 300
System.out.println(queue.isEmpty());
}
}
每次出队的时候,数组的元素整体往左移动,这样队列出队的时间复杂度就为O(N),那么有什么办法可以降低队列出队操作的时间复杂度吗?
基于Java链表如何实现队列?
package com.cy.pj.ds.queue;
/** * 基于单链表结构进行基础队列的实现 * @author qilei */
public class SimpleLinkedQueue implements Queue {
private Node head;
private Node tail;
class Node{
private Object element;
private Node next;
public Node(Object element,Node next) {
this.element=element;
this.next=next;
}
}
@Override
public void enqueue(Object element) {
if(head==null) {
head=tail=new Node(element,null);
}else {
Node newNode=new Node(element, null);
tail.next=newNode;
tail=newNode;
}
}
@Override
public Object dequeue() {
if(head==null)return null;
Node oldHead=head;
head=oldHead.next;
Object element=oldHead.element;
oldHead.next=null;
oldHead.element=null;
return element;
}
@Override
public int size() {
int count=0;
Node node=head;
while(node!=null) {
node=node.next;
count++;
}
return count;
}
@Override
public boolean isEmpty() {
//return size==0;
return head==null;
}
public static void main(String[] args) {
SimpleLinkedQueue queue=new SimpleLinkedQueue();
queue.enqueue(100);
queue.enqueue(200);
queue.enqueue(300);
System.out.println(queue.size());
System.out.println(queue.dequeue());//100
System.out.println(queue.dequeue());//200
System.out.println(queue.dequeue());//300
System.out.println(queue.isEmpty());
}
}
基于链表的实现方式,可以实现一个支持无限排队的无界队列,但是可能会导致过多的请求排队等待,请求处理的响应时间过长。所以,针对响应时间比较敏感的系统,基于链表实现是不合适的(例如线程池)。
什么是循环队列?
循环队列是让队列形成一种环形结构,它以循环的方式去存储元素,但还是会按照队列的先进先出的原则去操作,如图所示:
为什么要使用循环队列?
在基础队列应用中,基于数组方式实现的简易队列,我们发现一个问题,每次出队都会涉及到数组中元素的移动,时间的复杂度比较高,那如何进行优化呢?
我们现在为队列添加两个变量,它们分别为head和tail,其初始值都为下标0,都指向数组中的第一个元素,如图所示:
现在我们向队列添加A,B,C,D四个元素,每添加一个元素tail的下标就向后移动一个位置,当四个元素添加完毕,此时tail移动到下标为4的位置。如图所示:
当从队列中出队一个元素,head的下标也会向后移动一个位置,假设现在出队A,B两个元素,此时head的位置,如图所示:
随着不停地进行入队、出队操作,head和tail都会持续往后移动。当tail移动到最右边,即使数组中还有空闲空间,也无法继续往队列中添加数据了。那我们怎么办呢?那就是移动元素,如图所示:
通过这样的设计可以适当减少元素移动次数,出队操作的时间复杂度仍然是O(1),但
入队操作的时间复杂度不是O(1).那我们还能继续优化吗?此时可以借助”循环队列”,降低时间复杂度,减少队列中元素的移动,充分利用队列空间。
基于Java如何实现循环队列?
循环队列通过head和tail两个变量操作入队和出队,假如“head==tail”则表示队列为空,“head=(tail+1)%capacity”则表示队列已满。当进行入队时tail=(tail+1)%capacity,出队时head=(head+1)%capacity。其详细代码如下:
package com.cy.pj.ds.queue;
/** * 循环队列实现 * @author qilei * */
public class CircleArrayQueue implements Queue{
private Object[] array;
private int capacity;
private int head;
private int tail;
public CircleArrayQueue(int capacity) {
this.capacity=capacity+1;
this.array=new Object[this.capacity];
}
@Override
public void enqueue(Object element) {
//1.入队操作
array[tail]=element;
//2.修改tail变量的值,并进行判定
int newTail=(tail+1)%this.capacity;
if(newTail==head)
throw new IllegalStateException("Queue is full");
tail=newTail;
}
@Override
public Object dequeue() {
if(head==tail)
throw new IllegalStateException("Queue is empty");
Object temp=array[head];
array[head]=null;
head=(head+1)%capacity;
return temp;
}
@Override
public int size() {
int count=tail-head;
if(count<0)
count+=this.capacity;
return count;
}
@Override
public boolean isEmpty() {
return head==tail;
}
public static void main(String[] args) {
CircleArrayQueue queue=new CircleArrayQueue(3);
System.out.println("head="+queue.head);//0
System.out.println("tail="+queue.tail);//0
queue.enqueue(100);
queue.enqueue(200);
queue.enqueue(300);
System.out.println(queue.size());//3
System.out.println("head="+queue.head);//0
System.out.println("tail="+queue.tail);//3
System.out.println(queue.dequeue());//100
System.out.println("head="+queue.head);//1
System.out.println("tail="+queue.tail);//3
queue.enqueue(400);
System.out.println("head="+queue.head);//1
System.out.println("tail="+queue.tail);//0
}
}
循环队列中随着不断入队操作的执行,tail指向了队尾的后一个位置,也就是新元素将要被插入的位置,如果该位置和head相等了,那么必然说明当前状态已经不能容纳一个元素入队(间接的说明队满)。因为这种情况是和队空(head==tail)的判断条件是一样的,所以我们选择舍弃一个节点位置,tail指向下一个元素的位置,我们使用tail+1判断下一个元素插入之后,是否还能再加入一个元素,如果不能了说明队列满,不能容纳当前元素入队(其实还剩下一个空位置),当然这是牺牲了一个节点位置来实现和判断队空的条件进行区分。
什么是双端队列?
双端队列(Double-ended queue)是一种特殊的队列,简称为Deque。支持队列两端的入队和出队操作。同时具备了栈(Stack)和队列(Queue)特性,如图所示:
为什么会有双端队列?
双端队列在很多场景都有应用,Java中ForkJoin模式下的工作窃取(允许其它线程从自己的线程队列尾部获取任务、执行任务)
基于Java链表如何实现双端队列?
第一步:定义队列接口
package com.cy.pj.ds.queue;
/** * 双端队列接口 * @author qilei */
public interface Deque {
void addFirst(Object element);
void addLast(Object element);
Object removeFirst();
Object removeLast();
int size();
boolean isEmpty();
//....
}
第二步:定义队列接口实现
package com.cy.pj.ds.queue;
/** * 简易双端队列实现 * @author qilei */
public class SimpleLinkedDeque implements Deque {
transient Node first;
transient Node last;
transient int size;
static class Node{
private Object element;
private Node next;
private Node prev;
public Node(Node prev,Object element,Node next) {
this.prev=prev;
this.element=element;
this.next=next;
}
}
@Override
public void addFirst(Object element) {
//1.获取第一个节点
Node oldFirst=first;
//2.创建新节点
Node newNode=new Node(null, element, oldFirst);
//3.设置第一个节点
first=newNode;
if(oldFirst==null) {
last=newNode;
}else {
oldFirst.prev=newNode;
}
//4.修改size的值
size++;
}
@Override
public void addLast(Object element) {
//1.获取最后一个节点
Node oldLast=last;
//2.创建新节点
Node newNode=new Node(oldLast, element, null);
//3.设置最后一个节点
last=newNode;
if(oldLast==null) {
first=newNode;
}else {
oldLast.next=newNode;
}
//4.更新size的值
size++;
}
@Override
public Object removeFirst() {
//1.获取第一个节点并进行判定
Node oldFirst=first;
if(oldFirst==null)
throw new IllegalStateException("Queue is empty");
//2.修改第一个节点并设置新的first节点
Object temp=oldFirst.element;
Node newFirst=oldFirst.next;
oldFirst.element=null;
oldFirst.next=null;
//oldFirst.prev=null;
first=newFirst;
if(newFirst==null) {
last=null;
}else {
newFirst.prev=null;
}
//3.更新size的值
size--;
return temp;
}
@Override
public Object removeLast() {
//1.获取last节点并进行校验
Node oldLast=last;
if(oldLast==null)
throw new IllegalStateException("Queue is empty");
//2.修改last节点并设置新的last
Object temp=oldLast.element;
Node newLast=oldLast.prev;
oldLast.element=null;//help gc
oldLast.prev=null;
last=newLast;
if(newLast==null) {
first=null;
}else {
newLast.next=null;
}
//3.更新size的值
size--;
return temp;
}
@Override
public int size() {
return size;
}
@Override
public boolean isEmpty() {
return size==0;
}
public static void main(String[] args) {
SimpleLinkedDeque dq=new SimpleLinkedDeque();
dq.addLast(100);//first
dq.addLast(200);
dq.addLast(300);//last
System.out.println(dq.removeFirst());//100
System.out.println(dq.removeFirst());//200
System.out.println(dq.removeFirst());//300
//=========================
dq.addFirst(400);
dq.addFirst(500);
System.out.println(dq.removeLast());
System.out.println(dq.removeLast());
}
}
什么是阻塞式队列?
阻塞(Blocking)式队列,顾名思义,首先它是一个队列(Queue),然后在这个队列中加入了阻塞(Blocking)式功能(例如去饭店吃饭,满员了可在等候区排队等待)。
为什么需要阻塞式队列?
阻塞式队列(BlockingQueue)经常应用于生产者和消费者模式,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。当队列中没有数据的情况下,消费端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。当队列中填满数据的情况下,生产端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。如图所示:
基于Java如何实现阻塞队列?
package com.cy.pj.ds.queue;
/** * 阻塞式队列实现 */
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class SimpleArrayBlockingQueue {
//store data
Object[] array;
//index for dequeue
int takeIndex;
//index for enqueue
int putIndex;
//number of elements
int size;
//lock guarding all access
ReentrantLock lock;
//Condition for dequeue
Condition takeCondition;
Condition for enqueue
Condition putCondition;
public SimpleArrayBlockingQueue(int capacity) {
if(capacity<0)
throw new IllegalArgumentException();
this.array=new Object[capacity];
lock=new ReentrantLock();
takeCondition=lock.newCondition();
putCondition=lock.newCondition();
}
//enqueue
public void put(Object element) throws InterruptedException {
//1.获取锁对象
final ReentrantLock lock=this.lock;
lock.lockInterruptibly();
try {
//2.校验容器是否已满,满了则等待
while(size==array.length)putCondition.await();
//3.存储数据
array[putIndex]=element;
if(++putIndex==array.length)putIndex=0;
size++;
//4.通知消费者取数据
takeCondition.signalAll();;
}finally {
//5.释放锁
lock.unlock();
}
}
//dequeue
public Object take() throws InterruptedException {
//1.获取锁对象
final ReentrantLock lock=this.lock;
lock.lockInterruptibly();
try {
//2.检测容器是否为空,空则等待
while(size==0)takeCondition.await();
//3.移除元素
Object element=array[takeIndex];
array[takeIndex]=null;
if(++takeIndex==array.length)takeIndex=0;
size--;
//4.通知生产者放数据
putCondition.signalAll();
return element;
}finally {
//5.方法锁
lock.unlock();
}
}
public int size() {
final ReentrantLock lock=this.lock;
lock.lock();
try {
return size;
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
SimpleArrayBlockingQueue q=new SimpleArrayBlockingQueue(3);
new Thread() {
//Producer
int i=100;
public void run() {
try {
while(true) {
q.put(i++);
try{
Thread.sleep(500);}catch(Exception e) {
}
}
}catch (Exception e) {
e.printStackTrace();
}
};
}.start();
new Thread() {
//Consumer
public void run() {
try {
while(true) {
System.out.println(q.take());
}
}catch (Exception e) {
e.printStackTrace();
}
};
}.start();
}
}
在Java的JUC包中,提供了很多基于阻塞方式实现的队列,BlockingQueue接口是一种阻塞式队列接口,基于此接口的实现类对象解决了高效、安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
Java中的ArrayBlockQueue如何实现的?
ArrayBlockingQueue 是一个有边界的阻塞队列,它的内部实现是一个数组。它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定,其大小不可改变。其内部的阻塞方式是通过重入锁 ReenterLock 和 Condition 条件队列实现的,但是队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,这样就会导致入队和出队操作不能同时进行。
Java中LinkedBlockingQueue如何实现的?
LinkedBlockingQueue采用的是一种基于单链表实现的阻塞式无界队列。此队列在添加一个元素时会创建一个新的Node对象。删除一个元素时要移除一个节点对象。频发的创建和销毁可能对GC操作有较大影响。但是,此队列中的锁(Lock)是分离的,其添加操作采用的是putLock,移除操作采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
Java中ConcurrentLinkedQueue如何实现的?
ConcurrentLinkedQueue是一个基于单链表实现的、线程安全的、非阻塞式无界队列。此队列的设计也非常考验设计功底,其内部全程使用了cas操作,并且在边界控制方面也引入了哨兵机制。总之,设计复杂程度远远高于直接使用锁(Lock)对象方式的线程安全队列的实现。
文章评论