5.1 Примитивы синхронизации и блокировки
Когда дело доходит до параллельного программирования и многопоточного программирования, мы часто неотделимы от концепции «блокировки». Как язык, который изначально поддерживает процесс пользовательского режима Goroutine, язык Go определенно предоставит разработчикам эту функцию. Основная функция — чтобы гарантировать, что несколько потоков или Goroutines не будут испытывать путаницу при доступе к одному и тому же фрагменту памяти Блокировки на самом деле являются примитивами синхронизации в параллельном программировании.
В этом разделе мы представим общие примитивы синхронизации в языке Go.Mutex
,RWMutex
,WaitGroup
,Once
а такжеCond
и примитивы расширенияErrGroup
,Semaphore
а такжеSingleFlight
Он также включает в себя общие концепции параллельного программирования, такие как мьютексы и семафоры.
основные примитивы
Перейти на языкsyncВ пакете предусмотрены некоторые базовые примитивы для синхронизации, в том числе общие блокировки мьютексов.Mutex
с мьютексом чтения-записиRWMutex
так же какOnce
,WaitGroup
.
Основная функция этих основных примитивов состоит в обеспечении относительно базовых функций синхронизации. Мы должны использовать канал и связь для реализации более продвинутых механизмов синхронизации. В этом разделе мы не будем представлять все примитивы стандартной библиотеки, а представим более распространенные.Mutex
,RWMutex
,Once
,WaitGroup
а такжеCond
, мы не будем рассматривать оставшиеся две структуры для доступа к даннымMap
а такжеPool
.
Mutex
Мьютексы в языке Go находятся вsync
пакет, который состоит из двух полейstate
а такжеsema
сочинение,state
представляет текущее состояние мьютекса, иsema
Семафор, который действительно используется для управления состоянием блокировки, эти две структуры, которые в сумме занимают всего 8 байтов, представляют собой мьютекс в языке Go.
type Mutex struct {
state int32
sema uint32
}
условие
Состояние мьютекса используетсяint32
для представления, но состояние блокировки не является взаимоисключающим, его младшие три бита представляютmutexLocked
,mutexWoken
а такжеmutexStarving
, оставшиеся позиции используются для указания того, сколько горутин в настоящее время ожидает освобождения мьютекса:
При создании мьютекса значением по умолчанию для всех битов состояния является0
, когда мьютекс заблокированmutexLocked
будет установлен на1
, когда мьютекс просыпается в обычном режимеmutexWoken
будет установлен на1
,mutexStarving
Используется для указания того, что текущий мьютекс вошел в состояние, а последние несколько битов — это количество горутин, ожидающих текущего мьютекса.
режим голодания
Прежде чем понять конкретный процесс блокировки и разблокировки, нам нужно кратко понятьMutex
Режим голодания, в который можно войти во время использования, режим голодания на языке Go.1.9Функция, представленная версией, ее основная функция заключается в обеспечении «справедливости» приобретения блокировки взаимного исключения.
Мьютексы могут находиться в двух разных режимах одновременно, а именно в нормальном режиме и в голодающем режиме. Новый процесс Goroutine также называетсяLock
Чтобы уменьшить возникновение этой ситуации и предотвратить «голодную смерть» горутины, как только горутина не получает блокировку более 1 мс, она переключает текущий мьютекс в режим голодания.
В голодающем режиме блокировка мьютекса будет напрямую передана горутине, находящейся в начале очереди ожидания. Новая горутина не может получить блокировку и не войдет в состояние вращения в это время. Они будут ждать только в конце очереди. Если горутина получает мьютекс, и это последняя сопрограмма в очереди или она ожидает менее 1 мс, то текущий мьютекс переключается обратно в нормальный режим.
По сравнению с голодающим режимом, блокировки мьютекса в обычном режиме могут обеспечить лучшую производительность.Основная функция голодного режима состоит в том, чтобы избежать высокой задержки хвоста, вызванной некоторыми горутинами, которые находятся в ожидании и не могут получить блокировки.Mutex
оптимизация.
замок
МьютексMutex
Блокировка поLock
Метод завершен, и последний исходный код языка GoLock
Метод упрощен, и ствол метода сохраняет только самые распространенные, простые и быстрые случаи, когда состояние блокировки0
непосредственно, когдаmutexLocked
положение в1
:
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}
но когдаLock
когда вызывается методMutex
статус не0
войдетlockSlow
Метод пытается дождаться освобождения блокировки и захватить мьютекс с помощью вращения или других методов.Тело метода представляет собой очень большойfor
Loop, мы разделим метод на несколько частей, чтобы представить процесс получения блокировки:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
В первой части этого метода будет оцениваться, может ли текущий метод войти в спин, чтобы дождаться освобождения блокировки. Spinnig - это фактически механизм, используемый в процессе многопоточной синхронизации. Текущий процесс входит в spin Во время процесса ЦП будет занят, и он будет продолжать проверять, выполняется ли определенное условие.На многоядерном ЦП преимущество спина состоит в том, чтобы избежать переключения Горутины, поэтому, если он используется правильно, он принесет очень большой прирост производительности.
на языке гоMutex
В мьютексе возможен только вход в спин в обычном режиме, помимо ограничений режима,runtime_canSpin
Метод определит, может ли текущий метод войти в спин, а условия для входа в спин очень жесткие:
- работа на многопроцессорной машине;
- Текущая горутина делает менее четырех оборотов, чтобы получить блокировку;
- На текущей машине есть хотя бы один работающий процессор
P
и обработанная очередь запуска пуста;
Вызывается, когда текущая горутина может вращатьсяruntime_doSpin
, который в итоге вызывает метод, написанный на ассемблереprocyield
и выполнить указанное количество разPAUSE
инструкция,PAUSE
Инструкция ничего не делает, но потребляет процессорное время и вызывается при каждом вращении.30
второсортныйPAUSE
, ниже приведена реализация этого метода на машине с архитектурой 386:
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
После обработки специальной логики, связанной со вращением, мьютекс затем вычисляет последнее состояние текущего мьютекса в соответствии с контекстом, и соответственно будут обновлены несколько различных условий.state
различная информация, хранящаяся вmutexLocked
,mutexStarving
,mutexWoken
а такжеmutexWaiterShift
:
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
new &^= mutexWoken
}
После вычисления нового состояния мьютекса мы используемatomic
Функция CAS, предоставляемая пакетом, изменяет состояние мьютекса.Если текущий мьютекс уже отключен и заблокирован, он пропустит текущий шаг и вызоветruntime_SemacquireMutex
метод:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
runtime_SemacquireMutex
Основная функция метода заключается в передачеMutex
Использование семафора в мьютексе гарантирует, что ресурс не будет получен двумя горутинами, из чего мы можем видетьMutex
На самом деле, это инкапсуляция семафора более низкого уровня и предоставление внешнего мира более простого в использовании API.runtime_SemacquireMutex
будет вызываться непрерывно в методеgoparkunlock
поставить ток
Горутина засыпает, ожидая получения семафора.
Как только текущая горутина может получить семафор, это доказывает, что мьютекс был разблокирован, и метод немедленно возвращается,Lock
Оставшийся код метода также будет продолжать выполняться.Когда текущий мьютекс находится в режиме голодания, если горутина является последней горутиной в очереди или время ожидания блокировки меньшеstarvationThresholdNs(1ms)
, текущая горутина напрямую захватит мьютекс, выйдет из голодного режима и получит блокировку.
разблокировать
По сравнению с этим процесс разблокировки мьютекса очень прост.Unlock
метод будет использоваться непосредственноatomic
пакет предоставленAddInt32
, если возвращаемое новое состояние не равно0
войдетunlockSlow
метод:
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
unlockSlow
Метод сначала проверяет состояние блокировки.Если текущий мьютекс был разблокирован, сразу будет выброшено исключение.sync: unlock of unlocked mutex
Прервите текущую программу.При нормальных обстоятельствах она войдет в разные ветви в зависимости от того, является ли текущее состояние мьютекса нормальным режимом или режимом голодания:
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
Если текущее состояние мьютекса находится в режиме голодания, он будет вызываться напрямую.runtime_Semrelease
Метод напрямую передает текущую блокировку следующему официанту, который пытается получить блокировку, и официант будет установлен после пробуждения.mutexLocked
состоянии, так как он все еще находится вmutexStarving
, поэтому новая горутина также не может получить блокировку.
В обычном режиме, если для текущего мьютекса нет ожидающего или состояние, указанное младшими тремя битами, равно0
, то текущему методу не нужно будить другие горутины, и он может вернуться напрямую.Когда горутина находится в состоянии ожидания, она все равно пройдетruntime_Semrelease
Разбудите соответствующую горутину и передайте право собственности на замок.
резюме
через мьютексMutex
Из анализа процесса блокировки и разблокировки мы можем сделать следующие выводы, которые могут помочь нам лучше понять принцип работы блокировок мьютексов.Процесс блокирования блокировок мьютексов более сложен, включая спин, семафор и такие концепции, как планирование Горутины. :
- Если мьютекс находится в инициализированном состоянии, он будет установлен непосредственно
mutexLocked
замок; - Если мьютекс находится в
mutexLocked
И работать в обычном режиме, он войдет в спин, выполнить 30 разPAUSE
Инструкция потребляет процессорное время в ожидании снятия блокировки; - Если текущая горутина ожидает блокировки дольше, чем
1ms
, мьютекс будет переведен в режим голодания; - Мьютексы обычно проходят через
runtime_SemacquireMutex
метод вызоветLock
Горутина переходит в состояние сна, ожидая, пока горутина, удерживающая семафор, разбудит текущую сопрограмму; - Если текущая горутина является последней ожидающей горутиной в мьютексе или время ожидания меньше
1ms
, текущая горутина переключит мьютекс обратно в нормальный режим;
Процесс разблокировки мьютекса относительно прост, хотя есть некоторые различия в обработке обычного режима и режима голодания, логика понятна и понятна из-за небольшого количества строк кода:
- Если мьютекс был разблокирован, вызовите
Unlock
Исключение будет сгенерировано напрямую; - Если мьютекс находится в режиме голодания, владение блокировкой будет напрямую передано следующему официанту в очереди, и официант будет нести ответственность за установку
mutexLocked
бит флага; - Если мьютекс находится в обычном режиме, и нет горутины, ожидающей снятия блокировки, или горутина, которая была разбужена, получила блокировку, она вернется напрямую, в других случаях она вернется через
runtime_Semrelease
Разбудить соответствующую горутину;
RWMutex
Мьютекс чтения-записи также является языком Gosync
Один из интерфейсов предоставляемых пакетом для нас, общий сервис имеет очень высокое отношение чтения и записи к ресурсам.Если большинство запросов это запросы на чтение, они не будут влиять друг на друга, то почему мы не можем читать и писать в ресурсы?Как насчет разделения записи? То естьRWMutex
Проблема, решаемая мьютексом чтения-записи, не ограничивает одновременное чтение ресурсов, но операции чтения-записи, записи-записи не могут выполняться параллельно.
читать | Напишите | |
---|---|---|
читать | Y | N |
Напишите | N | N |
Реализация мьютекса чтения-записи на языке Go:RWMutex
, который не только содержит мьютекс, но и содержит два семафора для записи, ожидающей чтения, и чтения, ожидающей записи:
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
readerCount
Сохраняет количество выполняемых в данный момент операций чтения, последниеreaderWait
Указывает количество ожидающих операций чтения, когда операция записи заблокирована.
блокировка чтения
Блокировка блокировки чтения очень проста, мы передаемatomic.AddInt32
методreaderCount
Добавьте один, если метод возвращает отрицательное число, это означает, что текущая горутина получила блокировку записи, и текущая горутина вызоветruntime_SemacquireMutex
Застрял в спячке, ожидая пробуждения:
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
Если нет операции записи для получения текущего мьютекса, текущий метод будетreaderCount
Возврат после увеличения; вызывается, когда горутина хочет снять блокировку чтенияRUnlock
метод:
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}
Этот метод уменьшит количество читаемых ресурсовreaderCount
, если текущий метод встречает ситуацию, когда возвращаемое значение меньше нуля, значит, идет текущая операция записи, и в это время она должна пройтиrUnlockSlow
метод для уменьшения количества операций чтения, ожидающих текущей операции записиreaderWait
и семафор, запускающий операцию записи после завершения всех операций чтения.writerSem
:
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
throw("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
writerSem
После запуска процесс, пытающийся получить блокировку чтения-записи, проснется и получит блокировку.
Блокировка чтения-записи
Когда пользователь ресурса хочет получить блокировку на чтение-запись, он должен передатьLock
метод, вLock
Метод сначала вызывает мьютекс чтения-записи, удерживаемыйMutex
изLock
Этот метод гарантирует, что другие горутины, получившие блокировки чтения-записи, перейдут в состояние ожидания, а последующиеatomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
По сути, это блокировка последующих операций чтения:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
Если есть еще другие Горутины, в настоящее время удерживающие блокировку чтения мьютекса, Горутина вызоветruntime_SemacquireMutex
Переходит в спящее состояние и срабатывает при ожидании снятия блокировки чтения.writerSem
Семафор пробуждает текущую сопрограмму.
После завершения операций чтения и записи в ресурсеatomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
Вернитесь к положительному числу и запустите все горутины, ожидающие блокировки чтения, через цикл for:
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
В конце метода,RWMutex
Удерживаемый мьютекс будет освобожден, чтобы другие сопрограммы могли повторно получить блокировку чтения-записи.
резюме
По сравнению с комплексным мьютексом состоянияMutex
Например, мьютекс чтения-записиRWMutex
Хотя предоставляемые функции очень сложны, из-заMutex
На «плече» общая реализация будет намного проще.
-
readerSem
— Когда блокировка чтения-записи снята, уведомить горутину, ожидающую получения блокировки чтения; -
writerSem
— Когда блокировка чтения снята, уведомить горутину, ожидающую получения блокировки чтения-записи; -
w
Блокировки Mutex — гарантируют взаимное исключение между операциями записи; -
readerCount
— Подсчитайте количество сопрограмм, выполняющих в данный момент операции чтения, и уменьшите его при срабатывании блокировки записиrwmutexMaxReaders
Блокировать последующие операции чтения; -
readerWait
— Текущая блокировка чтения-записи ожидает количество горутин для операций чтения, после срабатыванияLock
каждый раз послеRUnlock
Он будет уменьшен на единицу, и когда он вернется к нулю, горутина получит блокировку чтения-записи; - Когда снята блокировка чтения-записи
Unlock
Сначала будут уведомлены все операции чтения, а затем удерживаемый мьютекс будет освобожден, что может гарантировать, что операции чтения не будут «умирать от голода» из-за непрерывных операций записи;
RWMutex
существуетMutex
Вышеупомянутое обеспечивает дополнительную функцию разделения чтения-записи, которая может обеспечить повышение производительности, когда запросов на чтение намного больше, чем запросов на запись Мы также можем выбрать блокировки взаимного исключения чтения-записи, когда это подходит.
WaitGroup
WaitGroup
это язык гоsync
Общий механизм синхронизации в пакете, его можно использовать для ожидания возврата серии горутин.Общий сценарий использования — выполнение RPC в пакетном режиме или вызов внешних служб:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
пройти черезWaitGroup
Мы можем очень легко синхронизировать информацию между несколькими горутинами, и код, который изначально выполнялся последовательно, также может выполняться одновременно в нескольких горутинах, что ускоряет обработку программы.В приведенном выше коде только после того, как все горутины завершили выполнение.Wait
Метод возвращается, и программа может продолжать выполнять другую логику.
В общем, он делает именно то, что следует из его названия,Done
Чаще всего он используется для ожидания завершения всех одновременно выполняемых задач в группе горутин.
структура
WaitGroup
Переменные-члены в структуре очень простые, в которыхnoCopy
Основная функция заключается в обеспеченииWaitGroup
Он не будет скопирован разработчиками путем переназначения, что приведет к странному поведению:
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
copylockПакет — это анализатор для проверки подобных ошибок, и принцип его действия таков:во время компиляцииПроверьте, содержит ли скопированная переменнаяnoCopy
илиsync
ключевое слово, если оно содержит текущее ключевое слово, будет сообщено о следующей ошибке:
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.Mutex{}
yawg := wg
fmt.Println(wg, yawg)
}
$ go run proc.go
./prog.go:10:10: assignment copies lock value to yawg: sync.Mutex
./prog.go:11:14: call of fmt.Println copies lock value: sync.Mutex
./prog.go:11:18: call of fmt.Println copies lock value: sync.Mutex
Этот код будет назначать и вызыватьfmt.Println
Когда происходит копирование значения, анализатор сообщает об ошибке, вы можете получить к ней доступ, обратившисьСвязьПопробуйте запустить этот код.
КромеnoCopy
Помимо,WaitGroup
Структура также содержит массив, занимающий в общей сложности 12 байт, Этот массив будет хранить состояние и семафор, удерживаемые текущей структурой, которая ведет себя совершенно по-разному на 64-битных и 32-битных машинах.
WaitGroup
частный методstate
может помочь нам отstate1
поле для получения его состояния и семафора.
действовать
WaitGroup
Есть только три открытых интерфейсаAdd
,Wait
а такжеDone
,вDone
просто вызывается методwg.Add(-1)
Самой по себе особой логики нет, давайте разберем оставшиеся два метода:
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v > 0 || w == 0 {
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Add
Основная функция метода заключается в обновленииWaitGroup
счетчики, проведенные вcounter
, старшие 32 бита 64-битного состояния, хотяAdd
Параметр, передаваемый методу, может быть отрицательным, ноWaitGroup
Счетчик может быть только неотрицательным, при вызовеAdd
вызывает обнуление счетчика и все еще есть ожидающие горутины, он передается черезruntime_Semrelease
Разбудите все горутины, которые находятся в состоянии ожидания.
еще одинWaitGroup
МетодыWait
Данные, хранящиеся в текущем счетчике, больше, чем0
При изменении количества ожидающих горутинwaiter
и позвониruntime_Semacquire
Заснуть.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
if v == 0 {
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if +statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Спящая горутина будет ждатьAdd
метод в счетчике0
вставай.
резюме
через паруWaitGroup
Проведя анализ и исследования, мы можем сделать следующие выводы:
-
Add
не может быть сWait
Метод вызывается одновременно в горутине, и как только он появится, программа рухнет; -
WaitGroup
Должен бытьWait
Метод можно использовать повторно только после возврата; -
Done
в самый разAdd
Простая инкапсуляция метода, мы можемAdd
Метод передает любое отрицательное число (счетчик должен быть гарантированно неотрицательным), чтобы быстро сбросить счетчик до нуля, чтобы разбудить другие ожидающие горутины; - Может быть несколько Горутин, ожидающих текущего
WaitGroup
Когда счетчик обнуляется, эти горутины также будут разбужены «одновременно»;
Once
Язык Go в стандартной библиотекеsync
Также доступно в пакете синхронизацииOnce
Семантика, его основная функция на самом деле хорошо понятна, гарантируется во время работы программы Go.Once
Соответствующий фрагмент кода будет выполнен только один раз.
В коде, показанном ниже,Do
Функция, переданная в метод, будет выполнена только один раз, то есть мы увидим ее только один раз, когда запустим код, показанный ниже.only once
Выходной результат:
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}
$ go run main.go
only once
так какsync
структуры в пакетах,Once
имеет очень простую структуру данных, каждыйOnce
Структура содержит только один код, используемый для определения того, был ли выполнен кодовый блок.done
и мьютексMutex
:
type Once struct {
done uint32
m Mutex
}
Once
Единственный способ открыть структуру внешнему миру — этоDo
, метод примет функцию с пустыми входными параметрами, если вы используетеatomic.LoadUint32
Проверив, что функция была выполнена, она вернется напрямую, иначе войдетdoSlow
Запустите переданную функцию:
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
doSlow
Реализация также очень проста, мы сначала получаем мьютекс для текущей горутины, а затем передаемdefer
ключевые слова будутdone
переменная-член имеет значение1
и запустите переданную функцию, независимо от того, работает ли текущая функция нормально или выдаетpanic
, текущий метод будетdone
установлен в1
Гарантируется, что функция не будет выполнена во второй раз.
резюме
как гарантия того, сколько раз функция будет выполненаOnce
структура, которая использует мьютекс иatomic
Предоставленный метод реализует семантику, что функция может быть выполнена только один раз во время работы программы, В процессе его использования нам также необходимо обратить внимание на следующее:
-
Do
Функция, переданная в метод, будет выполнена только один раз, даже если в функции что-то произойдет.panic
; - позвони дважды
Do
Когда метод передается другой функции, будет выполнена только функция, вызванная в первый раз;
Cond
Язык Go предоставляется в стандартной библиотеке.Cond
По сути, это условная переменная, которая передается черезCond
Мы можем заставить ряд горутин просыпаться только тогда, когда срабатывает определенное событие или условие, каждыйCond
Структура содержит мьютексL
, давайте сначала посмотримCond
Как это используется:
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
c.Broadcast()
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
c.Wait()
fmt.Println("listen")
c.L.Unlock()
}
$ go run main.go
listen
listen
...
listen
В приведенном выше коде мы запускаем 11 горутин одновременно, 10 из них будут проходить черезWait
Дождитесь желаемого сигнала или события, а оставшаяся горутина вызоветBroadcast
метод уведомляет все горутины, застрявшие в ожидании, при вызовеBoardcast
метод, он будет печатать 10 раз"listen"
и завершите вызов.
структура
Cond
Структура содержитnoCopy
а такжеcopyChecker
Два поля, первое используется для гарантииCond
Больше никакого копирования во время компиляции, что гарантирует, что копирование во время выполнения будет напрямуюpanic
, другой замок, удерживаемыйL
на самом деле это интерфейсLocker
, любая реализацияLock
а такжеUnlock
Структура метода может быть использована какNewCond
Параметры метода:
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
последняя переменная в структуреnotifyList
На самом деле для достиженияCond
Механизм синхронизации, структура на самом делеGoroutine
Связанный список:
type notifyList struct {
wait uint32
notify uint32
lock mutex
head *sudog
tail *sudog
}
В этой структуреhead
а такжеtail
Они указывают на начало и конец всего связанного списка соответственно, иwait
а такжеnotify
Представляют горутину, ожидающую в данный момент, и горутину, которая была уведомлена, соответственно. Мы можем использовать эти две переменные, чтобы подтвердить, что текущая горутина должна быть уведомлена, и горутина, которая была уведомлена.
действовать
Cond
незащищенныйWait
метод усыпит текущую горутину, сначала она вызоветruntime_notifyListAdd
буду ждать счетчик+1
, затем разблокируйте и позвонитеruntime_notifyListWait
Подождите, пока проснутся другие горутины:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}
notifyListWait
Основная цель метода — получить текущую Goroutine и добавить ее вnotifyList
Конец связанного списка:
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
if less(t, l.notify) {
unlock(&l.lock)
return
}
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
Помимо добавления текущей горутины в конец связанного списка, мы также вызываемgoparkunlock
Впадая в спящее состояние, эта функция также является методом, часто используемым при переключении горутин в языке Go: она напрямую откажется от права использования текущего процессора и будет ждать, пока планировщик проснется.
Cond
предоставленный извнеSignal
а такжеBroadcast
метод используется для пробуждения вызоваWait
Для горутины, которая засыпает, судя по названиям двух методов, первый разбудит горутину в начале очереди, а второй разбудит все горутины в очереди:
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
notifyListNotifyAll
Метод возьмет все горутины из связанного списка и вызовет их по очереди.readyWithTime
, метод пройдетgoready
Разбудите горутину цели:
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Хотя он будет пробуждать все горутины по очереди, порядок пробуждения здесь на самом деле в порядке присоединения к очереди, и первый будет добавлен первым.goready
После пробуждения добавленной горутине может потребоваться дождаться планирования планировщика.
а такжеnotifyListNotifyOne
функция будет запускаться только сsudog
Сформированный связанный список удовлетворяетsudog.ticket == l.notify
горутина и проходreadyWithTime
будить:
func notifyListNotifyOne(l *notifyList) {
t := l.notify
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
s.next = nil
readyWithTime(s, 4)
return
}
}
}
Как правило, мы выбираем вызов, когда определенные условия не выполняются.Wait
Впадая в сон, когда некоторые горутины обнаруживают, что условия для пробуждения в настоящее время соблюдены, они могут использоватьSignal
уведомить одного илиBroadcast
Уведомляет все горутины о том, что текущие условия выполнены и можно продолжить выполнение работы.
резюме
а такжеMutex
в сравнении с,Cond
Это также механизм синхронизации, который не всем хорошо понятен и понятен, он обеспечивает механизм ожидания типа очереди FIFO, а также обеспечиваетSignal
а такжеBroadcast
Два разных метода пробуждения по сравнению с использованиемfor {}
занят ожиданием, используйтеCond
Функция, которая может отказаться от текущего процессора, когда долгосрочные условия не могут быть выполнены.Если мы используем ее разумно, мы все же можем улучшить производительность в некоторых случаях.В процессе использования нам нужно обратить внимание на:
-
Wait
Метод должен быть использован до вызоваL.Lock
держите ресурс, а то так и будетpanic
вызвать сбой программы; -
Signal
Горутины, пробуждаемые этим методом, находятся в начале очереди и ждут дольше всех; -
Broadcast
Хотя это горутина, которая транслирует все ожидающие уведомления, она также находится в определенном порядке, когда фактически просыпается;
примитив расширения
В дополнение к примитивам синхронизации, представленным в этих стандартных библиотеках, язык Go также доступен в подрепозиториях.x/sync
Дополнительные четыре примитива синхронизации предоставлены в ,ErrGroup
,Semaphore
,SingleFlight
а такжеSyncMap
,один из нихSyncMap
На самом деле этоsync
в упаковкеsync.Map
, это в версии 1.9
Представлено в Gox/sync
пакеты, которые в конечном итоге были перемещены в стандартную библиотеку по мере того, как API совершенствовался и стабилизировался.sync
в сумке.
В этом разделе мы представим три примитива, доступных в пакете расширения в этом разделе.ErrGroup
,Semaphore
а такжеSingleFlight
.
ErrGroup
вспомогательный складx/sync
упаковать вerrgroupПо сути, он предоставляет нам функции синхронизации, распространения ошибок и отмены контекста в наборе горутин.Мы можем получать данные веб-страниц параллельно следующим образом:
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
Go
метод может создать горутину и выполнить в ней переданную функцию, аWait
метод будет ждатьGo
После возврата всех горутин, созданных методом, возвращается первая ненулевая ошибка.Если все горутины не возвращают ошибку, функция вернетnil
.
структура
errgroup
в упаковкеGroup
Структура также состоит из еще трех важных частей:
- Создайте
Context
вернулся, когдаcancel
функция, в основном используемая для использования уведомленийcontext
Горутина может останавливать работу и отказываться от ресурсов из-за некоторых ошибок подзадачи; - Используется для ожидания, пока группа горутин выполнит подзадачи.
WaitGroup
примитивы синхронизации; - Используется для принятия подзадач, которые возвращают ошибки
err
и гарантияerr
будет назначен только один разerrOnce
;
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
Вместе эти поля образуютGroup
Структурируйте и предоставьте нам такие функции, как синхронизация, распространение ошибок и отмена контекста.
действовать
errgroup
Единственный открытый конструкторWithContext
метод, мы можем начать только сContext
создать новыйGroup
Переменная,WithCancel
Возвращенная функция отмены также будет использоваться только вGroup
Внутреннее использование структуры:
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
Для создания новой параллельной подзадачи необходимо использоватьGo
метод, этот метод будет внутреннеWaitGroup
Добавьте один и создайте новую горутину, запускайте подзадачи внутри горутины и вовремя вызывайте, когда возвращается ошибкаcancel
и кerr
Присвоение, только самая ранняя возвращенная ошибка будет воспринята восходящим потоком, а последующие ошибки будут отброшены:
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
Wait
Метод на самом деле просто вызываетWaitGroup
Синхронный метод отменяется, когда все подзадачи выполнены.Context
и вернуть возможные ошибки.
резюме
errgroup
в упаковкеGroup
Принцип реализации примитивов синхронизации по-прежнему очень прост. Он не использует очень низкоуровневые API-интерфейсы в пакетах времени выполнения, а просто инкапсулирует базовую семантику синхронизации для предоставления более сложных функций. При их использовании также необходимо обратить внимание на следующее. Проблемы:
- Вызывается при возникновении ошибки или завершении ожидания
Context
изcancel
метод отмены контекста; - Будет возвращена только первая возникшая ошибка, а остальные ошибки будут отброшены напрямую;
Semaphore
Семафор — это распространенный механизм синхронизации в параллельном программировании, который гарантирует, что удерживаемый счетчик0
Между инициализированными весами счетчик в семафоре будет вычитаться на соответствующее значение каждый раз, когда ресурс будет получен, и добавлен обратно, когда он будет освобожден.Когда счетчик больше размера семафора, он перейдет в спящий режим и будет ждать другие процессы для выпуска сигнала. , мы часто используем его при контроле количества процессов, обращающихся к ресурсам.
Пакет расширения Golang предоставляет взвешенные семафоры. Мы можем управлять доступом к ресурсам в соответствии с разным весом. Этот пакет предоставляет только четыре внешних метода:
-
NewWeighted
Используется для создания новых семафоров; -
Acquire
Ресурс с указанным весом получен, если в данный момент нет «незанятого ресурса», то он впадает в спящее ожидание; -
TryAcquire
Он также используется для получения ресурсов указанного веса, но если в данный момент нет «незанятого ресурса», он вернется напрямуюfalse
; -
Release
Используется для освобождения ресурсов указанного веса;
структура
NewWeighted
Основная функция метода создает новый семафор веса, а вес с наибольшим входящим семафором вернет новыйWeighted
Указатель структуры:
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
Weighted
Структура содержитwaiters
В списке хранятся «пользователи», ожидающие получения ресурсов, а также верхняя граница текущего семафора и счетчикcur
, диапазон этого счетчика[0, size]
:
Счетчик в семафоре будет меняться при доступе пользователя и освобождении ресурсов.Введенная концепция весов может помочь нам лучше контролировать степень детализации доступа к ресурсам и максимально удовлетворить все распространенные варианты использования.
Получать
Мы упоминали вышеAcquire
Метод – это метод, используемый для получения заданного весового ресурса.Этот метод состоит из трех различных ситуаций:
- Когда оставшиеся ресурсы в семафоре больше, чем полученные ресурсы, и нет ожидающих горутин, семафор будет получен напрямую;
- Когда семафор, который необходимо получить, больше, чем
Weighted
Когда размер , он вернется напрямую, потому что невозможно выполнить условие; - В других случаях текущая горутина будет добавлена в список ожидания и передана
select
Подождите, пока текущая горутина будет разбужена, и после пробуждения семафор будет получен;
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
if n > s.size {
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
err = nil
default:
s.waiters.Remove(elem)
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
Другой способ получения семафораTryAcquire
Напротив, это очень просто: он только оценивает, достаточно ли у текущего семафора ресурсов для получения, и если ресурсов достаточно, он немедленно возвращается.true
иначе вернетсяfalse
:
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
а такжеAcquire
в сравнении с,TryAcquire
Поскольку он не ждет освобождения ресурсов, он может быть более подходящим для некоторых сценариев, чувствительных к задержке, когда пользователю необходимо немедленно получить результат.
освобожден
последний, чтобы представитьRelease
Метод на самом деле очень прост: когда мы отпускаем семафор,Release
метод будет проходить от начала до концаwaiters
Для всех ожидающих в списке, если семафор после освобождения ресурса имеет достаточно оставшихся ресурсов, указанная Горутина будет вызываться через Канал:
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
for {
next := s.waiters.Front()
if next == nil {
break
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
s.mu.Unlock()
}
Конечно, могут быть случаи, когда оставшиеся ресурсы не могут вызвать Горутин. В это время текущий метод снимает блокировку и возвращается напрямую. Анализируя этот код, мы также можем обнаружить, что если семафор требует много ресурсы, он может быть не в состоянии получить блокировку в течение длительного времени, что также может бытьAcquire
метод принимает другой параметрContext
Причина установки тайм-аута для получения семафора.
резюме
Взвешенный семафор имеет больше сценариев применения.Это также единственная реализация семафора, предоставляемая языком Go.В процессе его использования нам необходимо обратить внимание на следующие моменты:
-
Acquire
а такжеTryAcquire
Для получения ресурсов можно использовать оба метода: первый используется для синхронного получения и будет ожидать освобождения блокировки, а второй будет возвращаться напрямую, когда блокировка не может быть получена; -
Release
Метод разбудит горутины, которые можно разбудить в порядке FIFO; - Если горутина получает больше ресурсов, потому что
Release
Стратегия выпуска может ждать долгое время;
SingleFlight
singleflightЭто еще один примитив синхронизации, предоставляемый в пакете расширения языка Go, который на самом деле является любимым механизмом расширения синхронизации автора. Он может подавлять множественные повторные запросы к нисходящему потоку в службе. Обычный сценарий использования: — Мы используем Redis для кэширования некоторых популярных данные в базе данных и установите время ожидания.В момент, когда время ожидания кеша истекает, может быть много параллельных запросов.Обнаружено, что Redis не содержит кеша, поэтому в базу данных попадет много трафика. задержка и качество обслуживания.
ноsingleflight
может эффективно решить эту проблему, его основная функция заключается вKey
В конце концов, будет сделан только один вызов функции.В этом контексте будет сделан только один запрос к базе данных.Результат запроса будет записан обратно в Redis и синхронизирован со всеми запросами.Key
Пользователь:
Это фактически уменьшает мгновенный трафик к нисходящему потоку, и очень много времени требуется для получения нисходящих ресурсов, таких как: доступ к кешам, базам данных и другим сценариям, которые очень подходят для использования.singleflight
Оптимизация сервиса. В приведенном выше примере мы можем использовать его, когда хотим получить данные как из Redis, так и из базы данных.singleflight
Эта предусмотренная функция снижает давление на выходе; ее использование на самом деле очень просто, мы можем использовать ее напрямую.singleflight.Group{}
создать новыйGroup
структуру, а затем, позвонивDo
метод для подавления того же запроса:
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}
Приведенный выше код использует хэш запроса в качестве ключа для подавления того же запроса, мы также можем выбрать некоторые более важные или важные поля в качествеDo
Первый параметр метода позволяет избежать мгновенного большого количества запросов к нисходящему.
структура
Group
Сама структура заблокирована мьютексомMutex
и отKey
прибытьcall
Таблица отображения указателей структур состоит из каждогоcall
Структура сохраняет информацию, соответствующую текущему вызову:
type Group struct {
mu sync.Mutex
m map[string]*call
}
type call struct {
wg sync.WaitGroup
val interface{}
err error
dups int
chans []chan<- Result
}
call
в структуреval
а такжеerr
Поля назначаются только один раз при выполнении переданной функции, и они будут толькоWaitGroup
дождитесь окончания чтения, покаdups
а такжеchans
поля используются для хранения текущихsingleflight
Количество запросов для подавления и передачи информации вызывающей стороне при возврате результата.
действовать
singleflight
Пакет предоставляет два метода подавления одного и того же запроса, один из которых — метод синхронного ожидания.Do
, а другой, который возвращает каналDoChan
, эти два метода не имеют большой разницы в функциях, лишь немного отличаются производительностью интерфейса.
КаждыйDo
При вызове метода мьютекс захватывается и пытаетсяGroup
Удерживаемая таблица сопоставления загружается лениво, а затем оценивается, существует ли она ужеkey
Соответствующий вызов функции:
- когда нет соответствующего
call
Когда структура:- инициализировать новый
call
указатель структуры; - Увеличивать
WaitGroup
проведенные счетчики; - Буду
call
Указатель структуры добавляется в таблицу отображения; - Освободить удерживаемый мьютекс
Mutex
; - блокировка вызова
doCall
Метод ожидает возврата результата;
- инициализировать новый
- когда соответствующий
call
структура;- Увеличивать
dups
Счетчик, представляющий текущее количество повторных вызовов; - Освободить удерживаемый мьютекс
Mutex
; - пройти через
WaitGroup.Wait
дождитесь возврата запроса;
- Увеличивать
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
потому чтоval
а такжеerr
Оба поля будут только вdoCall
назначен метод, поэтому, когдаdoCall
Методы иWaitGroup.Wait
Когда метод возвращается, эти два значения возвращаются вDo
Вызывающий функцию.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
doCall
будет запускать переданную функциюfn
, возвращаемое значение функции будет присвоеноc.val
а такжеc.err
, она будет вызвана после завершения выполнения функцииWaitGroup.Done
Метод уведомляет все подавленные запросы о том, что текущая функция завершена и к ней можно получить доступ изcall
Возвращаемое значение выносится из структуры и возвращается, после чегоdoCall
Метод захватит удерживаемый мьютекс и синхронизирует информацию с пользователем через конвейер.DoChan
Вызывающий метод.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
DoChan
Методы иDo
Разница в том, что он использует Goroutine для асинхронного выполнения.doCall
и кcall
держалchans
добавить к срезуchan Result
переменная, поэтому она может обеспечивать асинхронную передачу значений.
резюме
singleflight
пакет предоставленGroup
Интерфейс действительно прост в использовании.Когда нам нужно подавить тот же запрос к нисходящему, мы можем использовать этот метод для увеличения пропускной способности и качества обслуживания.Мы также должны обратить внимание на следующие проблемы во время использования:
-
Do
а такжеDoChan
Один для синхронной блокировки входящей функции, один для асинхронного вызова входящего параметра и принятия возвращаемого значения функции через канал; -
Forget
способ уведомленияsingleflight
Удалите ключ в удерживаемой таблице сопоставления, и следующий вызов ключа выполнит метод напрямую, вместо того, чтобы ждать возврата предыдущей функции; - Как только вызванная функция вернет ошибку, все ожидающие горутины также получат ту же ошибку;
Суммировать
В этом разделе мы познакомим вас с базовыми примитивами, входящими в стандартную библиотеку языка Go, и примитивами расширения в пакете расширения.Эти примитивы параллельного программирования помогут нам лучше использовать возможности языка Go для создания высокопроизводительных и низкопроизводительных примитивов. Стоимость отложенного обслуживания и устранение ошибок из-за параллелизма, здесь мы возвращаемся к содержанию, представленному в этом разделе:
-
Mutex
Мьютекс- Если мьютекс находится в инициализированном состоянии, он будет установлен непосредственно
mutexLocked
замок; - Если мьютекс находится в
mutexLocked
И работать в обычном режиме, он войдет в спин, выполнить 30 разPAUSE
Инструкция потребляет процессорное время в ожидании снятия блокировки; - Если текущая горутина ожидает блокировки дольше, чем
1ms
, мьютекс будет переведен в режим голодания; - Мьютексы обычно проходят через
runtime_SemacquireMutex
метод вызоветLock
Горутина переходит в состояние сна, ожидая, пока горутина, удерживающая семафор, разбудит текущую сопрограмму; - Если текущая горутина является последней ожидающей горутиной в мьютексе или время ожидания меньше
1ms
, текущая горутина переключит мьютекс обратно в нормальный режим; - Если мьютекс был разблокирован, вызовите
Unlock
Исключение будет сгенерировано напрямую; - Если мьютекс находится в режиме голодания, владение блокировкой будет напрямую передано следующему официанту в очереди, и официант будет нести ответственность за установку
mutexLocked
бит флага; - Если мьютекс находится в обычном режиме, и нет горутины, ожидающей снятия блокировки, или горутина, которая была разбужена, получила блокировку, она вернется напрямую, в других случаях она вернется через
runtime_Semrelease
Разбудить соответствующую горутину;
- Если мьютекс находится в инициализированном состоянии, он будет установлен непосредственно
-
RWMutex
мьютекс чтения-записи-
readerSem
— Когда блокировка чтения-записи снята, уведомить горутину, ожидающую получения блокировки чтения; -
writerSem
— Когда блокировка чтения снята, уведомить горутину, ожидающую получения блокировки чтения-записи; -
w
Блокировки Mutex — гарантируют взаимное исключение между операциями записи; -
readerCount
— Подсчитайте количество сопрограмм, выполняющих в данный момент операции чтения, и уменьшите его при срабатывании блокировки записиrwmutexMaxReaders
Блокировать последующие операции чтения; -
readerWait
— Текущая блокировка чтения-записи ожидает количество горутин для операций чтения, после срабатыванияLock
каждый раз послеRUnlock
Он будет уменьшен на единицу, и когда он вернется к нулю, горутина получит блокировку чтения-записи; - Когда снята блокировка чтения-записи
Unlock
Сначала будут уведомлены все операции чтения, а затем удерживаемый мьютекс будет освобожден, что может гарантировать, что операции чтения не будут «умирать от голода» из-за непрерывных операций записи;
-
-
WaitGroup
Подождите, пока группа горутин закончит-
Add
не может быть сWait
Метод вызывается одновременно в горутине, и как только он появится, программа рухнет; -
WaitGroup
Должен бытьWait
Метод можно использовать повторно только после возврата; -
Done
в самый разAdd
Простая инкапсуляция метода, мы можемAdd
Метод передает любое отрицательное число (счетчик должен быть гарантированно неотрицательным), чтобы быстро сбросить счетчик до нуля, чтобы разбудить другие ожидающие горутины; - Может быть несколько Горутин, ожидающих текущего
WaitGroup
Когда счетчик обнуляется, эти горутины также будут разбужены «одновременно»;
-
-
Once
Выполняется только один раз во время работы программы-
Do
Функция, переданная в метод, будет выполнена только один раз, даже если в функции что-то произойдет.panic
; - позвони дважды
Do
Когда метод передается другой функции, будет выполнена только функция, вызванная в первый раз;
-
-
Cond
Просыпаться, когда происходит указанное событие-
Wait
Метод должен быть использован до вызоваL.Lock
держите ресурс, а то так и будетpanic
вызвать сбой программы; -
Signal
Горутины, пробуждаемые этим методом, находятся в начале очереди и ждут дольше всех; -
Broadcast
Хотя это горутина, которая транслирует все ожидающие уведомления, она также находится в определенном порядке, когда фактически просыпается;
-
-
ErrGroup
Обеспечивает синхронизацию, распространение ошибок и отмену контекста для группы горутин.- Вызывается при возникновении ошибки или завершении ожидания
Context
изcancel
метод отмены контекста; - Будет возвращена только первая возникшая ошибка, а остальные ошибки будут отброшены напрямую;
- Вызывается при возникновении ошибки или завершении ожидания
-
Semaphore
взвешенный семафор-
Acquire
а такжеTryAcquire
Для получения ресурсов можно использовать оба метода: первый используется для синхронного получения и будет ожидать освобождения блокировки, а второй будет возвращаться напрямую, когда блокировка не может быть получена; -
Release
Метод разбудит горутины, которые можно разбудить в порядке FIFO; - Если горутина получает больше ресурсов, потому что
Release
Стратегия выпуска может ждать долгое время;
-
-
SingleFlight
Используется для подавления дублирующих запросов к нижестоящим-
Do
а такжеDoChan
Один для синхронной блокировки входящей функции, один для асинхронного вызова входящего параметра и принятия возвращаемого значения функции через канал; -
Forget
способ уведомленияsingleflight
Удалите ключ в удерживаемой таблице сопоставления, и следующий вызов ключа выполнит метод напрямую, вместо того, чтобы ждать возврата предыдущей функции; - Как только вызванная функция вернет ошибку, все ожидающие горутины также получат ту же ошибку;
-
Реализация этих примитивов синхронизации должна не только учитывать простоту использования интерфейса API и решать проблему конкуренции потоков, которая может возникнуть в параллельном программировании, но также должна оптимизировать задержку хвоста, чтобы избежать некоторых горутин, которые не могут получить блокировки или голодают. к смерти за ресурсы.Изучение примитивов синхронизации может также улучшить наше понимание и понимание параллельного программирования, и это также шаг к пониманию того, что параллельное программирование не может пересекаться.
Reference
- sync: make Mutex more fair
- runtime: fall back to fair locks after repeated sleep-acquire failures #13086
- x/sync · Golang
- The Go Memory Model
- The X-Files: Exploring the Golang Standard Library Sub-Repositories
- Go: Avoid duplicate requests with sync/singleflight
Gitalking ...