What does Semaphore, a thread synchronization tool that uses an up-counter, look like?

2020-11-08 23:46:57 Liu Zhihang

Preface

Besides CountDownLatch and CyclicBarrier, the JUC thread synchronizers include one more, called Semaphore, which is also built on AQS. Let's look at how a semaphore works internally.

Official account: 『Liu Zhihang』, where I record skills picked up at work and in study, development notes, and source code notes, and occasionally share some life experience. Feedback is welcome!

Introduction

A counting semaphore. Conceptually, a semaphore maintains a set of permits. If necessary, a call to acquire blocks until a permit is available. A call to release adds a permit, which may release a blocked thread.

  1. Specify the initial number of permits when constructing the semaphore.
  2. Call acquire(int permits) to specify the number of permits to acquire.
  3. Call release(int permits) to release the specified number of permits.

When the number of available permits has not reached the number requested, the thread calling acquire is blocked.
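The three steps above can be sketched with a few calls. To keep the example from hanging, this sketch uses tryAcquire(int), the non-blocking variant of acquire(int), which is not discussed in this article; the class and method names are my own for illustration.

```java
import java.util.concurrent.Semaphore;

class PermitCountDemo {

    /** Returns "firstAttempt,secondAttempt,permitsLeft". */
    static String run() {
        // 1. Declare the semaphore with 2 initial permits
        Semaphore semaphore = new Semaphore(2);

        // 2. Ask for 3 permits: only 2 are available, so a blocking
        //    acquire(3) would wait here; tryAcquire(3) returns false instead
        boolean tooEarly = semaphore.tryAcquire(3);

        // 3. Release one more permit, bringing the total to 3
        semaphore.release(1);

        // Now the request for 3 permits can be satisfied
        boolean enough = semaphore.tryAcquire(3);

        return tooEarly + "," + enough + "," + semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "false,true,0"
    }
}
```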

Basic usage

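import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest1 {

    // Start with 0 permits; each worker thread releases one
    private static final Semaphore SEMAPHORE = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {

        // ThreadFactoryBuilder comes from Guava and is only used to name the threads
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 5; i++) {

            pool.submit(() -> {

                try {
                    // Simulate 1 to 2 seconds of work
                    Thread.sleep(1000 + new Random().nextInt(1000));
                } catch (InterruptedException ignored) {
                }

                System.out.println("Current thread: " + Thread.currentThread().getName() + " releases a permit");
                SEMAPHORE.release(1);

            });
        }

        System.out.println("-----> This is the main thread");

        // Blocks until all 5 permits have been released
        SEMAPHORE.acquire(5);

        System.out.println("-----> Main thread execution completed");

        pool.shutdown();
    }

}

Output (the order of the worker lines may vary between runs):

-----> This is the main thread
Current thread: Thread-pool-2 releases a permit
Current thread: Thread-pool-4 releases a permit
Current thread: Thread-pool-1 releases a permit
Current thread: Thread-pool-0 releases a permit
Current thread: Thread-pool-3 releases a permit
-----> Main thread execution completed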

The usage above is similar to CountDownLatch: the main thread continues only after the child threads have finished. The biggest difference between Semaphore and CountDownLatch is:

Semaphore counts up from the specified initial value; once the requested number of permits is reached, the blocked thread starts to execute.

CountDownLatch counts down from the specified count; once it reaches 0, the blocked thread starts to execute.

Of course, this is only the simplest usage. Besides making the main thread wait, you can also make other threads wait and then let them start executing.
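As a rough sketch of the reverse direction, the worker threads below wait on the semaphore and the main thread lets them go. The class name StartGateDemo and its run method are made up for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateDemo {

    /** Starts the given number of workers and returns how many passed the gate. */
    static int run(int workers) {
        Semaphore gate = new Semaphore(0);          // no permits: every worker blocks
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.acquire();                 // wait until the main thread releases
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        gate.release(workers);                      // one permit per waiting worker

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints 3
    }
}
```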

Questions

  1. How are Semaphore and AQS related?
  2. What is the difference between Semaphore and CountDownLatch?

Source code analysis

The basic structure

[Figure: Semaphore class diagram]

As the class diagram shows, Semaphore has a static inner class Sync that extends AQS. To distinguish between the fair and non-fair cases, Sync has two subclasses: NonfairSync and FairSync.

Next, following the example above, we start from the constructor, acquire() and release() to understand the internal implementation.

Initialization

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

By default the constructor creates the non-fair version, and the number of permits must be passed in. This code ends up calling AQS's setState(permits) method. The code is as follows:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }
}

The setState method simply assigns a value to AQS's state field.
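The effect of the constructor can be observed through the public API. The sketch below assumes nothing beyond java.util.concurrent.Semaphore; the class name ConstructorDemo is made up. It also uses the two-argument constructor, which selects the fair version.

```java
import java.util.concurrent.Semaphore;

class ConstructorDemo {

    /** Returns "permits,isFair(default),isFair(fair)". */
    static String describe() {
        // One-argument constructor: non-fair, state is set to 3
        Semaphore nonFair = new Semaphore(3);
        // Two-argument constructor: passing true selects the fair version
        Semaphore fair = new Semaphore(3, true);

        return nonFair.availablePermits() + "," + nonFair.isFair() + "," + fair.isFair();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "3,false,true"
    }
}
```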

Supplement

  1. In ReentrantLock, state represents the lock state: 0 means no thread holds the lock, a value of 1 or more means a thread has acquired the lock, and a value greater than 1 means the owning thread has re-entered it multiple times.
  2. In ReentrantReadWriteLock, state also represents the lock state: 0 means no thread holds a lock, the high 16 bits represent the read-lock state, and the low 16 bits represent the write-lock state. The actual read and write values can be extracted with bit operations.
  3. In CountDownLatch, state represents the latch count.
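To make the bit operations in item 2 concrete, here is a small sketch of splitting one int into a high 16-bit read count and a low 16-bit write count. The class, constants and method names are my own for illustration and are not taken from the JDK source.

```java
class ReadWriteStateDemo {

    static final int READ_SHIFT = 16;                       // read count lives in the high 16 bits
    static final int WRITE_MASK = (1 << READ_SHIFT) - 1;    // 0xFFFF, selects the low 16 bits

    /** Packs a read count and a write hold count into a single int. */
    static int pack(int readers, int writeHolds) {
        return (readers << READ_SHIFT) | writeHolds;
    }

    /** High 16 bits: read-lock state. */
    static int readCount(int state) {
        return state >>> READ_SHIFT;
    }

    /** Low 16 bits: write-lock state. */
    static int writeCount(int state) {
        return state & WRITE_MASK;
    }

    public static void main(String[] args) {
        int state = pack(3, 2);
        System.out.println(readCount(state) + " readers, " + writeCount(state) + " write holds");
    }
}
```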

If you have forgotten what state is, you can go back to the earlier articles on AQS and CAS. Here, state is the number of permits of the semaphore.

acquire()

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

acquire() and acquire(int permits) both call sync.acquireSharedInterruptibly; the only difference is the argument, which is fixed at 1 for acquire().

acquireSharedInterruptibly is the method that Sync inherits from AQS. It first calls tryAcquireShared(arg) and, if that attempt fails, falls back to doAcquireSharedInterruptibly(arg).

The details are covered in the AQS article; here is a brief look at doAcquireSharedInterruptibly:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  1. If the first attempt fails, doAcquireSharedInterruptibly(arg) keeps trying to acquire the resource;
  2. final Node node = addWaiter(Node.SHARED); creates a node in shared mode and puts it into the queue;
  3. The loop keeps checking the predecessor node; if it is head, the thread tries to acquire the shared resource;
  4. In shared mode, setHeadAndPropagate(node, r); sets the head node and also wakes up subsequent nodes.
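The "Interruptibly" part of this path can be observed from the outside: a thread waiting in acquire() gets an InterruptedException when it is interrupted. A minimal sketch, with a made-up class name:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptibleAcquireDemo {

    /** Returns true if the waiting thread saw an InterruptedException. */
    static boolean run() {
        Semaphore semaphore = new Semaphore(0);     // no permits, so acquire() cannot succeed
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                // Thrown whether the interrupt arrives before or during the wait
                interrupted.set(true);
            }
        });
        waiter.start();
        waiter.interrupt();

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true
    }
}
```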

tryAcquireShared has to be implemented by subclasses, that is, by the implementation classes of Semaphore.Sync. Here we use FairSync to explain:


static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // If a thread is queued ahead of this one, return -1 to indicate failure
            if (hasQueuedPredecessors())
                return -1;
            // Get the current number of available permits
            int available = getState();
            // Compute the permits remaining after this acquire
            int remaining = available - acquires;
            // If remaining is negative, or the CAS update succeeds, return it
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

The meaning of this code is:

  1. If another thread is queued ahead of the current one, return -1, and the caller blocks;
  2. If the remaining number of permits would be less than 0, return that negative value, and the caller blocks;
  3. If the remaining number is greater than or equal to 0, update state with CAS and return a non-negative number; if the CAS fails, the loop retries.

The meaning of the return value is defined in AQS as follows:

  1. Less than 0: the acquire failed;
  2. Equal to 0: the acquire in shared mode succeeded, but no subsequent shared-mode acquire can succeed;
  3. Greater than 0: the acquire in shared mode succeeded and subsequent shared-mode acquires may also succeed; in this case a subsequent waiting thread must check availability.
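To see how these return values drive AQS, here is a minimal semaphore built directly on AbstractQueuedSynchronizer. It is a sketch, not the JDK implementation: SimpleSemaphore and its methods are hypothetical names, and the acquire loop is the fair version with the queue check removed, which is my assumption of what a non-fair variant looks like.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleSemaphore {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);                       // state = number of permits
        }

        int permits() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: failure, AQS queues the thread.
                // Non-negative after a successful CAS: success.
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)                  // int overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;                     // tells AQS to wake up waiters
            }
        }
    }

    private final Sync sync;

    SimpleSemaphore(int permits) {
        sync = new Sync(permits);
    }

    /** Blocks until n permits are available. */
    void acquire(int n) throws InterruptedException {
        sync.acquireSharedInterruptibly(n);
    }

    /** Non-blocking attempt; true if n permits were taken. */
    boolean tryAcquire(int n) {
        return sync.tryAcquireShared(n) >= 0;
    }

    void release(int n) {
        sync.releaseShared(n);
    }

    int availablePermits() {
        return sync.permits();
    }
}
```

With one initial permit, tryAcquire(2) fails because the remaining count would be -1; after release(1) the same call succeeds and leaves 0 permits.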

release()

public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

release issues the given number of permits, increasing the number of available permits by that amount. Internally it calls Sync's releaseShared, which is the corresponding AQS method:


public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

If tryReleaseShared returns true, doReleaseShared releases the resource in shared mode. tryReleaseShared itself is implemented by Semaphore.Sync, with the following logic:


protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // Get the current state
        int current = getState();
        // Add the released permits to state
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // Update state with CAS
        if (compareAndSetState(current, next))
            return true;
    }
}

As the code above shows, Semaphore's release mainly adds to state. After the addition succeeds, AQS's doReleaseShared is called to wake up the thread waiting behind the head node.
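Two consequences of this logic can be checked with a short sketch (the class name ReleaseDemo is made up): release can push the count above the initial value, and the overflow branch throws an Error when state is already Integer.MAX_VALUE.

```java
import java.util.concurrent.Semaphore;

class ReleaseDemo {

    /** release() is plain addition, so the count can exceed the initial value. */
    static int releaseBeyondInitial() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.release(2);
        return semaphore.availablePermits();   // 1 + 2
    }

    /** Adding one more permit to Integer.MAX_VALUE overflows and hits the Error branch. */
    static boolean overflowThrows() {
        Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
        try {
            semaphore.release();
            return false;
        } catch (Error e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseBeyondInitial()); // prints 3
        System.out.println(overflowThrows());       // prints true
    }
}
```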

Summary

Q&A

Q: Since Semaphore is based on AQS, what does state mean in Semaphore?
A: In Semaphore, state represents the number of permits. A thread calling acquire is blocked when fewer permits are available than it requested. release increases the number of permits, and once the increase succeeds, blocked nodes are woken up.

Q: How is Semaphore implemented on top of AQS?
A:

  1. The constructor sets the initial value of state, which is the initial number of permits.
  2. acquire passes in the target number. When the target number is greater than the current number, the thread is blocked and put into the wait queue. This is implemented by AQS.
  3. release adds to state. When the addition succeeds, it calls AQS's doReleaseShared to wake up the thread waiting behind the head node. This is also implemented by AQS.

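Q: What is the difference between Semaphore and CountDownLatch?
A: In the usage shown here, the Semaphore counter counts up through release, while the CountDownLatch counter counts down to 0. Neither class offers a reset method, but a CountDownLatch is finished once it reaches 0, whereas Semaphore permits can be acquired and released again.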
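The opposite counting directions are easy to observe side by side. A small sketch, with a made-up class name:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

class CounterDirectionDemo {

    /** Returns "semaphorePermits,latchCount" after two steps on each. */
    static String run() {
        Semaphore semaphore = new Semaphore(0);         // starts at 0 and counts up
        CountDownLatch latch = new CountDownLatch(2);   // starts at 2 and counts down

        for (int i = 0; i < 2; i++) {
            semaphore.release();    // state + 1
            latch.countDown();      // state - 1
        }

        return semaphore.availablePermits() + "," + latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "2,0"
    }
}
```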

Conclusion

While reading the Semaphore source code, I found that its main functionality is built on AQS, so you can go back and read the notes on AQS. Semaphore also supports both fair and non-fair modes; the non-fair one is left for you to read on your own.

Copyright notice
This article was written by [Liu Zhihang]. Please include a link to the original when reposting. Thank you.