[Перевод] Параллельное программирование в Go

Go

параллельное программирование

bouncing balls
bouncing balls

Эта статья будетGoПримеры языка знакомят с параллельным программированием, включая следующие:

  • Параллельное выполнение потоков (горутин)
  • Основные методы синхронизации (каналы и блокировки)
  • Основные шаблоны параллелизма в Go
  • Взаимоблокировки и гонки данных
  • Параллельные вычисления

Прежде чем начать, вам нужно понять, как писать самые простые программы на Go. Если вы уже знакомы с C/C++, Java или Python,A tour of goокажет вам некоторую помощь. вы также можете посмотреть наGo for C++ programmersилиGo for Java programmers.

1. Многопоточное выполнение

goroutineЭто механизм планирования движения. Go использует go для объявления и запуска нового потока выполнения с помощью механизма планирования goroutine. Он выполнит программу во вновь созданной горутине. В рамках одной программы все горутины используют одно и то же адресное пространство.

По сравнению с выделением пространства стека горутины легче и дешевле. Пространство стека инициализируется очень маленьким, и память необходимо расширять за счет выделения и освобождения пространства кучи. Горутины внутренне мультиплексированы между несколькими потоками операционной системы. Если горутина блокирует поток операционной системы, например, ожидая ввода, другие горутины в этом потоке переместятся в другие потоки, чтобы продолжить работу, и вам не нужно заботиться об этих деталях.

Следующая программа напечатает"Hello from main goroutine"печатать ли"Hello from another goroutine", в зависимости от того, какая из двух горутин завершится первой.

func main() {

    go fmt.Println("Hello from another goroutine")
    fmt.Println("Hello from main goroutine")

    // 程序执行到这,所有活着的goroutines都会被杀掉

}

goroutine1.go

следующая программа"Hello from main goroutine"и"Hello from another goroutine"Можно печатать в любом порядке. Но одна из возможностей состоит в том, что вторая горутина работает так медленно, что не будет печатать до тех пор, пока программа не завершится.

func main() {
    go fmt.Println("Hello from another goroutine")
    fmt.Println("Hello from main goroutine")

    time.Sleep(time.Second) // 为其他goroutine完成等1秒钟
}

goroutine2.go

Вот более практичный пример, в котором мы определяем функцию, использующую параллелизм для откладывания событий.

// 在指定时间过期后,文本会被打印到标准输出
// 这无论如何都不会被阻塞
func Publish(text string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
    }() // 注意括号。我们必须调用匿名函数
}

publish1.go

Вы можете позвонить следующим образомPublishфункция

func main() {
    Publish("A goroutine starts a new thread of execution.", 5*time.Second)
    fmt.Println("Let’s hope the news will published before I leave.")

    // 等待消息被发布
    time.Sleep(10 * time.Second)

    fmt.Println("Ten seconds later: I’m leaving now.")
}

publish1.go

Программа, скорее всего, напечатает три строки в следующем порядке с интервалом в пять секунд между каждой строкой вывода.

$ go run publish1.go
Let’s hope the news will published before I leave.
BREAKING NEWS: A goroutine starts a new thread of execution.
Ten seconds later: I’m leaving now.

В общем, мы не можем позволить потокам спать в ожидании друг друга. В следующем разделе мы представим механизм синхронизации в Go,channels. Затем продемонстрируйте, как использовать каналы, чтобы заставить одну горунтину ждать другую горунтину.

2. Channels

Sushi conveyor belt
Sushi conveyor belt

Конвейерная лента для суши

channel— это конструкция языка Go, предоставляющая механизм для синхронного выполнения двух горутин и обмена данными путем передачи значений определенных типов элементов
.<-Идентификатор указывает направление передачи канала, прием или отправка. Если направление не указано. Тогда канал двунаправленный.

chan Sushi      // 能被用于接收和发送 Sushi 类型的值
chan<- float64  // 只能被用于发送 float64 类型的值
<-chan int      // 只能被用于接收 int 类型的值

Каналы — это ссылочный тип, выделенный make

ic := make(chan int)        // 不带缓存的  int channel
wc := make(chan *Work, 10)  // 带缓冲工作的 channel

Для отправки значений через канал используйте

ic <- 3       // 向channel中发送3
work := <-wc  // 从channel中接收指针到work

Если канал не буферизован, отправитель будет блокироваться до тех пор, пока получатель не получит от него значение. Если буферизовано, только когда значение скопировано в буфер и буфер заполнен, отправитель будет заблокирован до тех пор, пока получатель не сможет получить от него. Приемник будет блокироваться до тех пор, пока в канале не появится значение, которое можно получить.

