
AQS analysis

2020-11-08 11:45:24 Meng Xin J

What is AQS?

AQS (AbstractQueuedSynchronizer) is the cornerstone of JUC (java.util.concurrent). It is essentially an abstract class that defines the rules and procedures for acquiring and releasing a contended resource under multithreading. Many synchronizer implementations inherit from AQS and build on its skeleton.

 

 

AQS Principle

On the whole, AQS builds its resource allocation mechanism from a FIFO (first in, first out) queue combined with a state attribute. The queue stores thread nodes, and state represents the status of the resource: 0 means the resource is free, each acquisition by a thread adds 1 to state, and each release subtracts 1. When another thread competes for the resource it checks state, and if the value is not 0 (and that thread is not the current owner) the attempt fails and the thread joins the queue to wait.

Of course, this is only the general principle. To understand the details thoroughly, you need to read the corresponding source code.
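To make the state-plus-queue idea concrete, here is a minimal sketch of a non-reentrant mutex built on AbstractQueuedSynchronizer. The class name SimpleMutex and its methods are made up for illustration and are not part of the JDK; only the AQS methods it overrides and calls (tryAcquire, tryRelease, compareAndSetState, acquire, release) are real. In this sketch state 0 means free and 1 means held.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex on top of AQS.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // CAS state from 0 (free) to 1 (held); only one thread can win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // back to free; AQS then wakes a queued thread if any
            return true;
        }

        boolean isHeld() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }      // queues and parks on failure
    public boolean tryLock() { return sync.tryAcquire(1); }
    public void unlock() { sync.release(1); }
    public boolean isLocked() { return sync.isHeld(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked: " + mutex.isLocked());
        mutex.unlock();
        System.out.println("locked: " + mutex.isLocked());
    }
}
```

All of the queueing and parking comes from AQS itself; the subclass only decides what state means.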

 

Source analysis

Structure

To fully understand how AQS works, start from its source code and look at its internal structure. Only a few important inner classes and fields are described here.

As the picture on the left shows, AQS contains an inner class Node. This Node is the class of the thread node objects stored in the queue mentioned above. It has prev and next fields, so the queue is built on a doubly linked list. waitStatus indicates the wait state of the current node, and its possible values are defined as constants in the class: 1, -1, -2 and -3. 1 (CANCELLED) means the thread's request has been cancelled, -1 (SIGNAL) means the node's successor must be woken up when this node releases or is cancelled, -2 (CONDITION) means the node is waiting on a condition queue, and -3 (PROPAGATE) means a shared release should propagate to later nodes. A newly created node has waitStatus 0.

The picture on the right shows the fields of AQS itself: head is the head node of the queue, tail is the tail node of the queue, and state represents the status of the resource.
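The Node class is not accessible from outside AQS, so the sketch below mirrors its waitStatus constants in a hypothetical WaitStatusNames class (the class and its methods are illustrative only). The numeric values are the ones listed above. The isCancelled helper reflects the "waitStatus > 0" test that the AQS source uses later on to skip cancelled nodes.

```java
// Hypothetical helper that mirrors the waitStatus values of AQS.Node.
final class WaitStatusNames {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    // Only CANCELLED is positive, so "ws > 0" is enough to detect it.
    static boolean isCancelled(int waitStatus) {
        return waitStatus > 0;
    }

    static String describe(int waitStatus) {
        switch (waitStatus) {
            case CANCELLED: return "CANCELLED";
            case SIGNAL:    return "SIGNAL";
            case CONDITION: return "CONDITION";
            case PROPAGATE: return "PROPAGATE";
            case 0:         return "INITIAL";
            default:        return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        for (int ws : new int[] {1, 0, -1, -2, -3}) {
            System.out.println(ws + " -> " + describe(ws)
                    + ", cancelled=" + isCancelled(ws));
        }
    }
}
```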

 

The process

Here ReentrantLock is used as the example, following the source code of its lock and unlock methods.

The first thing to know is the inheritance hierarchy inside ReentrantLock.

Sync, FairSync and NonfairSync are all inner classes of ReentrantLock. Sync inherits directly from AQS. Because a ReentrantLock can be created as either a fair lock or an unfair lock, there are two further inner classes, FairSync for the fair lock and NonfairSync for the unfair lock, and both inherit from Sync. At run time, the implementation in one of these two classes is executed, depending on which kind of lock was created.
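From the caller's side, the choice between the two inner classes is made through the ReentrantLock constructor. The small sketch below uses only the public ReentrantLock API (the no-argument constructor, the boolean constructor and isFair); the class name FairnessDemo is made up for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only the ReentrantLock calls are real JDK API.
final class FairnessDemo {
    // The no-argument constructor creates an unfair lock.
    static boolean defaultLockIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true asks for a fair lock, passing false for an unfair one.
    static boolean lockIsFair(boolean fair) {
        return new ReentrantLock(fair).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default fair? " + defaultLockIsFair());
        System.out.println("new ReentrantLock(true) fair? " + lockIsFair(true));
    }
}
```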

 

lock()

public void lock() {
    sync.lock();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();
    ....
}

As you can see, the call goes straight to an abstract method of Sync. As mentioned above, a different implementation is chosen depending on the kind of lock.

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }
    ....
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    ....
}

The fair lock implementation is the simpler of the two: a fair lock must respect the queue order, so it just goes through acquire. The unfair lock is not that "honest". It first tries to grab the lock directly, and if the lock happens to be free at that moment (for example, the owning thread has just finished and released it, or has given it up by calling a wait), it "jumps the queue", takes the resource and runs. Let's look at how the more complex unfair lock is implemented.

1、compareAndSetState method

As the NonfairSync implementation of lock shows, the first step is to call compareAndSetState.

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

It turns out that the call ends in a native method. This method is a CAS (compare-and-swap) operation, the basis of optimistic locking. The four parameters of compareAndSwapInt are the object, the memory offset of the field within the object, the expected value and the new value. The update succeeds only if the field's current value equals the expected value; otherwise it fails. So which field does stateOffset refer to?

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

As you can see, stateOffset refers to the state field of AQS. So the first step of lock is to try to change state from 0 to 1. If that succeeds, the body of the if branch runs, namely setExclusiveOwnerThread, which is implemented like this.

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

This method and its field belong to the class AbstractOwnableSynchronizer, which is the parent class of AQS, so AQS inherits them from there. The field records the thread that currently owns the resource. So the first step is to try to grab the lock with CAS. On success the owner field is updated and lock returns. On failure, the acquire method is executed.
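Application code normally cannot use Unsafe, but the same "CAS on a named field, then record the owner" pattern can be sketched with the public AtomicIntegerFieldUpdater class. This is only an illustration of the fast path described above, not how AQS itself is written: the class CasFastPathSketch and its members are hypothetical, and the updater stands in for the Unsafe field-offset mechanism.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical sketch of the "CAS state 0 -> 1, then set owner" fast path.
final class CasFastPathSketch {
    // Looks the field up by name, much like objectFieldOffset("state") does.
    private static final AtomicIntegerFieldUpdater<CasFastPathSketch> STATE =
            AtomicIntegerFieldUpdater.newUpdater(CasFastPathSketch.class, "state");

    private volatile int state; // 0 = free, 1 = held
    private Thread owner;       // written only by the thread that won the CAS

    boolean tryFastLock() {
        // Succeeds only if state is still 0 at the moment of the update.
        if (STATE.compareAndSet(this, 0, 1)) {
            owner = Thread.currentThread();
            return true;
        }
        return false; // the real lock would now fall back to acquire(1)
    }

    int state() { return state; }
    Thread owner() { return owner; }

    public static void main(String[] args) {
        CasFastPathSketch lock = new CasFastPathSketch();
        System.out.println("first attempt: " + lock.tryFastLock());
        System.out.println("second attempt: " + lock.tryFastLock());
    }
}
```

The second attempt fails because the expected value 0 no longer matches, which is exactly the situation in which NonfairSync.lock falls through to acquire.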

 

2、acquire method

This is a method of AQS, and it calls several smaller methods. First, take a look at the source code.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

  

2.1、tryAcquire method: try to acquire the lock, including the case where the current thread already holds it and is locking again

AQS leaves this method to its subclasses (the default implementation only throws UnsupportedOperationException). Here is the relevant code from NonfairSync.

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

This method first checks whether the resource is free (state == 0). If it is, the method tries to set state with CAS and, on success, records the current thread in the exclusiveOwnerThread field mentioned above and returns true (this is the case where the thread that held the resource has just released the lock). Otherwise it checks whether the current thread is the thread that owns the resource. If so, it adds the argument to state (usually +1) and returns true (this is the current thread locking again while it already holds the resource, i.e. a reentrant lock). Otherwise it returns false (lock acquisition failed).
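The reentrant branch can be observed from outside through ReentrantLock.getHoldCount, which reports how many holds the current thread has on the lock. The sketch below is a small single-threaded illustration; the class ReentrancyDemo is made up, while lock, unlock and getHoldCount are the real ReentrantLock API.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the same thread locks twice, then unlocks twice.
final class ReentrancyDemo {
    // Returns the hold count observed after each of the four steps.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();                       // free lock: state 0 -> 1
        counts[0] = lock.getHoldCount();
        lock.lock();                       // same owner: state 1 -> 2
        counts[1] = lock.getHoldCount();
        lock.unlock();                     // state 2 -> 1, still held
        counts[2] = lock.getHoldCount();
        lock.unlock();                     // state 1 -> 0, lock is free
        counts[3] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        for (int count : holdCounts()) {
            System.out.println("hold count: " + count);
        }
    }
}
```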

 

2.2、addWaiter method: initialize the queue if needed, insert a Node for the current thread, and return that node

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;       // Get the tail node
    if (pred != null) {     // A non-null tail means the queue is already initialized
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {    // Try to append the current node as the new tail
            pred.next = node;
            return node;
        }
    }
    enq(node);      // Queue initialization and insertion, retried until it succeeds
    return node;
}

 

enq method: initializes the queue if needed, then inserts the node

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {    // Queue not initialized: install an empty node as head; because this is a for loop, the insertion continues on the next pass
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {   // Insert the node as the new tail, then return its predecessor
                t.next = node;
                return t;
            }
        }
    }
}

As the addWaiter source shows, this method initializes the queue if necessary and inserts the Node, and the head of the queue is always an empty node (a sentinel node).
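The lazy initialization with a sentinel head can be reproduced outside AQS with two AtomicReference fields. The following is a simplified sketch under that assumption: SentinelQueueSketch, its Node class and the accessor names are hypothetical, and AtomicReference.compareAndSet stands in for compareAndSetHead and compareAndSetTail.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of enq: CAS-based append with a lazily created sentinel.
final class SentinelQueueSketch {
    static final class Node {
        final String name;   // null for the sentinel
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, like enq in AQS.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // First use: install an empty sentinel, then loop again.
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                    // 1. link backwards
                if (tail.compareAndSet(t, node)) { // 2. publish as new tail
                    t.next = node;                // 3. link forwards last
                    return t;
                }
            }
        }
    }

    Node head() { return head.get(); }
    Node tail() { return tail.get(); }

    public static void main(String[] args) {
        SentinelQueueSketch queue = new SentinelQueueSketch();
        Node pred = queue.enq(new Node("A"));
        System.out.println("predecessor is sentinel: " + (pred == queue.head()));
        System.out.println("tail: " + queue.tail().name);
    }
}
```

Note the order of the three steps in the else branch: prev is set before the node becomes visible as tail, and next is set only afterwards.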

 

2.3、acquireQueued method: controls blocking of the thread

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();      // Get the predecessor of the current node
            // If the predecessor is the head node and the attempt to acquire the lock succeeds,
            // make the current node the new head (sentinel) node, drop the reference to the
            // old head so it can be garbage collected, and return the interrupted flag.
            // This branch runs once the current thread has the lock: either the owner released
            // it just before this thread parked, or this thread was woken up in the queue
            // because another thread called unlock.
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // First check and update the waitStatus of the predecessor (skipping cancelled nodes).
            // If the thread may park, block it. When it wakes up, record whether it was
            // interrupted while waiting, then loop and try to acquire again.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // If an exception was thrown before the lock was acquired, cancel the
        // acquisition here so that the node does not stay in the queue.
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire: checks the waitStatus of the predecessor node. Cancelled predecessors (waitStatus 1) are skipped, and once the predecessor's waitStatus is SIGNAL (-1) the current thread is allowed to park, that is, to formally wait in the queue.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)      // The predecessor is already set to wake this node on release, so return true
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {       // The predecessor's request has been cancelled: skip cancelled nodes, walking towards the head until waitStatus <= 0
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     // Set the predecessor's waitStatus to -1 (SIGNAL)
    }
    return false;       // Returns false here, but the caller is a for loop, so the next pass normally returns true and goes on to park
}

parkAndCheckInterrupt: blocks the thread.

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);     // Block the current thread. Similar in effect to wait()/notify(), but more flexible:
    // it does not have to be called inside a synchronized block, and it works through a "permit"
    return Thread.interrupted();
}

Here, if the owning thread has not released the resource, the current thread blocks in LockSupport.park and waits in the queue for the resource to be released. What wakes it up again is the unlock() method.
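The "permit" behaviour of LockSupport is easy to try in isolation. In the sketch below a thread first gives itself a permit with unpark and then parks; because the permit is remembered, the park call is expected to return immediately instead of blocking. The class ParkPermitDemo is hypothetical, and parkNanos with a 5 second limit is used instead of park purely as a safety net for the demo.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of the LockSupport permit: unpark before park.
final class ParkPermitDemo {
    static boolean unparkBeforeParkReturnsQuickly() {
        // Make the permit available to the current thread.
        LockSupport.unpark(Thread.currentThread());

        long start = System.nanoTime();
        // Consumes the permit; would wait up to 5 seconds without one.
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        long elapsed = System.nanoTime() - start;

        // Generous bound: anything well under the timeout means no real wait.
        return elapsed < TimeUnit.SECONDS.toNanos(4);
    }

    public static void main(String[] args) {
        System.out.println("returned quickly: " + unparkBeforeParkReturnsQuickly());
    }
}
```

This is why a release that happens just before a thread parks is not lost: the unpark leaves a permit behind for the later park.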

 

unlock()

unlock calls the release method of the internal sync object.

public void unlock() {
    sync.release(1);
}





public final boolean release(int arg) {
    if (tryRelease(arg)) {      // Update state; true means the resource is now free
        Node h = head;
        if (h != null && h.waitStatus != 0)     // If the head node exists and its waitStatus is not the initial value, wake the first valid thread in the queue
            unparkSuccessor(h);
        return true;
    }
    return false;
}

 

1、tryRelease method: updates state and returns whether the resource is now free

The default in AQS only throws an exception; the ReentrantLock implementation (in Sync) follows it:

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}




protected final boolean tryRelease(int releases) {
    int c = getState() - releases;      // Subtract the argument from state
    if (Thread.currentThread() != getExclusiveOwnerThread())    // If the current thread is not the thread that owns the resource, throw an exception
        throw new IllegalMonitorStateException();
    boolean free = false;       // Whether the resource is now free
    if (c == 0) {
        free = true;        // If state becomes 0, set free to true and clear the owner thread field
        setExclusiveOwnerThread(null);
    }
    setState(c);        // Update state
    return free;
}
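The owner check at the top of tryRelease is visible through the public API: calling unlock on a ReentrantLock that the current thread does not hold throws IllegalMonitorStateException. A minimal sketch (the class UnlockWithoutLockDemo is made up for illustration):

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: unlock() by a thread that is not the owner.
final class UnlockWithoutLockDemo {
    static boolean unlockByNonOwnerThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock(); // nobody holds the lock, so this thread is not the owner
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("throws: " + unlockByNonOwnerThrows());
    }
}
```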

 

2、unparkSuccessor: resets the waitStatus of the head node to the initial value and wakes the thread of the first valid node in the queue (if the node after the head does not qualify, the queue is searched from the tail for the valid node closest to the head)

private void unparkSuccessor(Node node) {   // Called with the head node, so node here is the head node
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);       // Set the waitStatus of the head node to 0

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {    // The next node does not qualify (it is null or its request has been cancelled)
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)     // Walk from the tail towards the head, keeping the valid node closest to the head
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);   // Wake it up
}

After this code runs, the first valid thread in the queue is woken up. Once it acquires the lock, its node becomes the new sentinel (head) node and the reference to the old head is dropped. Note that when the node after the head does not qualify, the search starts from the tail of the queue and walks backwards to find the valid node closest to the head, rather than walking forwards from the head. For the reason, see https://www.zhihu.com/question/50724462 . One of the explanations there is quite reasonable: in the enq method called by addWaiter above, "node.prev = t" and "compareAndSetTail(t, node)" are executed first, and only then the last statement "t.next = node". A lock release may happen in between. If the traversal started from head and went forwards, a newly added node could not be reached because "t.next = node" has not run yet, so a waiting thread could be missed.
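The effect of that ordering can be shown with a small single-threaded model. The sketch below builds a queue in which the first waiter is cancelled and the second waiter has been linked through prev but not yet through next, which is the window described above. BackwardScanSketch and everything in it are hypothetical names; findSuccessor follows the same logic as the loop in unparkSuccessor, and findByNext is a deliberately naive forward scan for contrast.

```java
// Hypothetical model of the backward scan used by unparkSuccessor.
final class BackwardScanSketch {
    static final class Node {
        final String name;
        int waitStatus;   // > 0 means cancelled
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    // Same idea as unparkSuccessor: use next if valid, else scan from tail.
    static Node findSuccessor(Node head, Node tail) {
        Node s = head.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != head; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t; // keep going: we want the one closest to head
                }
            }
        }
        return s;
    }

    // Naive alternative: follow next links only.
    static Node findByNext(Node head) {
        for (Node t = head.next; t != null; t = t.next) {
            if (t.waitStatus <= 0) {
                return t;
            }
        }
        return null;
    }

    // Builds: sentinel <-> A(cancelled) <- B, where A.next is not set yet.
    static String[] demo() {
        Node head = new Node("sentinel");
        Node a = new Node("A");
        Node b = new Node("B");
        head.next = a;
        a.prev = head;
        a.waitStatus = 1; // A cancelled its wait
        b.prev = a;       // B is the tail, but "a.next = b" has not run yet

        Node backward = findSuccessor(head, b);
        Node forward = findByNext(head);
        return new String[] {
            backward == null ? null : backward.name,
            forward == null ? null : forward.name
        };
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("backward scan finds: " + result[0]);
        System.out.println("forward scan finds: " + result[1]);
    }
}
```

In this model the backward scan reaches B through the prev links, while the forward scan stops at A because the next link to B does not exist yet.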

 

After unlock() has woken the appropriate thread, the code in lock described above continues to execute from the point where that thread was parked.
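The whole hand-off can be watched with two threads and the public ReentrantLock API. In the sketch below the main thread holds the lock, waits until the worker is reported as queued by hasQueuedThread, and then unlocks, after which the worker's lock call returns. The class HandoffDemo and its structure are assumptions made for this illustration, and the polling loop exists only to make the demo deterministic.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical two-thread demo: a queued thread resumes after unlock().
final class HandoffDemo {
    static boolean workerAcquiresAfterUnlock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = {false};

        Thread worker = new Thread(() -> {
            lock.lock(); // blocks in the AQS queue while main holds the lock
            try {
                acquired[0] = true;
            } finally {
                lock.unlock();
            }
        });

        try {
            lock.lock();
            try {
                worker.start();
                // Wait until the worker is actually waiting in the queue.
                while (!lock.hasQueuedThread(worker)) {
                    Thread.sleep(1);
                }
            } finally {
                lock.unlock(); // release wakes the queued worker
            }
            worker.join(); // join makes the worker's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println("worker acquired: " + workerAcquiresAfterUnlock());
    }
}
```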

 

Here is the general flow chart :

 

Summary

AQS is the cornerstone of concurrent programming in JUC and defines how threads acquire and release a contended resource. In general, it works through state and a FIFO queue: state shows whether the resource is occupied, and the queue stores the waiting threads (the head node is a sentinel node). Each thread node has a wait state field, waitStatus, which indicates the wait state of that node. Two things are worth noting. 1. The queue is not strictly first in, first out: when the first waiting thread has cancelled its wait and no other thread grabs the resource, the queue is searched from the tail for the non-cancelled node closest to the head, and that thread is woken up. 2. The head of the queue is not the next thread to be checked and run. It is a sentinel node, and the node checked next is the second one.

Copyright notice
This article was written by [Meng Xin J]. Please include a link to the original when reposting. Thanks.