Перейти к параллельному программированию, примитивам синхронизации и блокировкам

Go

5.1 Примитивы синхронизации и блокировки

Когда дело доходит до параллельного программирования и многопоточного программирования, мы часто неотделимы от концепции «блокировки». Как язык, который изначально поддерживает процесс пользовательского режима Goroutine, язык Go определенно предоставит разработчикам эту функцию. Основная функция — чтобы гарантировать, что несколько потоков или Goroutines не будут испытывать путаницу при доступе к одному и тому же фрагменту памяти Блокировки на самом деле являются примитивами синхронизации в параллельном программировании.

В этом разделе мы представим общие примитивы синхронизации в языке Go.Mutex,RWMutex,WaitGroup,Onceа такжеCondи примитивы расширенияErrGroup,Semaphoreа такжеSingleFlightОн также включает в себя общие концепции параллельного программирования, такие как мьютексы и семафоры.

основные примитивы

Перейти на языкsyncВ пакете предусмотрены некоторые базовые примитивы для синхронизации, в том числе общие блокировки мьютексов.Mutexс мьютексом чтения-записиRWMutexтак же какOnce,WaitGroup.

golang-basic-sync-primitives

Основная функция этих основных примитивов состоит в обеспечении относительно базовых функций синхронизации. Мы должны использовать канал и связь для реализации более продвинутых механизмов синхронизации. В этом разделе мы не будем представлять все примитивы стандартной библиотеки, а представим более распространенные.Mutex,RWMutex,Once,WaitGroupа такжеCond, мы не будем рассматривать оставшиеся две структуры для доступа к даннымMapа такжеPool.

Mutex

Мьютексы в языке Go находятся вsyncпакет, который состоит из двух полейstateа такжеsemaсочинение,stateпредставляет текущее состояние мьютекса, иsemaСемафор, который действительно используется для управления состоянием блокировки, эти две структуры, которые в сумме занимают всего 8 байтов, представляют собой мьютекс в языке Go.

type Mutex struct {
    state int32
    sema  uint32
}

условие

Состояние мьютекса используетсяint32для представления, но состояние блокировки не является взаимоисключающим, его младшие три бита представляютmutexLocked,mutexWokenа такжеmutexStarving, оставшиеся позиции используются для указания того, сколько горутин в настоящее время ожидает освобождения мьютекса:

golang-mutex-state

При создании мьютекса значением по умолчанию для всех битов состояния является0, когда мьютекс заблокированmutexLockedбудет установлен на1, когда мьютекс просыпается в обычном режимеmutexWokenбудет установлен на1,mutexStarvingИспользуется для указания того, что текущий мьютекс вошел в состояние, а последние несколько битов — это количество горутин, ожидающих текущего мьютекса.

режим голодания

Прежде чем понять конкретный процесс блокировки и разблокировки, нам нужно кратко понятьMutexРежим голодания, в который можно войти во время использования, режим голодания на языке Go.1.9Функция, представленная версией, ее основная функция заключается в обеспечении «справедливости» приобретения блокировки взаимного исключения.

Мьютексы могут находиться в двух разных режимах одновременно, а именно в нормальном режиме и в голодающем режиме. Новый процесс Goroutine также называетсяLockЧтобы уменьшить возникновение этой ситуации и предотвратить «голодную смерть» горутины, как только горутина не получает блокировку более 1 мс, она переключает текущий мьютекс в режим голодания.

golang-mutex-mode

В голодающем режиме блокировка мьютекса будет напрямую передана горутине, находящейся в начале очереди ожидания. Новая горутина не может получить блокировку и не войдет в состояние вращения в это время. Они будут ждать только в конце очереди. Если горутина получает мьютекс, и это последняя сопрограмма в очереди или она ожидает менее 1 мс, то текущий мьютекс переключается обратно в нормальный режим.

По сравнению с голодающим режимом, блокировки мьютекса в обычном режиме могут обеспечить лучшую производительность.Основная функция голодного режима состоит в том, чтобы избежать высокой задержки хвоста, вызванной некоторыми горутинами, которые находятся в ожидании и не могут получить блокировки.Mutexоптимизация.

замок

МьютексMutexБлокировка поLockМетод завершен, и последний исходный код языка GoLockМетод упрощен, и ствол метода сохраняет только самые распространенные, простые и быстрые случаи, когда состояние блокировки0непосредственно, когдаmutexLockedположение в1:

func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    m.lockSlow()
}

но когдаLockкогда вызывается методMutexстатус не0войдетlockSlowМетод пытается дождаться освобождения блокировки и захватить мьютекс с помощью вращения или других методов.Тело метода представляет собой очень большойforLoop, мы разделим метод на несколько частей, чтобы представить процесс получения блокировки:

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }

В первой части этого метода будет оцениваться, может ли текущий метод войти в спин, чтобы дождаться освобождения блокировки. Spinnig - это фактически механизм, используемый в процессе многопоточной синхронизации. Текущий процесс входит в spin Во время процесса ЦП будет занят, и он будет продолжать проверять, выполняется ли определенное условие.На многоядерном ЦП преимущество спина состоит в том, чтобы избежать переключения Горутины, поэтому, если он используется правильно, он принесет очень большой прирост производительности.

на языке гоMutexВ мьютексе возможен только вход в спин в обычном режиме, помимо ограничений режима,runtime_canSpinМетод определит, может ли текущий метод войти в спин, а условия для входа в спин очень жесткие:

  1. работа на многопроцессорной машине;
  2. Текущая горутина делает менее четырех оборотов, чтобы получить блокировку;
  3. На текущей машине есть хотя бы один работающий процессорPи обработанная очередь запуска пуста;

Вызывается, когда текущая горутина может вращатьсяruntime_doSpin, который в итоге вызывает метод, написанный на ассемблереprocyieldи выполнить указанное количество разPAUSEинструкция,PAUSEИнструкция ничего не делает, но потребляет процессорное время и вызывается при каждом вращении.30второсортныйPAUSE, ниже приведена реализация этого метода на машине с архитектурой 386:

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ    again
    RET