закрытие

closeРоль заключается в том, чтобы гарантировать, что в канал больше не могут быть отправлены значения. После того, как канал закрыт, из него еще можно получать значения. Операция приема получает нулевое значение без блокировки. Операции приема с несколькими значениями дополнительно возвращают логическое значение, указывающее, было ли отправлено значение.

ch := make(chan string)
go func() {
    ch <- "Hello!"
    close(ch)
}()
fmt.Println(<-ch)  // 打印 "Hello!"
fmt.Println(<-ch)  // 不阻塞的打印空值 ""
fmt.Println(<-ch)  // 再一次打印 ""
v, ok := <-ch      // v 的值是 "" , ok 的值是 false

Оператор for с предложением диапазона непрерывно считывает значения, отправленные через канал, пока канал не будет закрыт.

func main() {
    var ch <-chan Sushi = Producer()
    for s := range ch {
        fmt.Println("Consumed", s)
    }
}

func Producer() <-chan Sushi {
    ch := make(chan Sushi)
    go func() {
        ch <- Sushi("海老握り")  // Ebi nigiri
        ch <- Sushi("鮪とろ握り") // Toro nigiri
        close(ch)
    }()
    return ch
}

sushi.go

3. Синхронизация

В следующем примереPublishФункция возвращает канал, который транслирует отправленный текст в виде сообщения.

// 指定时间过期后函数Publish将会打印文本到标准输出.
// 当文本被发布channel将会被关闭.
func Publish(text string, delay time.Duration) (wait <-chan struct{}) {
    ch := make(chan struct{})
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
        close(ch) // broadcast – a closed channel sends a zero value forever
    }()
    return ch
}

publish2.go

Обратите внимание, что мы используем канал с пустой структурой:struct{}. Это указывает на то, что канал используется только для сигналов, а не для передачи данных.

Вы можете использовать такую ​​функцию

func main() {
    wait := Publish("Channels let goroutines communicate.", 5*time.Second)
    fmt.Println("Waiting for the news...")
    <-wait
    fmt.Println("The news is out, time to leave.")
}

publish2.go

Программа напечатает следующие три строки информации в указанном порядке. Последняя строка появится сразу после отправки сообщения

$ go run publish2.go
Waiting for the news...
BREAKING NEWS: Channels let goroutines communicate.
The news is out, time to leave.

4. Тупик

traffic jam
traffic jam

давайте представимPublishОшибка в функции.

func Publish(text string, delay time.Duration) (wait <-chan struct{}) {
    ch := make(chan struct{})
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
        **//close(ch)**
    }()
    return ch
}

В это время поPublishГорутина, запущенная функцией, выводит важную информацию, а затем завершает работу, оставляя основную горутину продолжать ждать.

func main() {
    wait := Publish("Channels let goroutines communicate.", 5*time.Second)
    fmt.Println("Waiting for the news...")
    **<-wait**
    fmt.Println("The news is out, time to leave.")
}

В некоторых случаях программа не будет работать, такая ситуация называется тупиковой.

deadlockЭто ситуация, когда потоки ждут друг друга и не могут продолжать выполняться.

Во время выполнения Go имеет хорошую поддержку обнаружения взаимоблокировок во время выполнения. Но в некоторых случаях горутина не может продвинуться вперед, тогда программа Go предоставит подробное сообщение об ошибке.Вот журнал нашей аварийной программы:

Waiting for the news...
BREAKING NEWS: Channels let goroutines communicate.
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
    .../goroutineStop.go:11 +0xf6

goroutine 2 [syscall]:
created by runtime.main
    .../go/src/pkg/runtime/proc.c:225

goroutine 4 [timer goroutine (idle)]:
created by addtimer
    .../go/src/pkg/runtime/ztime_linux_amd64.c:73

В большинстве случаев легко понять, что вызывает взаимоблокировку в программе Go. Дальше как исправить.

5. Гонка данных

Взаимоблокировки могут показаться плохими, но что действительно означает катастрофу для параллельного программирования, так это гонки данных. Они довольно распространены и их трудно отлаживать.

Одингонка данныхПроисходит, когда два потока одновременно обращаются к одной и той же переменной, и по крайней мере один поток осуществляет запись в одно и то же время.

Гонки данных происходят нерегулярно. Например, напечатайте цифру 1, попытайтесь узнать, как это произошло — одно из возможных объяснений — после кода.

