Go — использование пакета atomic и анализ исходного кода atomic.Value

Go

1. Атомарные операции в Go

Атомарность:Свойство, состоящее в том, что одна или несколько операций не прерываются во время выполнения ЦП, называется атомарностью. Эти операции представляются внешнему миру как неразрывное целое, они либо выполняются, либо не выполняются, и внешний мир не увидит, что они выполнены лишь наполовину.

Атомная операция:Операции, которые не могут быть прерваны во время процесса, атомарные операции поддерживаются базовым оборудованием, а блокировки реализуются API, предоставляемым операционной системой.Если реализована одна и та же функция, первая обычно более эффективна.

Минимальный случай:

package main

import (
	"sync"
	"fmt"
)

var count int

func add(wg *sync.WaitGroup) {
	defer wg.Done()
	count++
}

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1000)
	for i := 0; i < 1000; i++ {
		go add(&wg)
	}
	wg.Wait()
	fmt.Println(count)
}

countне будет равно 1000, потому чтоcount++Этот шаг фактически состоит из трех операций:

  • читать по памятиcount
  • обновление процессораcount = count + 1
  • написатьcountна память

Поэтому несколько горутин будут считывать одно и то же значение, а затем обновлять одно и то же значение в памяти, что приводит к меньшим, чем ожидалось, конечным результатам.

2. Пакет sync/atomic в Go

Атомарные операции, предоставляемые языком Go, не являются навязчивыми и предоставляются стандартной библиотекой.sync/aotomicМножество функций в представлении

В пакете atomic поддерживается шесть типов

  • int32
  • uint32
  • int64
  • uint64
  • uintptr
  • unsafe.Pointer

Для каждого типа предусмотрено пять типов атомарных операций:

  • LoadXXX(addr): атомное приобретение*addrзначение, которое эквивалентно:
    return *addr
    
  • StoreXXX(addr, val): атомная воляvalЗначение сохраняется в*addr, Эквивалентно:
    addr = val
    
  • AddXXX(addr, delta): атомная воляdeltaЗначение добавляется к*addrи вернуть новое значение (unsafe.Pointerне поддерживается), эквивалентно:
    *addr += delta
    return *addr
    
  • SwapXXX(addr, new) old: атомная воляnewЗначение сохраняется в*addrи возвращает старое значение, которое эквивалентно:
    old = *addr
    *addr = new
    return old
    
  • CompareAndSwapXXX(addr, old, new) bool: атомарное сравнение*addrиold, если одинаковые, тоnewназначить в*addrи вернутьсяtrue, Эквивалентно:
    if *addr == old {
        *addr = new
        return true
    }
    return false
    

Go sync/atomic api

Поэтому случай первой части можно модифицировать следующим образом, что можно передать

// 修改方式1
func add(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		if atomic.CompareAndSwapInt32(&count, count, count+1) {
			break
		}
	}
}
// 修改方式2
func add(wg *sync.WaitGroup) {
	defer wg.Done()
	atomic.AddInt32(&count, 1)
}

3. Расширьте область атомарных операций: atomic.Value

Перейти язык в версии 1.4 наsync/atomicв пакет добавлены новые типыValue, который эквивалентен контейнеру и используется для «атомарного» хранения и загрузки значений любого типа

  • type Value
    • func(v *Value) Load() (x interface{}): операция чтения, чтение содержимого, сохраненного на предыдущем шаге, из потокобезопасного v
    • func(v *Value) Store(x interface{}): операция записи, сохранение исходной переменной x вatomic.Valueтип v

Например, автору было 22 года, когда он написал статью, и ему было 23 года, когда он написал ее.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	// 此处依旧选用简单的数据类型,因为代码量少
	config := atomic.Value{}
	config.Store(22)

	wg := sync.WaitGroup{}
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(i int) {
			defer wg.Done()
			// 在某一个goroutine中修改配置
			if i == 0 {
				config.Store(23)
			}
			// 输出中夹杂22,23
			fmt.Println(config.Load())
		}(i)
	}
	wg.Wait()
}