После обработки специальной логики, связанной со вращением, мьютекс затем вычисляет последнее состояние текущего мьютекса в соответствии с контекстом, и соответственно будут обновлены несколько различных условий.stateразличная информация, хранящаяся вmutexLocked,mutexStarving,mutexWokenа такжеmutexWaiterShift:

        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }

После вычисления нового состояния мьютекса мы используемatomicФункция CAS, предоставляемая пакетом, изменяет состояние мьютекса.Если текущий мьютекс уже отключен и заблокирован, он пропустит текущий шаг и вызоветruntime_SemacquireMutexметод:

        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

runtime_SemacquireMutexОсновная функция метода заключается в передачеMutexИспользование семафора в мьютексе гарантирует, что ресурс не будет получен двумя горутинами, из чего мы можем видетьMutexНа самом деле, это инкапсуляция семафора более низкого уровня и предоставление внешнего мира более простого в использовании API.runtime_SemacquireMutexбудет вызываться непрерывно в методеgoparkunlockпоставить ток Горутина засыпает, ожидая получения семафора.

Как только текущая горутина может получить семафор, это доказывает, что мьютекс был разблокирован, и метод немедленно возвращается,LockОставшийся код метода также будет продолжать выполняться.Когда текущий мьютекс находится в режиме голодания, если горутина является последней горутиной в очереди или время ожидания блокировки меньшеstarvationThresholdNs(1ms), текущая горутина напрямую захватит мьютекс, выйдет из голодного режима и получит блокировку.

разблокировать

По сравнению с этим процесс разблокировки мьютекса очень прост.Unlockметод будет использоваться непосредственноatomicпакет предоставленAddInt32, если возвращаемое новое состояние не равно0войдетunlockSlowметод:

func (m *Mutex) Unlock() {
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

unlockSlowМетод сначала проверяет состояние блокировки.Если текущий мьютекс был разблокирован, сразу будет выброшено исключение.sync: unlock of unlocked mutexПрервите текущую программу.При нормальных обстоятельствах она войдет в разные ветви в зависимости от того, является ли текущее состояние мьютекса нормальным режимом или режимом голодания:

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}

Если текущее состояние мьютекса находится в режиме голодания, он будет вызываться напрямую.runtime_SemreleaseМетод напрямую передает текущую блокировку следующему официанту, который пытается получить блокировку, и официант будет установлен после пробуждения.mutexLockedсостоянии, так как он все еще находится вmutexStarving, поэтому новая горутина также не может получить блокировку.

В обычном режиме, если для текущего мьютекса нет ожидающего или состояние, указанное младшими тремя битами, равно0, то текущему методу не нужно будить другие горутины, и он может вернуться напрямую.Когда горутина находится в состоянии ожидания, она все равно пройдетruntime_SemreleaseРазбудите соответствующую горутину и передайте право собственности на замок.

резюме

через мьютексMutexИз анализа процесса блокировки и разблокировки мы можем сделать следующие выводы, которые могут помочь нам лучше понять принцип работы блокировок мьютексов.Процесс блокирования блокировок мьютексов более сложен, включая спин, семафор и такие концепции, как планирование Горутины. :

  • Если мьютекс находится в инициализированном состоянии, он будет установлен непосредственноmutexLockedзамок;
  • Если мьютекс находится вmutexLockedИ работать в обычном режиме, он войдет в спин, выполнить 30 разPAUSEИнструкция потребляет процессорное время в ожидании снятия блокировки;
  • Если текущая горутина ожидает блокировки дольше, чем1ms, мьютекс будет переведен в режим голодания;
  • Мьютексы обычно проходят черезruntime_SemacquireMutexметод вызоветLockГорутина переходит в состояние сна, ожидая, пока горутина, удерживающая семафор, разбудит текущую сопрограмму;
  • Если текущая горутина является последней ожидающей горутиной в мьютексе или время ожидания меньше1ms, текущая горутина переключит мьютекс обратно в нормальный режим;

Процесс разблокировки мьютекса относительно прост, хотя есть некоторые различия в обработке обычного режима и режима голодания, логика понятна и понятна из-за небольшого количества строк кода:

  • Если мьютекс был разблокирован, вызовитеUnlockИсключение будет сгенерировано напрямую;
  • Если мьютекс находится в режиме голодания, владение блокировкой будет напрямую передано следующему официанту в очереди, и официант будет нести ответственность за установкуmutexLockedбит флага;
  • Если мьютекс находится в обычном режиме, и нет горутины, ожидающей снятия блокировки, или горутина, которая была разбужена, получила блокировку, она вернется напрямую, в других случаях она вернется черезruntime_SemreleaseРазбудить соответствующую горутину;

RWMutex

Мьютекс чтения-записи также является языком GosyncОдин из интерфейсов предоставляемых пакетом для нас, общий сервис имеет очень высокое отношение чтения и записи к ресурсам.Если большинство запросов это запросы на чтение, они не будут влиять друг на друга, то почему мы не можем читать и писать в ресурсы?Как насчет разделения записи? То естьRWMutexПроблема, решаемая мьютексом чтения-записи, не ограничивает одновременное чтение ресурсов, но операции чтения-записи, записи-записи не могут выполняться параллельно.

читать Напишите
читать Y N
Напишите N N

Реализация мьютекса чтения-записи на языке Go:RWMutex, который не только содержит мьютекс, но и содержит два семафора для записи, ожидающей чтения, и чтения, ожидающей записи:

type RWMutex struct {
    w           Mutex
    writerSem   uint32
    readerSem   uint32
    readerCount int32
    readerWait  int32
}

readerCountСохраняет количество выполняемых в данный момент операций чтения, последниеreaderWaitУказывает количество ожидающих операций чтения, когда операция записи заблокирована.

блокировка чтения

Блокировка блокировки чтения очень проста, мы передаемatomic.AddInt32методreaderCountДобавьте один, если метод возвращает отрицательное число, это означает, что текущая горутина получила блокировку записи, и текущая горутина вызоветruntime_SemacquireMutexЗастрял в спячке, ожидая пробуждения:

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}

