ConcurrentLinkedQueue: a non-blocking, unbounded, thread-safe queue

2020-11-08 23:46:01 Liu Zhihang

Preface


Continuing the series on reading the JUC source code, this post looks at a non-blocking, unbounded, thread-safe queue: ConcurrentLinkedQueue. Let's take a look.


Official account: 『Liu Zhihang』. Notes on skills picked up at work and in study, on development, and on source-code reading, with the occasional share of life experience. Feedback and corrections are welcome!

Introduction

ConcurrentLinkedQueue is an unbounded, thread-safe queue based on linked nodes. It orders elements FIFO (first in, first out). The head of the queue is the element that has been in the queue the longest, and the tail is the element that has been in the queue the shortest time. New elements are inserted at the tail, and retrieval operations take elements from the head.

ConcurrentLinkedQueue is a good choice when many threads share access to a common collection. Like most other concurrent collection implementations, this class does not permit null elements.
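The two properties above, FIFO ordering and the ban on null elements, can be checked with a small sketch. The class and method names (FifoAndNullDemo, drain, rejectsNull) are made up for this illustration; only ConcurrentLinkedQueue itself comes from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class FifoAndNullDemo {

    // Drains the queue with poll() and returns the elements in removal order.
    static List<String> drain(ConcurrentLinkedQueue<String> queue) {
        List<String> out = new ArrayList<>();
        String s;
        while ((s = queue.poll()) != null) {
            out.add(s);
        }
        return out;
    }

    // Returns true if offering null is rejected with a NullPointerException.
    static boolean rejectsNull(ConcurrentLinkedQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("first");
        queue.offer("second");
        queue.offer("third");
        System.out.println(drain(queue));       // elements come out in insertion order
        System.out.println(rejectsNull(queue)); // null is not a legal element
    }
}
```

One reason null is rejected is that poll() and peek() use a null return value to signal an empty queue, so a null element would be indistinguishable from "nothing there".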

Basic usage

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueTest {

    public static void main(String[] args) {

        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();

        // Inserts the specified element at the tail of this queue.
        queue.add("liuzhihang");
        // Also inserts at the tail; add() simply delegates to offer().
        queue.offer("liuzhihang");

        // Retrieves, but does not remove, the head of this queue; returns null if the queue is empty.
        queue.peek();
        // Retrieves and removes the head of this queue; returns null if the queue is empty.
        queue.poll();

    }
}
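The example above runs on a single thread. As a rough sketch of the shared-access case the introduction mentions, the following starts several producer threads against one queue and counts the elements afterwards. SharedQueueDemo and produceAndCount are names invented for this illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class SharedQueueDemo {

    // Starts `producers` threads that each offer `perProducer` integers into one
    // shared queue, waits for all of them, then counts what the queue holds.
    static int produceAndCount(int producers, int perProducer) {
        final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perProducer; j++) {
                    queue.offer(j);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        // All producers have finished, so draining here is single-threaded.
        int count = 0;
        while (queue.poll() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // No offer is lost, so this prints producers * perProducer.
        System.out.println(produceAndCount(4, 1000));
    }
}
```

No explicit lock or synchronized block is needed around offer(); the queue handles concurrent writers itself.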

Source code analysis

Basic structure

[Figure: basic structure of ConcurrentLinkedQueue]

Fields and inner classes

private static class Node<E> {
    
    // The element stored in this node
    volatile E item;
    // The next node
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }
    // Sets the element with a CAS
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    // Sets the next node with an ordered (lazy) write
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    // Sets the next node with a CAS
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Omitted ……
}

ConcurrentLinkedQueue contains the inner class Node shown above, which represents one node of the linked list. Each node holds only a next reference, so the list inside ConcurrentLinkedQueue is a singly linked list.
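The Node class relies on sun.misc.Unsafe, which application code normally cannot use. As a minimal sketch of the same three operations using only public API, the version below swaps in AtomicReferenceFieldUpdater. SketchNode is a made-up name, and this is an illustration of the idea rather than the JDK's code.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for the JDK's private Node class.
class SketchNode<E> {

    volatile E item;
    volatile SketchNode<E> next;

    // Updaters give CAS access to the volatile fields above.
    // The field type of `item` erases to Object, hence Object.class.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<SketchNode, Object> ITEM =
            AtomicReferenceFieldUpdater.newUpdater(SketchNode.class, Object.class, "item");

    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<SketchNode, SketchNode> NEXT =
            AtomicReferenceFieldUpdater.newUpdater(SketchNode.class, SketchNode.class, "next");

    SketchNode(E item) {
        // Assumption: a plain volatile write here; the JDK uses UNSAFE.putObject.
        this.item = item;
    }

    // Atomically replaces item with val if it is currently cmp.
    boolean casItem(E cmp, E val) {
        return ITEM.compareAndSet(this, cmp, val);
    }

    // Ordered write to next, the public-API counterpart of putOrderedObject.
    void lazySetNext(SketchNode<E> val) {
        NEXT.lazySet(this, val);
    }

    // Atomically replaces next with val if it is currently cmp.
    boolean casNext(SketchNode<E> cmp, SketchNode<E> val) {
        return NEXT.compareAndSet(this, cmp, val);
    }
}
```

A CAS only succeeds when the field still holds the expected value, which is what lets several threads race on the same node safely: exactly one casNext(null, x) can win.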

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
        
    // Other members omitted

    //  Head node 
    private transient volatile Node<E> head;      

    //  Tail node 
    private transient volatile Node<E> tail;
}

The head and tail fields are declared volatile, which guarantees that updates to them are visible to other threads.

Constructor

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

When the queue is created, head and tail both point to the same empty node, a node whose item is null.

[Figure: head and tail both pointing to the single empty node]

Adding elements

public boolean add(E e) {
    return offer(e);
}
public boolean offer(E e) {
    
    // Reject null elements (throws NullPointerException)
    checkNotNull(e);

    // Create the node for the new element
    final Node<E> newNode = new Node<E>(e);

    // Retry loop
    // t is a snapshot of tail; p starts at t and walks toward the real last node
    for (Node<E> t = tail, p = t;;) {
        // q is the successor of p
        Node<E> q = p.next;
        if (q == null) {
            // p is the last node: try to CAS newNode into p.next
            if (p.casNext(null, newNode)) {
                // CAS succeeded: newNode is now linked into the queue
                // p != t means p has moved at least one node past the
                // tail snapshot, so tail is lagging behind;
                // advance it to newNode
                // (tail is therefore not updated on every insert)
                if (p != t) 
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // p == q means p is self-linked: a concurrent poll() has already
        // unlinked it from the list, so p has to be repositioned
        else if (p == q)
            // If tail has changed, continue from the new tail; otherwise restart from head
            p = (t != (t = tail)) ? t : head;
        else
            // Advance p; after two hops, re-read tail and jump to it if it has changed
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

Walkthrough with diagrams:

  • Single-threaded case:
  1. After Node<E> q = p.next; executes, the state is as shown in the figure:

[Figure: t and p both point to the last node; q = p.next is null]

  2. q == null holds, so p.casNext(null, newNode) sets p.next with a CAS.
  3. The CAS succeeds. Because p == t, casTail is skipped, tail is left unchanged, and offer returns true.
  • Multi-threaded case:
  1. After Node<E> q = p.next; executes, the state is as shown in the figure:

[Figure: t and p both point to the last node; q = p.next is null]

  2. Several threads call p.casNext(null, newNode) at the same time to set p.next with a CAS.
  3. Thread A's CAS succeeds:

[Figure: thread A's node linked after p; tail not yet updated]

  4. Thread B's CAS fails, so it loops again. This time q is thread A's node, so neither q == null nor p == q holds, and B reaches p = (p != t && t != (t = tail)) ? t : q. Since p == t, p advances to q.

[Figure: thread B's p now points to thread A's node]

  5. On the next iteration q == null again, so B's CAS can succeed. Because p != t at that point, B also calls casTail(t, newNode) to advance tail.
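The retry-on-failed-CAS pattern in the walkthrough can be seen in isolation in a simplified queue. The sketch below is not the JDK code: it follows the classic Michael and Scott non-blocking queue, the algorithm that the ConcurrentLinkedQueue Javadoc names as its basis, and it tries to swing tail after every insert instead of skipping updates. SketchQueue, snapshot, and concurrentOffers are names invented here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified queue (insert side only), not the JDK implementation.
class SketchQueue<E> {

    private static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = item; }
    }

    // Empty first node with a null item, like the one the constructor creates.
    private final Node<E> sentinel = new Node<>(null);
    private final AtomicReference<Node<E>> tail = new AtomicReference<>(sentinel);

    boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> newNode = new Node<>(e);
        for (;;) {
            Node<E> t = tail.get();
            Node<E> q = t.next.get();
            if (q == null) {
                // t is the last node: try to link newNode after it.
                if (t.next.compareAndSet(null, newNode)) {
                    // Linked. Swing tail forward; failure is fine because
                    // it means another thread already moved it.
                    tail.compareAndSet(t, newNode);
                    return true;
                }
                // Lost the race (thread B in the walkthrough): loop and re-read.
            } else {
                // tail lags behind the real last node: help advance it.
                tail.compareAndSet(t, q);
            }
        }
    }

    // Collects the elements in queue order (for inspection only).
    List<E> snapshot() {
        List<E> out = new ArrayList<>();
        for (Node<E> n = sentinel.next.get(); n != null; n = n.next.get()) {
            out.add(n.item);
        }
        return out;
    }

    // Runs `threads` threads that each offer `perThread` elements and
    // returns how many elements the queue holds afterwards.
    static int concurrentOffers(int threads, int perThread) {
        SketchQueue<Integer> queue = new SketchQueue<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) queue.offer(j);
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", ex);
            }
        }
        return queue.snapshot().size();
    }
}
```

The JDK version differs mainly in that it lets tail lag by a node and only updates it when p != t, which saves a CAS on roughly every other insert at the cost of the extra traversal logic shown in offer().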

Retrieving elements

public E poll() {
    restartFromHead:
    // Outer loop: re-entered when the traversal has to restart from head
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            // The element of the current node
            E item = p.item;
            // If the node still holds an element, claim it by CASing item to null
            if (item != null && p.casItem(item, null)) {
                // CAS succeeded: this thread owns the element
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // p has no successor: the queue is empty, return null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // p is self-linked (already unlinked from the list): restart from head
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

The process, with diagrams:

  1. On the first pass through the inner loop with an empty queue, E item = p.item; yields null and (q = p.next) == null holds, so poll calls updateHead(h, p) and returns null.
  2. Now suppose a concurrent insert adds an element before (q = p.next) == null is evaluated. The state is as shown in the figure:

[Figure: head points to the empty node; its next points to the newly added node]

Neither (q = p.next) == null nor p == q holds, so the final else runs p = q.

[Figure: p now points to the newly added node]

  3. The loop continues: item is now non-null, p.casItem(item, null) succeeds, and since p != h, head is updated and item is returned.

[Figure: head advanced past the node whose item was set to null]

Many more interleavings are possible. Only one is listed here; work through others as needed.
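The key guarantee behind p.casItem(item, null) is that only one thread can claim a given element. A rough way to observe this from the outside is to drain one queue from several threads and count how often each element is returned. ConcurrentPollDemo and eachElementPolledOnce are names made up for this sketch.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical demo class, not part of the JDK.
class ConcurrentPollDemo {

    // Fills a queue with 0..elements-1, drains it from `consumers` threads,
    // and reports whether every element was returned by poll() exactly once.
    static boolean eachElementPolledOnce(int elements, int consumers) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < elements; i++) {
            queue.offer(i);
        }
        AtomicIntegerArray timesPolled = new AtomicIntegerArray(elements);
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                Integer v;
                // poll() returns null once the queue is empty.
                while ((v = queue.poll()) != null) {
                    timesPolled.incrementAndGet(v);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        for (int i = 0; i < elements; i++) {
            if (timesPolled.get(i) != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(eachElementPolledOnce(10_000, 4));
    }
}
```

A thread that loses the casItem race simply moves on to the next node, so no element is handed out twice and none is skipped.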

The code for peek(), which reads the head element without removing it, is similar to the poll() code above.

The size operation

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

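The queue uses CAS and takes no locks, so other threads can add or remove elements while size() is traversing, and the result may be inaccurate. size() also walks the entire list, so its cost grows with the number of elements.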
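When the code only needs to know whether the queue has elements, isEmpty() avoids the full traversal. If an element count is genuinely needed, one possible workaround is to keep a separate counter next to the queue. The wrapper below is a sketch under that assumption; CountedQueue and approximateSize are hypothetical names, and the counter is updated separately from the queue, so the value is only approximate while other threads are active.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper, not part of the JDK.
class CountedQueue<E> {

    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger count = new AtomicInteger();

    boolean offer(E e) {
        boolean added = queue.offer(e);
        if (added) {
            count.incrementAndGet();
        }
        return added;
    }

    E poll() {
        E e = queue.poll();
        if (e != null) {
            count.decrementAndGet();
        }
        return e;
    }

    // Does not traverse the whole list, unlike size().
    boolean isEmpty() {
        return queue.isEmpty();
    }

    // Reads the counter instead of walking the list. The counter can briefly
    // disagree with the queue (even dip below zero) under concurrent use,
    // so the result is clamped at zero and should be treated as an estimate.
    int approximateSize() {
        return Math.max(0, count.get());
    }
}
```

The trade-off is one extra atomic update per operation in exchange for a constant-time count; whether that is worthwhile depends on how often the count is read.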

Summary

ConcurrentLinkedQueue is used relatively rarely in day-to-day work, so I only skimmed the source to learn the common API and the underlying principles.

In short, it stores the queue elements in a singly linked list and uses a non-blocking CAS algorithm internally, with no locks. As a result, size() may be inaccurate, and because size() also traverses the whole list, calling it is not recommended.

Copyright notice
This article was written by [Liu Zhihang]. Please include a link to the original when reposting. Thank you.