Эта статья основана на версии go1.14.6.
Применение
- новая горутинадо созданияперечислить
WaitGroup.Add(1)
И вв конце исполненияперечислитьWaitGroup.Done()
- Вызывается внутри блокирующей ожидающей горутины
WaitGroup.Wait()
, когда вызов возвращается, горутина, которую он ожидает, должна завершить выполнение
Пример использования
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
fmt.Println("hello world!")
wg.Done()
}()
}
wg.Wait()
fmt.Println("main done!")
}
Приведенный выше код показывает более типичныйsync.WaitGroup
использовать сценарии, т.е.Одингорутина ждет другогонесколькоКонец горутины.
Для приведенного выше кодаsync.WaitGroup
гарантировано вfor
серединавсевыполнение горутиныfmt.Println("hello world!")
После окончания выполнитьfmt.Println("main done!")
, а затем выйти из программы.
Чтение исходного кода
Следующий исходный код совпадает с /src/sync/waitgroup.go.
хочу знатьsync.WaitGroup
Для реализации и ям, с которыми можно столкнуться в процессе использования, необходимо иметь глубокое понимание исходного кода.sync.WaitGroup
Исходных кодов не так много, но будет рассмотрено множество ситуаций параллелизма, а общая сложность умеренная, что очень подходит для новичков в качестве отправной точки для чтения исходного кода.
структура
type WaitGroup struct {
// 这是sync库中常用的一个结构体,内部是空实现.
// 当结构体被初始化并使用后,如果对改值进行copy则go vet检查代码时会警告
noCopy noCopy
// 关于这个成员变量,官方解释很复杂,对于初学者而言,只需要知道其中3个uint32的值分别存储了:
// 1. 被Add/Done方法操作的计数器,Add表示加,Done表示减,当计数器为0时,Wait会立即返回
// 2. 正在Wait()处阻塞的goroutine个数
// 3. 用于等待与唤醒的信号量,信号量在之后有具体介绍
state1 [3]uint32
}
метод
state
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
Анализ методаWaitGroup
в структуреstate1
вернуть два указателя
вstatep
упомянутыйuint64
в переменной高32位
хранится计数器
,低32位
хранится в это время正在Wait处阻塞的goroutine个数
.
semap
Указывает на семафор, используемый для пробуждения и ожидания, подробности см. в последующем объяснении.
Add/Done
Done
На самом деле этоWaitGroup.Add(-1)
, так что это не объясняется отдельно
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
}
//给 计数器 原子增加delta
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) //v是本次增加后计数器的值
w := uint32(state) //w是此时正在Wait()处出阻塞等待的goroutine个数
if race.Enabled && delta > 0 && v == int32(delta) {
//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
}
//在正常情况下,每次创建goroutine,计数器v加1;
//该goroutine个数执行完毕调用Done使计数器v减1; 所以计数器v一定>=0
if v < 0 {
panic("sync: negative WaitGroup counter")
}
//此处检查一种情况: 当第一个Add()与Wait()并发调用到此处时,如果Wait()被先执行,
//此时计数器v为0,Wait()会直接返回;
//如果Add()先执行,此时计数器v>0之后的Wait()执行时会被阻塞等待;
//此时出现了由于数据竞争导致的结果不一致,所以panic提示调用者.
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//当计数器v>0时,表示不应该对正在Wait()的goroutine执行任何操作,应直接返回
//当w==0是表示没有正在Wait()的goroutine,应直接返回
if v > 0 || w == 0 {
return
}
//注意: 在没有错误的情况下,代码执行到此处时(v==0 && w>0)
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
// 此处检测两种并发情况:
// 1. 当w从>0的值变为0时,禁止新的Add()调用,因为此时新建的goroutine可能不会被Wait
// 2. 当v==0时,一定不会有新的Wait()使得w++,因为在Wait()内部看到v==0就会立即返回,详见下文
// 以上任何一种情况出发时,*statep都会被更新,导致(state!=*statep)
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 代码执行到此处时,我们要根据w的数量,依次唤醒在Wait()处等待的goroutine,所以此时
// 可以将w也置为0,即v=0,w=0,所以*statep就可以赋值为0
*statep = 0
for ; w != 0; w-- {
//runtime_Semrelease是runtime包的内建函数,
//可以简单理解为唤醒runtime_Semacquire处阻塞等待的goroutine
runtime_Semrelease(semap, false, 0)
}
}
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
}
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) //计数器
w := uint32(state) //正在Wait()的goroutine数量
if v == 0 {
// 计数器v==0就不需要等待直接返回即可
if race.Enabled {
//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
}
return
}
// 尝试原子地将w+1,如果失败就在for循环内不停尝试,类似乐观锁
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
}
//此处v>0,所以需要等待v减为0;当v通过Add()减为0时,会唤醒此处的等待
runtime_Semacquire(semap)
//简单检测一个并发问题: 当Wait()被唤醒后,应满足v==0&&w==0
//否则一定出现了Wait()返回前,Add()被并发调用的问题
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
}
return
}
}
}
Суммировать
- когда
WaitGroup
После инициализации и использования (например, при вызове Add()) копирование значения не может быть выполнено, если его необходимо передать, оно передается по адресу. - После выполнения Wait() не следует вызывать Add() для увеличения счетчика, следует вызывать только Done() для уменьшения счетчика до 0.