Если нет операции записи для получения текущего мьютекса, текущий метод будетreaderCountВозврат после увеличения; вызывается, когда горутина хочет снять блокировку чтенияRUnlockметод:

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        rw.rUnlockSlow(r)
    }
}

Этот метод уменьшит количество читаемых ресурсовreaderCount, если текущий метод встречает ситуацию, когда возвращаемое значение меньше нуля, значит, идет текущая операция записи, и в это время она должна пройтиrUnlockSlowметод для уменьшения количества операций чтения, ожидающих текущей операции записиreaderWaitи семафор, запускающий операцию записи после завершения всех операций чтения.writerSem:

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        throw("sync: RUnlock of unlocked RWMutex")
    }
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

writerSemПосле запуска процесс, пытающийся получить блокировку чтения-записи, проснется и получит блокировку.

Блокировка чтения-записи

Когда пользователь ресурса хочет получить блокировку на чтение-запись, он должен передатьLockметод, вLockМетод сначала вызывает мьютекс чтения-записи, удерживаемыйMutexизLockЭтот метод гарантирует, что другие горутины, получившие блокировки чтения-записи, перейдут в состояние ожидания, а последующиеatomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)По сути, это блокировка последующих операций чтения:

func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

Если есть еще другие Горутины, в настоящее время удерживающие блокировку чтения мьютекса, Горутина вызоветruntime_SemacquireMutexПереходит в спящее состояние и срабатывает при ожидании снятия блокировки чтения.writerSemСемафор пробуждает текущую сопрограмму.

После завершения операций чтения и записи в ресурсеatomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)Вернитесь к положительному числу и запустите все горутины, ожидающие блокировки чтения, через цикл for:

func (rw *RWMutex) Unlock() {
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        throw("sync: Unlock of unlocked RWMutex")
    }
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    rw.w.Unlock()
}

В конце метода,RWMutexУдерживаемый мьютекс будет освобожден, чтобы другие сопрограммы могли повторно получить блокировку чтения-записи.

резюме

По сравнению с комплексным мьютексом состоянияMutexНапример, мьютекс чтения-записиRWMutexХотя предоставляемые функции очень сложны, из-заMutexНа «плече» общая реализация будет намного проще.

  1. readerSem— Когда блокировка чтения-записи снята, уведомить горутину, ожидающую получения блокировки чтения;
  2. writerSem— Когда блокировка чтения снята, уведомить горутину, ожидающую получения блокировки чтения-записи;
  3. wБлокировки Mutex — гарантируют взаимное исключение между операциями записи;
  4. readerCount— Подсчитайте количество сопрограмм, выполняющих в данный момент операции чтения, и уменьшите его при срабатывании блокировки записиrwmutexMaxReadersБлокировать последующие операции чтения;
  5. readerWait— Текущая блокировка чтения-записи ожидает количество горутин для операций чтения, после срабатыванияLockкаждый раз послеRUnlockОн будет уменьшен на единицу, и когда он вернется к нулю, горутина получит блокировку чтения-записи;
  6. Когда снята блокировка чтения-записиUnlockСначала будут уведомлены все операции чтения, а затем удерживаемый мьютекс будет освобожден, что может гарантировать, что операции чтения не будут «умирать от голода» из-за непрерывных операций записи;

RWMutexсуществуетMutexВышеупомянутое обеспечивает дополнительную функцию разделения чтения-записи, которая может обеспечить повышение производительности, когда запросов на чтение намного больше, чем запросов на запись Мы также можем выбрать блокировки взаимного исключения чтения-записи, когда это подходит.

WaitGroup

WaitGroupэто язык гоsyncОбщий механизм синхронизации в пакете, его можно использовать для ожидания возврата серии горутин.Общий сценарий использования — выполнение RPC в пакетном режиме или вызов внешних служб:

requests := []*Request{...}

wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()

        // res, err := service.call(r)
    }(request)
}

wg.Wait()

пройти черезWaitGroupМы можем очень легко синхронизировать информацию между несколькими горутинами, и код, который изначально выполнялся последовательно, также может выполняться одновременно в нескольких горутинах, что ускоряет обработку программы.В приведенном выше коде только после того, как все горутины завершили выполнение.WaitМетод возвращается, и программа может продолжать выполнять другую логику.

golang-syncgroup

В общем, он делает именно то, что следует из его названия,DoneЧаще всего он используется для ожидания завершения всех одновременно выполняемых задач в группе горутин.

структура

WaitGroupПеременные-члены в структуре очень простые, в которыхnoCopyОсновная функция заключается в обеспеченииWaitGroupОн не будет скопирован разработчиками путем переназначения, что приведет к странному поведению:

type WaitGroup struct {
    noCopy noCopy

    state1 [3]uint32
}

copylockПакет — это анализатор для проверки подобных ошибок, и принцип его действия таков:во время компиляцииПроверьте, содержит ли скопированная переменнаяnoCopyилиsyncключевое слово, если оно содержит текущее ключевое слово, будет сообщено о следующей ошибке:

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := sync.Mutex{}
    yawg := wg
    fmt.Println(wg, yawg)
}

$ go run proc.go
./prog.go:10:10: assignment copies lock value to yawg: sync.Mutex
./prog.go:11:14: call of fmt.Println copies lock value: sync.Mutex
./prog.go:11:18: call of fmt.Println copies lock value: sync.Mutex

Этот код будет назначать и вызыватьfmt.PrintlnКогда происходит копирование значения, анализатор сообщает об ошибке, вы можете получить к ней доступ, обратившисьСвязьПопробуйте запустить этот код.

КромеnoCopyПомимо,WaitGroupСтруктура также содержит массив, занимающий в общей сложности 12 байт, Этот массив будет хранить состояние и семафор, удерживаемые текущей структурой, которая ведет себя совершенно по-разному на 64-битных и 32-битных машинах.

golang-waitgroup-state

WaitGroupчастный методstateможет помочь нам отstate1поле для получения его состояния и семафора.

действовать

WaitGroupЕсть только три открытых интерфейсаAdd,Waitа такжеDoneDoneпросто вызывается методwg.Add(-1)Самой по себе особой логики нет, давайте разберем оставшиеся два метода:

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if v > 0 || w == 0 {
        return
    }
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