4. Анализ исходного кода atomic.Value

atomic.Valueпредназначен для хранения любых типов данных, поэтому его внутреннее поле представляет собойinterface{}тип

type Value struct {
	v interface{}
}

есть еще одинifaceWordsтип, как внутренний формат представления пустого интерфейса,тип представляет исходный тип, данные представляют реальное значение

// ifaceWords is interface{} internal representation.
type ifaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}

4.1 unsafe.Pointer

Язык Go не поддерживает прямое манипулирование памятью, но его стандартная библиотека предоставляетТипы указателей, для которых не гарантируется обратная совместимостьunsafe.Pointer, Он позволяет программе гибко манипулировать памятью, его особенностями являются:Может обходить системные проверки типа языка Go

Это:Если оба типа имеют одинаковую структуру памяти, мы можем положитьunsafe.PointerВ качестве моста пусть эти два типа указателей конвертируют друг друга, чтобы у одной и той же памяти было два метода интерпретации

Например, внутренняя структура хранения типа int и типа int32 одинакова, но необходимо выполнить преобразование типа указателя:

var a int32
// 获得a的*int类型指针
(*int)(unsafe.Pointer(&a))

4.2 Реализация операций атомарного чтения произвольной структуры

func (v *Value) Load() (x interface{}) {
    // 将*Value指针类型转换为*ifaceWords指针类型
	vp := (*ifaceWords)(unsafe.Pointer(v))
	// 原子性的获取到v的类型typ的指针
	typ := LoadPointer(&vp.typ)
	// 如果没有写入或者正在写入,先返回,^uintptr(0)代表过渡状态,见下文
	if typ == nil || uintptr(typ) == ^uintptr(0) {
		return nil
	}
	// 原子性的获取到v的真正的值data的指针,然后返回
	data := LoadPointer(&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	xp.typ = typ
	xp.data = data
	return
}

4.3 Внедрение атомарного хранения операций произвольной структуры

Перед этим есть более важный фрагмент кода, которыйruntime_procPinМетод может использовать горутину, чтобы занять текущую используемуюP(ссылка здесьГорутин-планировщик (1): отношение P, M, G, не расходится) Никакие другие горутины не могут выполнять вытеснение, иruntime_procUnpinметод выпуска

// Disable/enable preemption, implemented in runtime.
func runtime_procPin()
func runtime_procUnpin()

Storeметод

func (v *Value) Store(x interface{}) {
	if x == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	// 将现有的值和要写入的值转换为ifaceWords类型,这样下一步就能获取到它们的原始类型和真正的值
	vp := (*ifaceWords)(unsafe.Pointer(v))
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	for {
		// 获取现有的值的type
		typ := LoadPointer(&vp.typ)
		// 如果typ为nil说明这是第一次Store
		if typ == nil {
			// 如果你是第一次,就死死占住当前的processor,不允许其他goroutine再抢
			runtime_procPin()
			// 使用CAS操作,先尝试将typ设置为^uintptr(0)这个中间状态
			// 如果失败,则证明已经有别的线程抢先完成了赋值操作
			// 那它就解除抢占锁,然后重新回到 for 循环第一步
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
				runtime_procUnpin()
				continue
			}
			// 如果设置成功,说明当前goroutine中了jackpot
			// 那么就原子性的更新对应的指针,最后解除抢占锁
			StorePointer(&vp.data, xp.data)
			StorePointer(&vp.typ, xp.typ)
			runtime_procUnpin()
			return
		}
		// 如果typ为^uintptr(0)说明第一次写入还没有完成,继续循环等待
		if uintptr(typ) == ^uintptr(0) {
			continue
		}
		// 如果要写入的类型和现有的类型不一致,则panic
		if typ != xp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
		// 更新data
		StorePointer(&vp.data, xp.data)
		return
	}
}

5. Ссылка

Go sync/atomic официальная документация

Атомарные операции sync/atomic в Go

Прошлое и настоящее atomic.Value в стандартной библиотеке языка Go

Как оптимизирован sync.Pool в Go 1.13?