
【golang】 sync.WaitGroup explained in detail

2020-11-10 09:13:36 Go to 1002

1. Preface


The Go language provides extensive support for synchronization (both data synchronization and thread synchronization), such as the goroutine and channel synchronization primitives. The standard library adds:

- sync: basic synchronization primitives (such as Mutex, RWMutex, Locker) and utility types (Once, WaitGroup, Cond, Pool, Map)
- sync/atomic: atomic operations on variables (based on hardware instructions such as compare-and-swap)

-- Quoted from 《Golang package sync analysis (1): sync.Once》

In the previous installment we covered how sync.Once guarantees exactly-once semantics. This installment introduces another utility type in package sync: sync.WaitGroup.

2. Why WaitGroup

Imagine a scenario: we run a user-profile service. When a request arrives, the service needs to:

  1. Parse user_id and the profile dimension parameters from the request
  2. Use user_id to pull information for different dimensions from five sub-services A, B, C, D, and E (database services, storage services, RPC services, and so on)
  3. Merge the information it read and return it to the caller

Suppose the p99 response time of each of the five services is between 20 and 50 ms. If we call A, B, C, D, and E one after another and ignore the time spent merging data, the overall p99 response time of the server is roughly:

sum(A, B, C, D, E) => [100ms, 250ms]

Whether or not the business can accept that, there is clearly a lot of room to improve the response time. The most intuitive optimization is to bring the total latency of the access logic down from the sum to the maximum:

sum(A, B, C, D, E) -> max(A, B, C, D, E)

In code terms, we need to call the five sub-services in parallel and merge the data once all of the calls have returned. How do we make sure that all of them have returned?

This is where sync.WaitGroup makes its entrance.
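To make the sum-versus-max argument concrete, here is a minimal sketch that times both strategies. The helper callService, the variable demoLatencies, and the latency values are hypothetical stand-ins for the real sub-services; time.Sleep replaces the actual network calls.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// demoLatencies are made-up latencies for the five sub-services A to E.
var demoLatencies = []time.Duration{
	20 * time.Millisecond, 30 * time.Millisecond, 40 * time.Millisecond,
	50 * time.Millisecond, 25 * time.Millisecond,
}

// callService stands in for one sub-service call (hypothetical helper).
func callService(latency time.Duration) {
	time.Sleep(latency)
}

// fetchSequentially calls every service one after another.
func fetchSequentially(latencies []time.Duration) time.Duration {
	start := time.Now()
	for _, l := range latencies {
		callService(l)
	}
	return time.Since(start)
}

// fetchConcurrently calls every service in its own goroutine and waits for all of them.
func fetchConcurrently(latencies []time.Duration) time.Duration {
	start := time.Now()
	var wg sync.WaitGroup
	for _, l := range latencies {
		wg.Add(1)
		go func(d time.Duration) {
			defer wg.Done()
			callService(d)
		}(l)
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	fmt.Println("sequential:", fetchSequentially(demoLatencies))
	fmt.Println("concurrent:", fetchConcurrently(demoLatencies))
}
```

The sequential time should come out close to the sum of the latencies and the concurrent time close to the largest one; exact numbers vary from run to run.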

3. WaitGroup usage

The official documentation describes it this way: a WaitGroup waits for a collection of goroutines to finish. It is used as follows:

  1. The main goroutine calls wg.Add(delta int) to set the number of worker goroutines, then creates the workers;
  2. Each worker calls wg.Done() when it finishes;
  3. The main goroutine calls wg.Wait() and blocks until all workers have finished.

Here is a typical example:

// src/cmd/compile/internal/ssa/gen/main.go
func main() {
  // Omitted code ...
  var wg sync.WaitGroup
  for _, task := range tasks {
    task := task
    wg.Add(1)
    go func() {
      task()
      wg.Done()
    }()
  }
  wg.Wait()
  // Omitted code ...
}

This example contains most of the elements of correct WaitGroup usage, including:

  1. wg.Wait must run after all wg.Add calls, so make sure both functions are called in the main goroutine;
  2. wg.Done is called in the worker goroutine. Make sure it is called exactly once and is never skipped because of a panic or for any other reason (defer wg.Done() is recommended);
  3. There is no required ordering in time between wg.Done and wg.Wait.
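The recommendation to use defer wg.Done() can be sketched as follows. Note that defer alone does not stop a panic from crashing the program, so this sketch also adds a recover handler; that handler, the function name runAll, and the panicked counter are assumptions made for the illustration, not part of the original example.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll runs every task in its own goroutine and returns how many of them panicked.
// defer guarantees that wg.Done runs exactly once per worker, even on panic.
func runAll(tasks []func()) int {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		panicked int
	)
	for _, task := range tasks {
		wg.Add(1) // Add in the launching goroutine, before the worker starts
		go func(t func()) {
			defer wg.Done() // deferred first, so it runs last
			defer func() {
				if r := recover(); r != nil {
					mu.Lock()
					panicked++
					mu.Unlock()
				}
			}()
			t()
		}(task)
	}
	wg.Wait()
	return panicked
}

func main() {
	tasks := []func(){
		func() {},
		func() { panic("boom") },
		func() {},
	}
	fmt.Println("panicked tasks:", runAll(tasks)) // prints "panicked tasks: 1"
}
```

Without the deferred wg.Done, a recovered panic would leave the counter above zero and wg.Wait would block forever.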

Attentive readers may have noticed an odd-looking line in the typical example above:

task := task

When Go iterates over an array or slice, the runtime copies tasks[i] into the memory of the loop variable task. The index i changes, but the memory address of task does not. Without this assignment, all goroutines might end up reading only the last task. (This describes Go versions before 1.22; since Go 1.22 every iteration gets its own copy of the loop variable, so the extra assignment is only needed on older versions.) To make this tangible, let's experiment with the following code:

package main

import (
  "fmt"
  "unsafe"
)

func main() {
  tasks := []func(){
    func() { fmt.Printf("1. ") },
    func() { fmt.Printf("2. ") },
  }

  for idx, task := range tasks {
    task()
    fmt.Printf("loop variable = %v, ", unsafe.Pointer(&task))
    fmt.Printf("element = %v, ", unsafe.Pointer(&tasks[idx]))
    task := task
    fmt.Printf("local copy = %v\n", unsafe.Pointer(&task))
  }
}

This code prints:

1. loop variable = 0x40c140, element = 0x40c138, local copy = 0x40c150
2. loop variable = 0x40c140, element = 0x40c13c, local copy = 0x40c158

The output differs from machine to machine, but (on Go versions before 1.22) the results have the following in common:

  1. During iteration, the memory address of the loop variable stays the same
  2. When elements are accessed by index, the memory addresses differ
  3. Local variables created inside the for loop get distinct memory addresses even though they share a name; the addresses are not reused
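An alternative to the extra assignment is to pass the value to the goroutine as a function argument, which behaves the same on every Go version. The following is a minimal sketch; the function squares and its inputs are made up for the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// squares computes n*n for each input in its own goroutine.
func squares(inputs []int) []int {
	out := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, n := range inputs {
		wg.Add(1)
		// i and n are passed by value, so the goroutine never touches the loop variables.
		go func(i, n int) {
			defer wg.Done()
			out[i] = n * n // each goroutine writes a distinct slot, so no lock is needed
		}(i, n)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(squares([]int{1, 2, 3, 4})) // prints [1 4 9 16]
}
```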

When using WaitGroup, besides the caveats above, we also need to deal with collecting data and handling errors. Here are two approaches for reference:

  1. For RPC calls, collect the results through a data channel and an error channel, or through a single channel that carries both
  2. Use shared variables, such as a lock-protected map
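Both approaches can be sketched roughly as follows. The result type, the fetch helper, and the failing service "C" are hypothetical and only simulate sub-service calls; a real service would issue RPCs instead.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// result carries either a value or an error for one sub-service (hypothetical type).
type result struct {
	name  string
	value string
	err   error
}

// fetch simulates a sub-service call; the service "C" always fails in this sketch.
func fetch(name string) (string, error) {
	if name == "C" {
		return "", errors.New("service C unavailable")
	}
	return "data-from-" + name, nil
}

// collectViaChannel gathers data and errors through a single buffered channel.
func collectViaChannel(names []string) (map[string]string, []error) {
	ch := make(chan result, len(names)) // buffered so workers never block on send
	var wg sync.WaitGroup
	for _, name := range names {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			v, err := fetch(name)
			ch <- result{name: name, value: v, err: err}
		}(name)
	}
	wg.Wait()
	close(ch) // safe: every sender has finished

	data := make(map[string]string)
	var errs []error
	for r := range ch {
		if r.err != nil {
			errs = append(errs, r.err)
			continue
		}
		data[r.name] = r.value
	}
	return data, errs
}

// collectViaMap gathers data into a map guarded by a mutex.
func collectViaMap(names []string) map[string]string {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		data = make(map[string]string)
	)
	for _, name := range names {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			v, err := fetch(name)
			if err != nil {
				return // errors are dropped in this variant to keep it short
			}
			mu.Lock()
			data[name] = v
			mu.Unlock()
		}(name)
	}
	wg.Wait()
	return data
}

func main() {
	names := []string{"A", "B", "C", "D", "E"}
	data, errs := collectViaChannel(names)
	fmt.Println(len(data), "results,", len(errs), "errors")
	fmt.Println(len(collectViaMap(names)), "results via locked map")
}
```

The channel is sized to the number of workers so that every send completes before wg.Wait returns; with an unbuffered channel the workers would block until a reader started draining it.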

4. WaitGroup implementation

Before getting to the point, readers are encouraged to think about this first: if you had to implement WaitGroup yourself, how would you do it?

A lock? Definitely not!

A semaphore? How would that work?

------------ Cut to the chase ------------

In the Go source code, WaitGroup logically consists of:

  1. Worker counter: incremented by delta when the main goroutine calls wg.Add(delta int), and decremented by one on each wg.Done call.
  2. Waiter counter: incremented by one on each wg.Wait call, and reset when the worker counter drops to 0.
  3. Semaphore: used to block the main goroutine. wg.Wait acquires it through runtime_Semacquire; when the waiter counter is reset, it is released through runtime_Semrelease, once per waiter.

For demonstration purposes, let's modify the example above:

package main

import (
  "fmt"
  "sync"
  "time"
)

func main() {
  tasks := []func(){
    func() { time.Sleep(time.Second); fmt.Println("1 sec later") },
    func() { time.Sleep(time.Second * 2); fmt.Println("2 sec later") },
  }

  var wg sync.WaitGroup // 1-1
  wg.Add(len(tasks))    // 1-2
  for _, task := range tasks {
    task := task
    go func() {       // 1-3-1
      defer wg.Done() // 1-3-2
      task()          // 1-3-3
    }()               // 1-3-1
  }
  wg.Wait()           // 1-4
  fmt.Println("exit")
}

In the code above:

  1. 1-1 creates a WaitGroup object; the worker counter and the waiter counter both default to 0.
  2. 1-2 sets the worker counter to len(tasks).
  3. 1-3-1 creates the worker goroutines and starts the tasks.
  4. 1-4 increments the waiter counter and acquires the semaphore; the main goroutine blocks.
  5. After 1-3-3 finishes, 1-3-2 decrements the worker counter. When the worker counter drops to 0:

    • the waiter counter is reset
    • the semaphore is released, the main goroutine wakes up, and wg.Wait at 1-4 returns

Although delta in Add(delta int) may be positive, zero, or negative, in practice we always pass a positive delta.

wg.Done is equivalent to wg.Add(-1). In this article, whenever wg.Add is mentioned, delta > 0 is assumed.
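The equivalence of wg.Done and wg.Add(-1), and what happens when the counter goes negative, can be checked with a small sketch. The function name negativeCounterPanic is made up for the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// negativeCounterPanic drives the counter below zero and returns the recovered panic value.
func negativeCounterPanic() (msg interface{}) {
	defer func() { msg = recover() }()
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Add(-1) // same effect as wg.Done(): the counter is back to 0
	wg.Wait()  // returns immediately because the counter is 0
	wg.Done()  // the counter would become -1, so this call panics
	return nil
}

func main() {
	fmt.Println(negativeCounterPanic()) // prints "sync: negative WaitGroup counter"
}
```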

Now that the principle behind WaitGroup is clear, let's look at the source code. To make it easier to follow, I keep only the core logic, which we cover in three parts:

  1. WaitGroup structure
  2. Add and Done
  3. Wait

Tip: if you only want to know how to use WaitGroup correctly, you can stop reading here. If you are interested in the internals, read on, ideally with the source code open in your IDE.

4.1 WaitGroup structure

type WaitGroup struct {
  noCopy noCopy
  state1 [3]uint32
}

The WaitGroup struct has two fields, noCopy and state1. (This is the layout at the time of writing; later Go releases have changed it.)

When you run go vet, it checks the noCopy field to catch code that copies a WaitGroup object.
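In practice, the no-copy rule means a WaitGroup should be shared by pointer when it crosses a function boundary. A minimal sketch follows; startWorkers and runWorkers are hypothetical helper names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// startWorkers launches n workers that each bump counter once.
// wg is a pointer so that the workers and the caller share one WaitGroup.
func startWorkers(wg *sync.WaitGroup, n int, counter *int64) {
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(counter, 1)
		}()
	}
}

// runWorkers returns how many workers had finished by the time Wait returned.
func runWorkers(n int) int64 {
	var (
		wg      sync.WaitGroup
		counter int64
	)
	// Passing wg by value would hand startWorkers a copy; go vet reports such copies.
	startWorkers(&wg, n, &counter)
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(runWorkers(10)) // prints 10
}
```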

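The state1 field is more involved: logically it holds the worker counter, the waiter counter, and the semaphore. The two counters share one 64-bit value that is updated atomically, and 64-bit atomic operations need an 8-byte-aligned address, which a [3]uint32 array does not guarantee on 32-bit platforms; that is why the accessor checks the alignment. The three values are read as follows: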

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
  if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
    return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
  } else {
    return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
  }
}

// Read the counters and the semaphore
statep, semap := wg.state()
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)

The three values are retrieved as follows:

  • Worker counter: v is the high 32 bits of the uint64 that statep points to
  • Waiter counter: w is the low 32 bits of the uint64 that statep points to
  • Semaphore: semap points to the first or the last uint32 element of state1 [3]uint32, depending on alignment

Therefore, the worker counter is updated with:

state := atomic.AddUint64(statep, uint64(delta)<<32)

The waiter counter is updated with:

statep, semap := wg.state()
for {
  state := atomic.LoadUint64(statep)
  if atomic.CompareAndSwapUint64(statep, state, state+1) {
    // Other logic omitted
    return
  }
}
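The packing of two 32-bit counters into one uint64 can be tried out in isolation. The sketch below leaves out the atomic operations and only shows the arithmetic; the helper names unpack, addWorkers, and addWaiter are made up for the illustration.

```go
package main

import "fmt"

// unpack splits the combined state into the worker counter (high 32 bits)
// and the waiter counter (low 32 bits).
func unpack(state uint64) (v int32, w uint32) {
	return int32(state >> 32), uint32(state)
}

// addWorkers applies delta to the high 32 bits, as Add does.
// A negative delta wraps around in uint64 arithmetic and decrements the counter.
func addWorkers(state uint64, delta int) uint64 {
	return state + uint64(delta)<<32
}

// addWaiter increments the low 32 bits, as Wait does.
func addWaiter(state uint64) uint64 {
	return state + 1
}

func main() {
	var state uint64
	state = addWorkers(state, 3)  // three workers registered
	state = addWaiter(state)      // one goroutine is waiting
	state = addWorkers(state, -1) // one worker called Done
	v, w := unpack(state)
	fmt.Println(v, w) // prints 2 1
}
```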

Attentive readers may have noticed that the worker counter is updated with a plain atomic add, while the waiter counter is updated with CompareAndSwap. The reason is that wg.Add changes the worker counter unconditionally, so a single atomic add is enough even when many workers call wg.Done at the same time. wg.Wait, by contrast, may only register a waiter if the worker counter it just read is still non-zero, and other goroutines may update state1 between that read and the write; CompareAndSwap detects this and retries. If this passage is not entirely clear yet, read on to understand wg.Add and wg.Wait first, then come back to this detail.

4.2 Add and Done

The core logic of wg.Add is fairly simple: modify the worker counter, then act on the resulting counter state. The simplified code is as follows:

func (wg *WaitGroup) Add(delta int) {
  statep, semap := wg.state()
  // 1. Modify the worker counter
  state := atomic.AddUint64(statep, uint64(delta)<<32)
  v := int32(state >> 32)
  w := uint32(state)
  if v < 0 {
    panic("sync: negative WaitGroup counter")
  }
  if w != 0 && delta > 0 && v == int32(delta) {
    panic("sync: WaitGroup misuse: Add called concurrently with Wait")
  }
  // 2. Check the counters: nothing to do while workers are still running
  //    or when nobody is waiting
  if v > 0 || w == 0 {
    return
  }

  // 3. The worker counter has dropped to 0:
  //    reset the waiter counter and release the semaphore once per waiter
  *statep = 0
  for ; w != 0; w-- {
    runtime_Semrelease(semap, false)
  }
}

func (wg *WaitGroup) Done() {
  wg.Add(-1)
}

4.3 Wait

wg.Wait modifies the waiter counter and waits for the semaphore to be released. The simplified code is as follows:

func (wg *WaitGroup) Wait() {
  statep, semap := wg.state()
  for {
    // 1. Read the worker counter
    state := atomic.LoadUint64(statep)
    v := int32(state >> 32)
    if v == 0 {
      return
    }

    // 2. Increment the waiter counter; retry if the state changed in between
    if atomic.CompareAndSwapUint64(statep, state, state+1) {
      // 3. Acquire the semaphore (blocks until Add releases it)
      runtime_Semacquire(semap)
      if *statep != 0 {
        panic("sync: WaitGroup is reused before previous Wait has returned")
      }

      // 4. Semaphore acquired successfully
      return
    }
  }
}
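Because runtime_Semacquire and runtime_Semrelease are not available outside the standard library, the simplified logic cannot be run as quoted. The following toy version is a sketch under one loudly stated assumption: an unbuffered channel stands in for the runtime semaphore. The names toyWaitGroup, newToyWaitGroup, and countWithToy are hypothetical, and this is not how package sync is actually implemented.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toyWaitGroup mirrors the simplified Add/Wait logic. An unbuffered channel
// stands in for the runtime semaphore (an assumption made for this sketch).
type toyWaitGroup struct {
	state uint64        // high 32 bits: worker counter, low 32 bits: waiter counter
	sema  chan struct{} // one receive per blocked waiter
}

func newToyWaitGroup() *toyWaitGroup {
	return &toyWaitGroup{sema: make(chan struct{})}
}

func (wg *toyWaitGroup) Add(delta int) {
	state := atomic.AddUint64(&wg.state, uint64(delta)<<32)
	v := int32(state >> 32)
	w := uint32(state)
	if v < 0 {
		panic("toyWaitGroup: negative counter")
	}
	if v > 0 || w == 0 {
		return // workers are still running, or nobody is waiting
	}
	// The last worker finished: reset the state and wake every waiter.
	atomic.StoreUint64(&wg.state, 0)
	for ; w != 0; w-- {
		wg.sema <- struct{}{}
	}
}

func (wg *toyWaitGroup) Done() { wg.Add(-1) }

func (wg *toyWaitGroup) Wait() {
	for {
		state := atomic.LoadUint64(&wg.state)
		if int32(state>>32) == 0 {
			return // no workers outstanding
		}
		// Register as a waiter; retry if the state changed in the meantime.
		if atomic.CompareAndSwapUint64(&wg.state, state, state+1) {
			<-wg.sema
			return
		}
	}
}

// countWithToy runs n workers and returns how many had finished when Wait returned.
func countWithToy(n int) int64 {
	wg := newToyWaitGroup()
	var finished int64
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&finished, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(countWithToy(8)) // prints 8
}
```

The toy omits the misuse checks of the real implementation, so it should only be read as an illustration of the counter and wake-up flow.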

Because the source code is fairly long and contains a lot of validation logic and comments, the code quoted in this article has been trimmed to varying degrees around the core logic. Finally, I recommend downloading the source code and reading it carefully to gain a deeper understanding of the details of WaitGroup's design.

Copyright notice
This article was written by [Go to 1002]. Please include a link to the original when reposting. Thank you.