AddОсновная функция метода заключается в обновленииWaitGroupсчетчики, проведенные вcounter, старшие 32 бита 64-битного состояния, хотяAddПараметр, передаваемый методу, может быть отрицательным, ноWaitGroupСчетчик может быть только неотрицательным, при вызовеAddвызывает обнуление счетчика и все еще есть ожидающие горутины, он передается черезruntime_SemreleaseРазбудите все горутины, которые находятся в состоянии ожидания.

еще одинWaitGroupМетодыWaitДанные, хранящиеся в текущем счетчике, больше, чем0При изменении количества ожидающих горутинwaiterи позвониruntime_SemacquireЗаснуть.

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        if v == 0 {
            return
        }
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap)
            if +statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

Спящая горутина будет ждатьAddметод в счетчике0вставай.

резюме

через паруWaitGroupПроведя анализ и исследования, мы можем сделать следующие выводы:

  • Addне может быть сWaitМетод вызывается одновременно в горутине, и как только он появится, программа рухнет;
  • WaitGroupДолжен бытьWaitМетод можно использовать повторно только после возврата;
  • Doneв самый разAddПростая инкапсуляция метода, мы можемAddМетод передает любое отрицательное число (счетчик должен быть гарантированно неотрицательным), чтобы быстро сбросить счетчик до нуля, чтобы разбудить другие ожидающие горутины;
  • Может быть несколько Горутин, ожидающих текущегоWaitGroupКогда счетчик обнуляется, эти горутины также будут разбужены «одновременно»;

Once

Язык Go в стандартной библиотекеsyncТакже доступно в пакете синхронизацииOnceСемантика, его основная функция на самом деле хорошо понятна, гарантируется во время работы программы Go.OnceСоответствующий фрагмент кода будет выполнен только один раз.

В коде, показанном ниже,DoФункция, переданная в метод, будет выполнена только один раз, то есть мы увидим ее только один раз, когда запустим код, показанный ниже.only onceВыходной результат:

func main() {
    o := &sync.Once{}
    for i := 0; i < 10; i++ {
        o.Do(func() {
            fmt.Println("only once")
        })
    }
}

$ go run main.go
only once

так какsyncструктуры в пакетах,Onceимеет очень простую структуру данных, каждыйOnceСтруктура содержит только один код, используемый для определения того, был ли выполнен кодовый блок.doneи мьютексMutex:

type Once struct {
    done uint32
    m    Mutex
}

OnceЕдинственный способ открыть структуру внешнему миру — этоDo, метод примет функцию с пустыми входными параметрами, если вы используетеatomic.LoadUint32Проверив, что функция была выполнена, она вернется напрямую, иначе войдетdoSlowЗапустите переданную функцию:

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

doSlowРеализация также очень проста, мы сначала получаем мьютекс для текущей горутины, а затем передаемdeferключевые слова будутdoneпеременная-член имеет значение1и запустите переданную функцию, независимо от того, работает ли текущая функция нормально или выдаетpanic, текущий метод будетdoneустановлен в1Гарантируется, что функция не будет выполнена во второй раз.

резюме

как гарантия того, сколько раз функция будет выполненаOnceструктура, которая использует мьютекс иatomicПредоставленный метод реализует семантику, что функция может быть выполнена только один раз во время работы программы, В процессе его использования нам также необходимо обратить внимание на следующее:

  • DoФункция, переданная в метод, будет выполнена только один раз, даже если в функции что-то произойдет.panic;
  • позвони дваждыDoКогда метод передается другой функции, будет выполнена только функция, вызванная в первый раз;

Cond

Язык Go предоставляется в стандартной библиотеке.CondПо сути, это условная переменная, которая передается черезCondМы можем заставить ряд горутин просыпаться только тогда, когда срабатывает определенное событие или условие, каждыйCondСтруктура содержит мьютексL, давайте сначала посмотримCondКак это используется:

func main() {
    c := sync.NewCond(&sync.Mutex{})

    for i := 0; i < 10; i++ {
        go listen(c)
    }

    go broadcast(c)

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
}

func broadcast(c *sync.Cond) {
    c.L.Lock()
    c.Broadcast()
    c.L.Unlock()
}

func listen(c *sync.Cond) {
    c.L.Lock()
    c.Wait()
    fmt.Println("listen")
    c.L.Unlock()
}

$ go run main.go
listen
listen
...
listen

В приведенном выше коде мы запускаем 11 горутин одновременно, 10 из них будут проходить черезWaitДождитесь желаемого сигнала или события, а оставшаяся горутина вызоветBroadcastметод уведомляет все горутины, застрявшие в ожидании, при вызовеBoardcastметод, он будет печатать 10 раз"listen"и завершите вызов.

golang-cond-broadcast

структура

CondСтруктура содержитnoCopyа такжеcopyCheckerДва поля, первое используется для гарантииCondБольше никакого копирования во время компиляции, что гарантирует, что копирование во время выполнения будет напрямуюpanic, другой замок, удерживаемыйLна самом деле это интерфейсLocker, любая реализацияLockа такжеUnlockСтруктура метода может быть использована какNewCondПараметры метода:

type Cond struct {
    noCopy noCopy

    L Locker

    notify  notifyList
    checker copyChecker
}

последняя переменная в структуреnotifyListНа самом деле для достиженияCondМеханизм синхронизации, структура на самом делеGoroutineСвязанный список:

type notifyList struct {
    wait uint32
    notify uint32

    lock mutex
    head *sudog
    tail *sudog
}

В этой структуреheadа такжеtailОни указывают на начало и конец всего связанного списка соответственно, иwaitа такжеnotifyПредставляют горутину, ожидающую в данный момент, и горутину, которая была уведомлена, соответственно. Мы можем использовать эти две переменные, чтобы подтвердить, что текущая горутина должна быть уведомлена, и горутина, которая была уведомлена.

действовать