func race() {
    wait := make(chan struct{})
    n := 0
    go func() {
        **n++** // 一次操作:读,增长,写
        close(wait)
    }()
    **n++** // 另一个冲突访问
    <-wait
    fmt.Println(n) // 输出: 不确定
}

datarace.go

две горутины,g1иg2, во время соревнования у нас нет возможности узнать порядок, в котором они выполняются.Нижеследующее является лишь одним из многих возможных результатов.

  • g1отnпрочитать значение из переменной0
  • g2отnпрочитать значение из переменной0
  • g1увеличить его стоимость с0стать1
  • g1поставить его значение1назначить вn
  • g2увеличить его стоимость с0прибыть1
  • g2поставить его значение1назначить вn
  • Эта программа напечатает значение n, его значение равно1

Термин «гонка данных» несколько вводит в заблуждение не только потому, что нельзя установить порядок его выполнения, но и потому, что нет гарантии того, что произойдет дальше. Компиляторы и аппаратное обеспечение часто корректируют порядок кода для повышения производительности. Если вы внимательно посмотрите на работающий поток, вы можете увидеть больше деталей.

mid action
mid action

Единственный способ избежать гонки данных — синхронизировать все общие изменяемые данные между потоками. В Go есть несколько способов, которые в лучшем случае могут использовать каналы или блокировки. Операции нижнего уровня могут использоватьsync and sync/atomicпакет, который здесь больше не обсуждается.

В Go предпочтительным способом обработки параллельного доступа к данным является использование канала, который передает данные от одной горутины к другой. Есть классическая поговорка: «Не передавайте данные, разделяя память, делитесь памятью, передавая данные».

func sharingIsCaring() {
    ch := make(chan int)
    go func() {
        n := 0 // 局部变量只能对当前 goroutine 可见
        n++
        ch <- n // 数据通过 goroutine 传递
    }()
    n := <-ch   // ...从另外一个 goroutine 中安全接受
    n++
    fmt.Println(n) // 输出: 2
}

datarace.go

В этом коде канал играет двоякую роль. Он действует как точка синхронизации для передачи данных между различными горутинами. Горутина-отправитель будет ждать, пока другие горутины получат данные, а горутина-получатель будет ждать, пока другие горутины отправят данные.

Перейти модель памяти- Когда одна горутина читает переменную, а другая горутина записывает ту же самую переменную, процесс на самом деле очень сложен, но пока вы используете каналы для обмена данными между разными горутинами, эта операция безопасна.

6. Мьютекс

lock
lock

Иногда удобнее синхронизировать данные путем прямой блокировки, а не с помощью каналов. Для этой цели в стандартной библиотеке Go предусмотрены мьютексы.sync.Mutex.

Чтобы этот тип блокировки работал правильно, все операции с общими данными (как чтение, так и запись) должны выполняться, пока горутина удерживает блокировку. Это критично, одной ошибки в горутине достаточно, чтобы сломать программу и вызвать гонку данных.

Поэтому вам нужно разработать собственную структуру данных для API и убедиться, что все операции синхронизации выполняются внутри. В этом примере мы создаем безопасную и простую в использовании параллельную структуру данных,AtomicInt, в котором хранится одно целое число, через которое могут безопасно пройти любые горутиныAddиValueспособ доступа к номерам.

// AtomicInt 是一种持有int类型的支持并发的数据结构。
// 它的初始化值为0.
type AtomicInt struct {
    mu sync.Mutex // 同一时间只能有一个 goroutine 持有锁。
    n  int
}

// Add adds n to the AtomicInt as a single atomic operation.
// 原子性的将n增加到AtomicInt中
func (a *AtomicInt) Add(n int) {
    a.mu.Lock() // 等待锁被释放然后获取。
    a.n += n
    a.mu.Unlock() // 释放锁。
}

// 返回a的值.
func (a *AtomicInt) Value() int {
    a.mu.Lock()
    n := a.n
    a.mu.Unlock()
    return n
}

func lockItUp() {
    wait := make(chan struct{})
    var n AtomicInt
    go func() {
        n.Add(1) // one access
        close(wait)
    }()
    n.Add(1) // 另一个并发访问
    <-wait
    fmt.Println(n.Value()) // Output: 2
}

datarace.go

7. Обнаружение гонки данных

Конкуренцию иногда трудно обнаружить. Когда я запускаю эту программу с гонкой данных, она печатает55555. Попробуйте еще раз, может получиться другой результат.sync.WaitGroupявляется частью стандартной библиотеки go; он ожидает завершения выполнения последовательности горутин.

