основная концепция
Удобный параллелизм — главное преимущество Golang, и использование параллелизма не является чем-то новым для группы ожидания пакета синхронизации. Группа ожидания в основном используется для ожидания параллельных экземпляров Golang, а именно горутин.При использовании go для запуска нескольких параллельных программ группа ожидания может дождаться завершения всех программ go перед выполнением следующей логики кода, например:
func Main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(10 * time.Second)
}()
}
wg.Wait() // 等待在此,等所有go func里都执行了Done()才会退出
}
Принцип реализации
WaitGroup предоставляет три внешних метода: Add(int), Done() и Wait(), из которых Done() вызывает Add(-1). Общий метод использования заключается в унификации сначала Add, одновременно Done в горутине, а затем Wait. .
Группа ожидания в основном поддерживает два счетчика: один — счетчик запросов v, а другой — счетчик ожидания w. Оба образуют 64-битное значение, счетчик запросов занимает 32 бита по высоте, а счетчик ожидания занимает 32 бита по низу.
Каждый раз, когда выполняется Add, счетчик запросов v увеличивается на 1, выполняется метод Done, счетчик запросов уменьшается на 1, и когда v равно 0, семафор просыпается Wait().
Так для чего же нужен счетчик ожидания? Это связано с тем, что метод Wait() одного и того же экземпляра поддерживает несколько вызовов.Каждый раз, когда выполняется метод Wait(), счетчик ожидания w будет увеличиваться на 1, а когда счетчик запроса v равен 0 для запуска Wait(), он будет отправлено в соответствии с количеством w.W долей семафора, правильно инициировать все Wait(), хотя это не часто используемая функция, но она полезна в некоторых особых случаях (например, множественный параллелизм зависит от конечного сигнала экземпляра группы ожидания, чтобы перейти к следующему действию), код демонстрации выглядит следующим образом:
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
}()
}
time.Sleep(2 * time.Second)
for j := 0; j < 3; j++ {
go func(i int) {
// 3个地方调用Wait(),通过等待j计时器,每个Wati都会被hu唤醒
wg.Wait()
fmt.Println("wait done now ", i)
}(j)
}
time.Sleep(10 * time.Second)
return
}
/*
输出如下,数字出现的顺序随机
wait done now 1
wait done now 0
wait done now 2
*/
При этом в WaitGroup строго проверяется логика использования, например, Wait() не может быть Add() после запуска.
Вот аннотированный код с удаленной частью трассировки, которая не влияет на логику кода:
func (wg *WaitGroup) Add(delta int) {
statep := wg.state()
// 更新statep,statep将在wait和add中通过原子操作一起使用
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
// wait不等于0说明已经执行了Wait,此时不容许Add
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 正常情况,Add会让v增加,Done会让v减少,如果没有全部Done掉,此处v总是会大于0的,直到v为0才往下走
// 而w代表是有多少个goruntine在等待done的信号,wait中通过compareAndSwap对这个w进行加1
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
// 当v为0(Done掉了所有)或者w不为0(已经开始等待)才会到这里,但是在这个过程中又有一次Add,导致statep变化,panic
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
// 将statep清0,在Wait中通过这个值来保护信号量发出后还对这个Waitgroup进行操作
*statep = 0
// 将信号量发出,触发wait结束
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false)
}
}
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
statep := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
// 如果statep和state相等,则增加等待计数,同时进入if等待信号量
// 此处做CAS,主要是防止多个goroutine里进行Wait()操作,每有一个goroutine进行了wait,等待计数就加1
// 如果这里不相等,说明statep,在 从读出来 到 CAS比较 的这个时间区间内,被别的goroutine改写了,那么不进入if,回去再读一次,这样写避免用锁,更高效些
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(&wg.sema))
}
// 等待信号量
runtime_Semacquire(&wg.sema)
// 信号量来了,代表所有Add都已经Done
if *statep != 0 {
// 走到这里,说明在所有Add都已经Done后,触发信号量后,又被执行了Add
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}