CondнезащищенныйWaitметод усыпит текущую горутину, сначала она вызоветruntime_notifyListAddбуду ждать счетчик+1, затем разблокируйте и позвонитеruntime_notifyListWaitПодождите, пока проснутся другие горутины:

func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWaitОсновная цель метода — получить текущую Goroutine и добавить ее вnotifyListКонец связанного списка:


func notifyListWait(l *notifyList, t uint32) {
    lock(&l.lock)

    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    releaseSudog(s)
}

Помимо добавления текущей горутины в конец связанного списка, мы также вызываемgoparkunlockВпадая в спящее состояние, эта функция также является методом, часто используемым при переключении горутин в языке Go: она напрямую откажется от права использования текущего процессора и будет ждать, пока планировщик проснется.

golang-cond-notifylist

Condпредоставленный извнеSignalа такжеBroadcastметод используется для пробуждения вызоваWaitДля горутины, которая засыпает, судя по названиям двух методов, первый разбудит горутину в начале очереди, а второй разбудит все горутины в очереди:

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

notifyListNotifyAllМетод возьмет все горутины из связанного списка и вызовет их по очереди.readyWithTime, метод пройдетgoreadyРазбудите горутину цели:

func notifyListNotifyAll(l *notifyList) {
    s := l.head
    l.head = nil
    l.tail = nil

    atomic.Store(&l.notify, atomic.Load(&l.wait))

    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

Хотя он будет пробуждать все горутины по очереди, порядок пробуждения здесь на самом деле в порядке присоединения к очереди, и первый будет добавлен первым.goreadyПосле пробуждения добавленной горутине может потребоваться дождаться планирования планировщика.

а такжеnotifyListNotifyOneфункция будет запускаться только сsudogСформированный связанный список удовлетворяетsudog.ticket == l.notifyгорутина и проходreadyWithTimeбудить:

func notifyListNotifyOne(l *notifyList) {
    t := l.notify
    atomic.Store(&l.notify, t+1)

    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
}

Как правило, мы выбираем вызов, когда определенные условия не выполняются.WaitВпадая в сон, когда некоторые горутины обнаруживают, что условия для пробуждения в настоящее время соблюдены, они могут использоватьSignalуведомить одного илиBroadcastУведомляет все горутины о том, что текущие условия выполнены и можно продолжить выполнение работы.

резюме

а такжеMutexв сравнении с,CondЭто также механизм синхронизации, который не всем хорошо понятен и понятен, он обеспечивает механизм ожидания типа очереди FIFO, а также обеспечиваетSignalа такжеBroadcastДва разных метода пробуждения по сравнению с использованиемfor {}занят ожиданием, используйтеCondФункция, которая может отказаться от текущего процессора, когда долгосрочные условия не могут быть выполнены.Если мы используем ее разумно, мы все же можем улучшить производительность в некоторых случаях.В процессе использования нам нужно обратить внимание на:

  • WaitМетод должен быть использован до вызоваL.Lockдержите ресурс, а то так и будетpanicвызвать сбой программы;
  • SignalГорутины, пробуждаемые этим методом, находятся в начале очереди и ждут дольше всех;
  • BroadcastХотя это горутина, которая транслирует все ожидающие уведомления, она также находится в определенном порядке, когда фактически просыпается;

примитив расширения

В дополнение к примитивам синхронизации, представленным в этих стандартных библиотеках, язык Go также доступен в подрепозиториях.x/syncДополнительные четыре примитива синхронизации предоставлены в ,ErrGroup,Semaphore,SingleFlightа такжеSyncMap,один из нихSyncMapНа самом деле этоsyncв упаковкеsync.Map, это в версии 1.9 Представлено в Gox/syncпакеты, которые в конечном итоге были перемещены в стандартную библиотеку по мере того, как API совершенствовался и стабилизировался.syncв сумке.

golang-extension-sync-primitives

В этом разделе мы представим три примитива, доступных в пакете расширения в этом разделе.ErrGroup,Semaphoreа такжеSingleFlight.

ErrGroup

вспомогательный складx/syncупаковать вerrgroupПо сути, он предоставляет нам функции синхронизации, распространения ошибок и отмены контекста в наборе горутин.Мы можем получать данные веб-страниц параллельно следующим образом:

var g errgroup.Group
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}
for i := range urls {
    url := urls[i]
    g.Go(func() error {
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
        return err
    })
}
if err := g.Wait(); err == nil {
    fmt.Println("Successfully fetched all URLs.")
}

Goметод может создать горутину и выполнить в ней переданную функцию, аWaitметод будет ждатьGoПосле возврата всех горутин, созданных методом, возвращается первая ненулевая ошибка.Если все горутины не возвращают ошибку, функция вернетnil.

структура

errgroupв упаковкеGroupСтруктура также состоит из еще трех важных частей:

  1. СоздайтеContextвернулся, когдаcancelфункция, в основном используемая для использования уведомленийcontextГорутина может останавливать работу и отказываться от ресурсов из-за некоторых ошибок подзадачи;
  2. Используется для ожидания, пока группа горутин выполнит подзадачи.WaitGroupпримитивы синхронизации;
  3. Используется для принятия подзадач, которые возвращают ошибкиerrи гарантияerrбудет назначен только один разerrOnce;
type Group struct {
    cancel func()

    wg sync.WaitGroup

    errOnce sync.Once
    err     error
}

Вместе эти поля образуютGroupСтруктурируйте и предоставьте нам такие функции, как синхронизация, распространение ошибок и отмена контекста.

действовать

errgroupЕдинственный открытый конструкторWithContextметод, мы можем начать только сContextсоздать новыйGroupПеременная,WithCancelВозвращенная функция отмены также будет использоваться только вGroupВнутреннее использование структуры:

func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    return &Group{cancel: cancel}, ctx
}

Для создания новой параллельной подзадачи необходимо использоватьGoметод, этот метод будет внутреннеWaitGroupДобавьте один и создайте новую горутину, запускайте подзадачи внутри горутины и вовремя вызывайте, когда возвращается ошибкаcancelи кerrПрисвоение, только самая ранняя возвращенная ошибка будет воспринята восходящим потоком, а последующие ошибки будут отброшены:

func (g *Group) Go(f func() error) {
    g.wg.Add(1)

    go func() {
        defer g.wg.Done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel()
                }
            })
        }
    }()
}

func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel()
    }
    return g.err
}

WaitМетод на самом деле просто вызываетWaitGroupСинхронный метод отменяется, когда все подзадачи выполнены.Contextи вернуть возможные ошибки.

резюме

errgroupв упаковкеGroupПринцип реализации примитивов синхронизации по-прежнему очень прост. Он не использует очень низкоуровневые API-интерфейсы в пакетах времени выполнения, а просто инкапсулирует базовую семантику синхронизации для предоставления более сложных функций. При их использовании также необходимо обратить внимание на следующее. Проблемы:

  • Вызывается при возникновении ошибки или завершении ожиданияContextизcancelметод отмены контекста;
  • Будет возвращена только первая возникшая ошибка, а остальные ошибки будут отброшены напрямую;

Semaphore

Семафор — это распространенный механизм синхронизации в параллельном программировании, который гарантирует, что удерживаемый счетчик0Между инициализированными весами счетчик в семафоре будет вычитаться на соответствующее значение каждый раз, когда ресурс будет получен, и добавлен обратно, когда он будет освобожден.Когда счетчик больше размера семафора, он перейдет в спящий режим и будет ждать другие процессы для выпуска сигнала. , мы часто используем его при контроле количества процессов, обращающихся к ресурсам.

Пакет расширения Golang предоставляет взвешенные семафоры. Мы можем управлять доступом к ресурсам в соответствии с разным весом. Этот пакет предоставляет только четыре внешних метода:

  • NewWeightedИспользуется для создания новых семафоров;
  • AcquireРесурс с указанным весом получен, если в данный момент нет «незанятого ресурса», то он впадает в спящее ожидание;
  • TryAcquireОн также используется для получения ресурсов указанного веса, но если в данный момент нет «незанятого ресурса», он вернется напрямуюfalse;
  • ReleaseИспользуется для освобождения ресурсов указанного веса;

структура

NewWeightedОсновная функция метода создает новый семафор веса, а вес с наибольшим входящим семафором вернет новыйWeightedУказатель структуры:

func NewWeighted(n int64) *Weighted {
    w := &Weighted{size: n}
    return w
}

type Weighted struct {
    size    int64
    cur     int64
    mu      sync.Mutex
    waiters list.List
}

WeightedСтруктура содержитwaitersВ списке хранятся «пользователи», ожидающие получения ресурсов, а также верхняя граница текущего семафора и счетчикcur, диапазон этого счетчика[0, size]:

golang-semaphore

Счетчик в семафоре будет меняться при доступе пользователя и освобождении ресурсов.Введенная концепция весов может помочь нам лучше контролировать степень детализации доступа к ресурсам и максимально удовлетворить все распространенные варианты использования.

Получать

Мы упоминали вышеAcquireМетод – это метод, используемый для получения заданного весового ресурса.Этот метод состоит из трех различных ситуаций:

  1. Когда оставшиеся ресурсы в семафоре больше, чем полученные ресурсы, и нет ожидающих горутин, семафор будет получен напрямую;
  2. Когда семафор, который необходимо получить, больше, чемWeightedКогда размер , он вернется напрямую, потому что невозможно выполнить условие;
  3. В других случаях текущая горутина будет добавлена ​​в список ожидания и переданаselectПодождите, пока текущая горутина будет разбужена, и после пробуждения семафор будет получен;
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
    }

    if n > s.size {
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
    }

    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()

    select {
    case <-ctx.Done():
        err := ctx.Err()
        s.mu.Lock()
        select {
        case <-ready:
            err = nil
        default:
            s.waiters.Remove(elem)
        }
        s.mu.Unlock()
        return err

    case <-ready:
        return nil
    }
}

Другой способ получения семафораTryAcquireНапротив, это очень просто: он только оценивает, достаточно ли у текущего семафора ресурсов для получения, и если ресурсов достаточно, он немедленно возвращается.trueиначе вернетсяfalse:

func (s *Weighted) TryAcquire(n int64) bool {
    s.mu.Lock()
    success := s.size-s.cur >= n && s.waiters.Len() == 0
    if success {
        s.cur += n
    }
    s.mu.Unlock()
    return success
}

а такжеAcquireв сравнении с,TryAcquireПоскольку он не ждет освобождения ресурсов, он может быть более подходящим для некоторых сценариев, чувствительных к задержке, когда пользователю необходимо немедленно получить результат.

освобожден

последний, чтобы представитьReleaseМетод на самом деле очень прост: когда мы отпускаем семафор,Releaseметод будет проходить от начала до концаwaitersДля всех ожидающих в списке, если семафор после освобождения ресурса имеет достаточно оставшихся ресурсов, указанная Горутина будет вызываться через Канал:

func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n
    for {
        next := s.waiters.Front()
        if next == nil {
            break
        }

        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            break
        }

        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready)
    }
    s.mu.Unlock()
}

Конечно, могут быть случаи, когда оставшиеся ресурсы не могут вызвать Горутин. В это время текущий метод снимает блокировку и возвращается напрямую. Анализируя этот код, мы также можем обнаружить, что если семафор требует много ресурсы, он может быть не в состоянии получить блокировку в течение длительного времени, что также может бытьAcquireметод принимает другой параметрContextПричина установки тайм-аута для получения семафора.

резюме

Взвешенный семафор имеет больше сценариев применения.Это также единственная реализация семафора, предоставляемая языком Go.В процессе его использования нам необходимо обратить внимание на следующие моменты:

  • Acquireа такжеTryAcquireДля получения ресурсов можно использовать оба метода: первый используется для синхронного получения и будет ожидать освобождения блокировки, а второй будет возвращаться напрямую, когда блокировка не может быть получена;
  • ReleaseМетод разбудит горутины, которые можно разбудить в порядке FIFO;
  • Если горутина получает больше ресурсов, потому чтоReleaseСтратегия выпуска может ждать долгое время;

SingleFlight

