Перед исходным кодом нет никаких секретов | Стандартная библиотека Golang sync.WaitGroup

Go

Эта статья основана на версии go1.14.6.

Применение

  1. новая горутинадо созданияперечислитьWaitGroup.Add(1)И вв конце исполненияперечислитьWaitGroup.Done()
  2. Вызывается внутри блокирующей ожидающей горутиныWaitGroup.Wait(), когда вызов возвращается, горутина, которую он ожидает, должна завершить выполнение

Пример использования

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			fmt.Println("hello world!")
			wg.Done()
		}()
	}

	wg.Wait()
	fmt.Println("main done!")
}

Приведенный выше код показывает более типичныйsync.WaitGroupиспользовать сценарии, т.е.Одингорутина ждет другогонесколькоКонец горутины.

Для приведенного выше кодаsync.WaitGroupгарантировано вforсерединавсевыполнение горутиныfmt.Println("hello world!")После окончания выполнитьfmt.Println("main done!"), а затем выйти из программы.

Чтение исходного кода

Следующий исходный код совпадает с /src/sync/waitgroup.go.

хочу знатьsync.WaitGroupДля реализации и ям, с которыми можно столкнуться в процессе использования, необходимо иметь глубокое понимание исходного кода.sync.WaitGroupИсходных кодов не так много, но будет рассмотрено множество ситуаций параллелизма, а общая сложность умеренная, что очень подходит для новичков в качестве отправной точки для чтения исходного кода.

структура

type WaitGroup struct {
    // 这是sync库中常用的一个结构体,内部是空实现.
    // 当结构体被初始化并使用后,如果对改值进行copy则go vet检查代码时会警告
	noCopy noCopy

	// 关于这个成员变量,官方解释很复杂,对于初学者而言,只需要知道其中3个uint32的值分别存储了:
    // 1. 被Add/Done方法操作的计数器,Add表示加,Done表示减,当计数器为0时,Wait会立即返回
    // 2. 正在Wait()处阻塞的goroutine个数
    // 3. 用于等待与唤醒的信号量,信号量在之后有具体介绍
	state1 [3]uint32
}

метод

state

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}

Анализ методаWaitGroupв структуреstate1вернуть два указателя

вstatepупомянутыйuint64в переменной高32位хранится计数器,低32位хранится в это время正在Wait处阻塞的goroutine个数.

semapУказывает на семафор, используемый для пробуждения и ожидания, подробности см. в последующем объяснении.

Add/Done

DoneНа самом деле этоWaitGroup.Add(-1), так что это не объясняется отдельно

func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	if race.Enabled {
		//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
	}
    
    //给 计数器 原子增加delta
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32) //v是本次增加后计数器的值
	w := uint32(state)      //w是此时正在Wait()处出阻塞等待的goroutine个数
	if race.Enabled && delta > 0 && v == int32(delta) {
		//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
	}
    
    //在正常情况下,每次创建goroutine,计数器v加1;
    //该goroutine个数执行完毕调用Done使计数器v减1; 所以计数器v一定>=0
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
    
    //此处检查一种情况: 当第一个Add()与Wait()并发调用到此处时,如果Wait()被先执行,
    //此时计数器v为0,Wait()会直接返回;
    //如果Add()先执行,此时计数器v>0之后的Wait()执行时会被阻塞等待;
    //此时出现了由于数据竞争导致的结果不一致,所以panic提示调用者.
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
    
    //当计数器v>0时,表示不应该对正在Wait()的goroutine执行任何操作,应直接返回
    //当w==0是表示没有正在Wait()的goroutine,应直接返回
	if v > 0 || w == 0 {
		return
	}
    
    //注意: 在没有错误的情况下,代码执行到此处时(v==0 && w>0)
    
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
    
    // 此处检测两种并发情况:
    // 1. 当w从>0的值变为0时,禁止新的Add()调用,因为此时新建的goroutine可能不会被Wait
    // 2. 当v==0时,一定不会有新的Wait()使得w++,因为在Wait()内部看到v==0就会立即返回,详见下文
    // 以上任何一种情况出发时,*statep都会被更新,导致(state!=*statep)
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
    
    // 代码执行到此处时,我们要根据w的数量,依次唤醒在Wait()处等待的goroutine,所以此时
    // 可以将w也置为0,即v=0,w=0,所以*statep就可以赋值为0
	*statep = 0
	for ; w != 0; w-- {
        //runtime_Semrelease是runtime包的内建函数,
        //可以简单理解为唤醒runtime_Semacquire处阻塞等待的goroutine
		runtime_Semrelease(semap, false, 0)
	}
}

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	if race.Enabled {
		//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
	}
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32) //计数器
        w := uint32(state)      //正在Wait()的goroutine数量
		if v == 0 {
			// 计数器v==0就不需要等待直接返回即可
			if race.Enabled {
				//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
			}
			return
		}
		// 尝试原子地将w+1,如果失败就在for循环内不停尝试,类似乐观锁
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
			}
            //此处v>0,所以需要等待v减为0;当v通过Add()减为0时,会唤醒此处的等待
			runtime_Semacquire(semap)
            
            //简单检测一个并发问题: 当Wait()被唤醒后,应满足v==0&&w==0
            //否则一定出现了Wait()返回前,Add()被并发调用的问题
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				//...新手遇到race.Enabled代码块时直接跳过即可,仅用于-race编译时,与主逻辑无关
			}
			return
		}
	}
}

Суммировать

  1. когдаWaitGroupПосле инициализации и использования (например, при вызове Add()) копирование значения не может быть выполнено, если его необходимо передать, оно передается по адресу.
  2. После выполнения Wait() не следует вызывать Add() для увеличения счетчика, следует вызывать только Done() для уменьшения счетчика до 0.