func race() {
    var wg sync.WaitGroup
    wg.Add(5)
    for i := 0; i < 5; **i++** {
        go func() {
            **fmt.Print(i)** // 局部变量i被6个goroutine共享
            wg.Done()
        }()
    }
    wg.Wait() // 等待5个goroutine执行结束
    fmt.Println()
}

raceClosure.go

для вывода55555Более разумное объяснение состоит в том, чтобы реализоватьi++Операционная горутина выполняется 5 раз, прежде чем печатаются другие горутины. По сути, обновленныйiВидимость для других горутин случайна.

Очень простое решение — запустить другую горутину, используя в качестве аргумента локальную переменную.

func correct() {
    var wg sync.WaitGroup
    wg.Add(5)
    for i := 0; i < 5; i++ {
        go func(n int) { // 局部变量。
            fmt.Print(n)
            wg.Done()
        }(i)
    }
    wg.Wait()
    fmt.Println()
}

raceClosure.go

Этот код правильный, он печатает желаемый результат,24031. Напомним, что среди разных горутин порядок выполнения программ не соответствует порядку.

Мы по-прежнему можем использовать замыкания, чтобы избежать гонок данных. Но мы должны быть осторожны с тем, что нам нужны разные переменные в каждой горутине.

func alsoCorrect() {
    var wg sync.WaitGroup
    wg.Add(5)
    for i := 0; i < 5; i++ {
        n := i // 为每个闭包创建单独的变量
        go func() {
            fmt.Print(n)
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println()
}

raceClosure.go

7. Автоматическое определение конкуренции

В общем, мы не можем автоматически найти все гонки данных. Но Go (начиная с версии 1.1) предоставляет мощный детектор гонки данных.data race detector.

Этот инструмент очень прост в использовании: просто добавьте-raceприбытьgoпосле команды. Запуск вышеуказанной программы автоматически проверит и распечатает приведенный ниже вывод.

$ go run -race raceClosure.go 
Data race:
==================
WARNING: DATA RACE
Read at 0x00c420074168 by goroutine 6:
  main.race.func1()
      ../raceClosure.go:22 +0x3f

Previous write at 0x00c420074168 by main goroutine:
  main.race()
      ../raceClosure.go:20 +0x1bd
  main.main()
      ../raceClosure.go:10 +0x2f

Goroutine 6 (running) created at:
  main.race()
      ../raceClosure.go:24 +0x193
  main.main()
      ../raceClosure.go:10 +0x2f
==================
12355
Correct:
01234
Also correct:
01234
Found 1 data race(s)
exit status 66

Инструмент обнаружил, что в строке 20 программы была гонка данных, одна горутина записывала значение в переменную, а другая горутина асинхронно читала значение этой переменной в строке 22.

Обратите внимание, что этот инструмент может обнаруживать только гонки данных, которые происходят во время фактического выполнения.

8. Выберите Заявление

В параллельном программировании Go последнийselectутверждение. Он выбирает, что может быть выполнено в серии коммуникативных операций. Если какая-либо из коммуникационных операций является исполняемой, она будет выбрана случайным образом, и соответствующий оператор будет выполнен. В противном случае, если оператор выполнения по умолчанию отсутствует, он будет заблокирован до тех пор, пока не будет выполнена какая-либо из операций связи.

Вот пример, показывающий, как использовать select для генерации случайных чисел.

// RandomBits 返回产生随机位数的channel
func RandomBits() <-chan int {
    ch := make(chan int)
    go func() {
        for {
            select {
            case ch <- 0: // 没有相关操作语句
            case ch <- 1:
            }
        }
    }()
    return ch
}

randBits.go

Еще проще, здесь select используется для установки времени ожидания. Этот код может печатать только новости или сообщения об истечении времени ожидания, в зависимости от того, какой из двух операторов приема может быть выполнен.

select {
case news := <-NewsAgency:
    fmt.Println(news)
case <-time.After(time.Minute):
    fmt.Println("Time out: no news in one minute.")
}

time.Afterявляется частью стандартной библиотеки go; он ждет, пока истечет определенное время, а затем отправляет текущее время на возвращаемый канал.

9. Самый простой пример параллелизма

couples
couples

Потратьте время, чтобы внимательно понять этот пример. Когда вы полностью поймете это, вы полностью поймете, как параллелизм работает внутри Go.

Программа демонстрирует один канал, отправляющий и получающий данные от нескольких горутин одновременно. Он также показывает, как оператор select может выбирать одну из нескольких операций связи для выполнения.

func main() {
    people := []string{"Anna", "Bob", "Cody", "Dave", "Eva"}
    match := make(chan string, 1) // 给未匹配的元素预留空间
    wg := new(sync.WaitGroup)
    for _, name := range people {
        wg.Add(1)
        go Seek(name, match, wg)
    }
    wg.Wait()
    select {
    case name := <-match:
        fmt.Printf("No one received %s’s message.\n", name)
    default:
        // 没有待处理的发送操作.
    }
}

// 寻求发送或接收匹配上名称名称的通道,并在完成后通知等待组.
func Seek(name string, match chan string, wg *sync.WaitGroup) {
    select {
    case peer := <-match:
        fmt.Printf("%s received a message from %s.\n", name, peer)
    case match <- name:
        // 等待其他人接受消息.
    }
    wg.Done()
}

matching.go

Пример вывода:

$ go run matching.go
Anna received a message from Eva.
Cody received a message from Bob.
No one received Dave’s message.

10. Параллельные вычисления

CPUs
CPUs

Приложение с функциями параллелизма делит большой расчет на небольшие вычислительные блоки, каждый из которых будет работать независимо.

Распределенные вычисления на нескольких процессорах — это не просто наука, а искусство.

  • Время выполнения каждого вычислительного блока составляет от 100 мкс до 1 мс.Если эти блоки слишком малы, могут возрасти проблемы распределения и накладные расходы на управление подмодулями. Если эти единицы слишком велики, вся вычислительная система может быть заблокирована небольшой трудоемкой операцией. На скорость вычислений могут влиять многие факторы, такие как планирование, программный терминал, расположение памяти (обратите внимание, что количество рабочих единиц не имеет ничего общего с количеством процессоров).

  • Минимизируйте объем обмена данными. Одновременная запись очень дорогая, особенно когда несколько горутин выполняются на разных процессорах. Влияние операций чтения общих данных на производительность невелико.

  • Правильная организация данных является эффективным способом. Если данные хранятся в кеше, скорость загрузки и хранения данных будет значительно увеличена. Опять же, это очень важно для операций записи.

В следующем примере показано, как распределить несколько трудоемких вычислений между несколькими доступными процессорами. Это код, который мы хотим оптимизировать.

type Vector []float64

// Convolve computes w = u * v, where w[k] = Σ u[i]*v[j], i + j = k.
// Precondition: len(u) > 0, len(v) > 0.
func Convolve(u, v Vector) Vector {
    n := len(u) + len(v) - 1
    w := make(Vector, n)

    for k := 0; k < n; k++ {
        w[k] = mul(u, v, k)
    }
    return w
}

// mul returns Σ u[i]*v[j], i + j = k.
func mul(u, v Vector, k int) float64 {
    var res float64
    n := min(k+1, len(u))
    j := min(k, len(v)-1)
    for i := k - j; i < n; i, j = i+1, j-1 {
        res += u[i] * v[j]
    }
    return res
}

Идея проста: определить единицы работы подходящего размера, а затем запустить каждую единицу работы в отдельной горутине.Convolveпараллельная версия .

func Convolve(u, v Vector) Vector {
    n := len(u) + len(v) - 1
    w := make(Vector, n)

    // 将w划分为多个将会计算100us-1ms时间计算的工作单元
    size := max(1, 1000000/n)

    var wg sync.WaitGroup
    for i, j := 0, size; i < n; i, j = j, j+size {
        if j > n {
            j = n
        }
        // goroutines只为读共享内存.
        wg.Add(1)
        go func(i, j int) {
            for k := i; k < j; k++ {
                w[k] = mul(u, v, k)
            }
            wg.Done()
        }(i, j)
    }
    wg.Wait()
    return w
}

convolution.go

Когда единица вычислений определена, обычно лучше оставить планирование для выполнения программы и операционной системы. Однако в версиях Go1.* вам необходимо указать количество горутин.

func init() {
    numcpu := runtime.NumCPU()
    runtime.GOMAXPROCS(numcpu) // 尽量使用所有可用的 CPU
}

Stefan Nilsson


Программа перевода самородковэто сообщество, которое переводит высококачественные технические статьи из Интернета сНаггетсДелитесь статьями на английском языке на . Охват контентаAndroid,iOS,React,внешний интерфейс,задняя часть,продукт,дизайни другие поля, если вы хотите видеть больше качественных переводов, пожалуйста, продолжайте обращать вниманиеПрограмма перевода самородков,официальный Вейбо,Знай колонку.