singleflightЭто еще один примитив синхронизации, предоставляемый в пакете расширения языка Go, который на самом деле является любимым механизмом расширения синхронизации автора. Он может подавлять множественные повторные запросы к нисходящему потоку в службе. Обычный сценарий использования: — Мы используем Redis для кэширования некоторых популярных данные в базе данных и установите время ожидания.В момент, когда время ожидания кеша истекает, может быть много параллельных запросов.Обнаружено, что Redis не содержит кеша, поэтому в базу данных попадет много трафика. задержка и качество обслуживания.

golang-query-without-single-flight

ноsingleflightможет эффективно решить эту проблему, его основная функция заключается вKeyВ конце концов, будет сделан только один вызов функции.В этом контексте будет сделан только один запрос к базе данных.Результат запроса будет записан обратно в Redis и синхронизирован со всеми запросами.KeyПользователь:

golang-extension-single-flight

Это фактически уменьшает мгновенный трафик к нисходящему потоку, и очень много времени требуется для получения нисходящих ресурсов, таких как: доступ к кешам, базам данных и другим сценариям, которые очень подходят для использования.singleflightОптимизация сервиса. В приведенном выше примере мы можем использовать его, когда хотим получить данные как из Redis, так и из базы данных.singleflightЭта предусмотренная функция снижает давление на выходе; ее использование на самом деле очень просто, мы можем использовать ее напрямую.singleflight.Group{}создать новыйGroupструктуру, а затем, позвонивDoметод для подавления того же запроса:

type service struct {
    requestGroup singleflight.Group
}

func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
    v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
        rows, err := // select * from tables
        if err != nil {
            return nil, err
        }
        return rows, nil
    })
    if err != nil {
        return nil, err
    }

    return Response{
        rows: rows,
    }, nil
}

Приведенный выше код использует хэш запроса в качестве ключа для подавления того же запроса, мы также можем выбрать некоторые более важные или важные поля в качествеDoПервый параметр метода позволяет избежать мгновенного большого количества запросов к нисходящему.

структура

GroupСама структура заблокирована мьютексомMutexи отKeyприбытьcallТаблица отображения указателей структур состоит из каждогоcallСтруктура сохраняет информацию, соответствующую текущему вызову:

type Group struct {
    mu sync.Mutex
    m  map[string]*call
}

type call struct {
    wg sync.WaitGroup

    val interface{}
    err error

    dups  int
    chans []chan<- Result
}

callв структуреvalа такжеerrПоля назначаются только один раз при выполнении переданной функции, и они будут толькоWaitGroupдождитесь окончания чтения, покаdupsа такжеchansполя используются для хранения текущихsingleflightКоличество запросов для подавления и передачи информации вызывающей стороне при возврате результата.

действовать

singleflightПакет предоставляет два метода подавления одного и того же запроса, один из которых — метод синхронного ожидания.Do, а другой, который возвращает каналDoChan, эти два метода не имеют большой разницы в функциях, лишь немного отличаются производительностью интерфейса.

КаждыйDoПри вызове метода мьютекс захватывается и пытаетсяGroupУдерживаемая таблица сопоставления загружается лениво, а затем оценивается, существует ли она ужеkeyСоответствующий вызов функции:

  1. когда нет соответствующегоcallКогда структура:
    1. инициализировать новыйcallуказатель структуры;
    2. УвеличиватьWaitGroupпроведенные счетчики;
    3. БудуcallУказатель структуры добавляется в таблицу отображения;
    4. Освободить удерживаемый мьютексMutex;
    5. блокировка вызоваdoCallМетод ожидает возврата результата;
  2. когда соответствующийcallструктура;
    1. УвеличиватьdupsСчетчик, представляющий текущее количество повторных вызовов;
    2. Освободить удерживаемый мьютексMutex;
    3. пройти черезWaitGroup.Waitдождитесь возврата запроса;
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        g.mu.Unlock()
        c.wg.Wait()
        return c.val, c.err, true
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

потому чтоvalа такжеerrОба поля будут только вdoCallназначен метод, поэтому, когдаdoCallМетоды иWaitGroup.WaitКогда метод возвращается, эти два значения возвращаются вDoВызывающий функцию.

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    c.val, c.err = fn()
    c.wg.Done()

    g.mu.Lock()
    delete(g.m, key)
    for _, ch := range c.chans {
        ch <- Result{c.val, c.err, c.dups > 0}
    }
    g.mu.Unlock()
}

doCallбудет запускать переданную функциюfn, возвращаемое значение функции будет присвоеноc.valа такжеc.err, она будет вызвана после завершения выполнения функцииWaitGroup.DoneМетод уведомляет все подавленные запросы о том, что текущая функция завершена и к ней можно получить доступ изcallВозвращаемое значение выносится из структуры и возвращается, после чегоdoCallМетод захватит удерживаемый мьютекс и синхронизирует информацию с пользователем через конвейер.DoChanВызывающий метод.

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        c.chans = append(c.chans, ch)
        g.mu.Unlock()
        return ch
    }
    c := &call{chans: []chan<- Result{ch}}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    go g.doCall(c, key, fn)

    return ch
}

DoChanМетоды иDoРазница в том, что он использует Goroutine для асинхронного выполнения.doCallи кcallдержалchansдобавить к срезуchan Resultпеременная, поэтому она может обеспечивать асинхронную передачу значений.

резюме

singleflightпакет предоставленGroupИнтерфейс действительно прост в использовании.Когда нам нужно подавить тот же запрос к нисходящему, мы можем использовать этот метод для увеличения пропускной способности и качества обслуживания.Мы также должны обратить внимание на следующие проблемы во время использования:

  • Doа такжеDoChanОдин для синхронной блокировки входящей функции, один для асинхронного вызова входящего параметра и принятия возвращаемого значения функции через канал;
  • Forgetспособ уведомленияsingleflightУдалите ключ в удерживаемой таблице сопоставления, и следующий вызов ключа выполнит метод напрямую, вместо того, чтобы ждать возврата предыдущей функции;
  • Как только вызванная функция вернет ошибку, все ожидающие горутины также получат ту же ошибку;

Суммировать

В этом разделе мы познакомим вас с базовыми примитивами, входящими в стандартную библиотеку языка Go, и примитивами расширения в пакете расширения.Эти примитивы параллельного программирования помогут нам лучше использовать возможности языка Go для создания высокопроизводительных и низкопроизводительных примитивов. Стоимость отложенного обслуживания и устранение ошибок из-за параллелизма, здесь мы возвращаемся к содержанию, представленному в этом разделе:

  • MutexМьютекс
    • Если мьютекс находится в инициализированном состоянии, он будет установлен непосредственноmutexLockedзамок;
    • Если мьютекс находится вmutexLockedИ работать в обычном режиме, он войдет в спин, выполнить 30 разPAUSEИнструкция потребляет процессорное время в ожидании снятия блокировки;
    • Если текущая горутина ожидает блокировки дольше, чем1ms, мьютекс будет переведен в режим голодания;
    • Мьютексы обычно проходят черезruntime_SemacquireMutexметод вызоветLockГорутина переходит в состояние сна, ожидая, пока горутина, удерживающая семафор, разбудит текущую сопрограмму;
    • Если текущая горутина является последней ожидающей горутиной в мьютексе или время ожидания меньше1ms, текущая горутина переключит мьютекс обратно в нормальный режим;
    • Если мьютекс был разблокирован, вызовитеUnlockИсключение будет сгенерировано напрямую;
    • Если мьютекс находится в режиме голодания, владение блокировкой будет напрямую передано следующему официанту в очереди, и официант будет нести ответственность за установкуmutexLockedбит флага;
    • Если мьютекс находится в обычном режиме, и нет горутины, ожидающей снятия блокировки, или горутина, которая была разбужена, получила блокировку, она вернется напрямую, в других случаях она вернется черезruntime_SemreleaseРазбудить соответствующую горутину;
  • RWMutexмьютекс чтения-записи
    • readerSem— Когда блокировка чтения-записи снята, уведомить горутину, ожидающую получения блокировки чтения;
    • writerSem— Когда блокировка чтения снята, уведомить горутину, ожидающую получения блокировки чтения-записи;
    • wБлокировки Mutex — гарантируют взаимное исключение между операциями записи;
    • readerCount— Подсчитайте количество сопрограмм, выполняющих в данный момент операции чтения, и уменьшите его при срабатывании блокировки записиrwmutexMaxReadersБлокировать последующие операции чтения;
    • readerWait— Текущая блокировка чтения-записи ожидает количество горутин для операций чтения, после срабатыванияLockкаждый раз послеRUnlockОн будет уменьшен на единицу, и когда он вернется к нулю, горутина получит блокировку чтения-записи;
    • Когда снята блокировка чтения-записиUnlockСначала будут уведомлены все операции чтения, а затем удерживаемый мьютекс будет освобожден, что может гарантировать, что операции чтения не будут «умирать от голода» из-за непрерывных операций записи;
  • WaitGroupПодождите, пока группа горутин закончит
    • Addне может быть сWaitМетод вызывается одновременно в горутине, и как только он появится, программа рухнет;
    • WaitGroupДолжен бытьWaitМетод можно использовать повторно только после возврата;
    • Doneв самый разAddПростая инкапсуляция метода, мы можемAddМетод передает любое отрицательное число (счетчик должен быть гарантированно неотрицательным), чтобы быстро сбросить счетчик до нуля, чтобы разбудить другие ожидающие горутины;
    • Может быть несколько Горутин, ожидающих текущегоWaitGroupКогда счетчик обнуляется, эти горутины также будут разбужены «одновременно»;
  • OnceВыполняется только один раз во время работы программы
    • DoФункция, переданная в метод, будет выполнена только один раз, даже если в функции что-то произойдет.panic;
    • позвони дваждыDoКогда метод передается другой функции, будет выполнена только функция, вызванная в первый раз;
  • CondПросыпаться, когда происходит указанное событие
    • WaitМетод должен быть использован до вызоваL.Lockдержите ресурс, а то так и будетpanicвызвать сбой программы;
    • SignalГорутины, пробуждаемые этим методом, находятся в начале очереди и ждут дольше всех;
    • BroadcastХотя это горутина, которая транслирует все ожидающие уведомления, она также находится в определенном порядке, когда фактически просыпается;
  • ErrGroupОбеспечивает синхронизацию, распространение ошибок и отмену контекста для группы горутин.
    • Вызывается при возникновении ошибки или завершении ожиданияContextизcancelметод отмены контекста;
    • Будет возвращена только первая возникшая ошибка, а остальные ошибки будут отброшены напрямую;
  • Semaphoreвзвешенный семафор
    • Acquireа такжеTryAcquireДля получения ресурсов можно использовать оба метода: первый используется для синхронного получения и будет ожидать освобождения блокировки, а второй будет возвращаться напрямую, когда блокировка не может быть получена;
    • ReleaseМетод разбудит горутины, которые можно разбудить в порядке FIFO;
    • Если горутина получает больше ресурсов, потому чтоReleaseСтратегия выпуска может ждать долгое время;
  • SingleFlightИспользуется для подавления дублирующих запросов к нижестоящим
    • Doа такжеDoChanОдин для синхронной блокировки входящей функции, один для асинхронного вызова входящего параметра и принятия возвращаемого значения функции через канал;
    • Forgetспособ уведомленияsingleflightУдалите ключ в удерживаемой таблице сопоставления, и следующий вызов ключа выполнит метод напрямую, вместо того, чтобы ждать возврата предыдущей функции;
    • Как только вызванная функция вернет ошибку, все ожидающие горутины также получат ту же ошибку;

Реализация этих примитивов синхронизации должна не только учитывать простоту использования интерфейса API и решать проблему конкуренции потоков, которая может возникнуть в параллельном программировании, но также должна оптимизировать задержку хвоста, чтобы избежать некоторых горутин, которые не могут получить блокировки или голодают. к смерти за ресурсы.Изучение примитивов синхронизации может также улучшить наше понимание и понимание параллельного программирования, и это также шаг к пониманию того, что параллельное программирование не может пересекаться.

Reference

Gitalking ...