Как построить параллельные конвейеры в Go

Go

эта статьяНачалось вмой блогЕсли вы найдете его полезным, приветствуйте большие пальцы на больших пальцах, чтобы увидеть больше друзей.

Автор: Самир Аджмани | Оригинал:blog.golang.org/pipelines

Предисловие переводчика

Эта статья взята с официального сайта Go, который достоин называться официальным блогом, и написана очень подробно. Прежде чем мы приступим к переводу этой статьи, позвольте мне сделать два коротких замечания.

Во-первых, я уже переводил эту статью, но когда я перечитал ее недавно, то обнаружил, что предыдущий перевод был очень плохим. Поэтому было решено заново перевести эту статью, вообще не ссылаясь на предыдущий перевод.

Во-вторых, в статье есть некоторые имена собственные, а план все же выражен на английском для обеспечения оригинального колорита, например, pipe (конвейер), stage (стадия), goroutine (сопрограмма), channel (канал).

Что касается отношений между ними, я просто нарисовал набросок в соответствии со своим пониманием, надеясь помочь лучше понять отношения между ними. следующее:

Чтобы подчеркнуть один момент, если вы чувствуете себя сбитым с толку при чтении этой статьи, рекомендуется еще раз просмотреть эту картинку.

Переведенная часть тела выглядит следующим образом.


Примитивы параллелизма Go упрощают создание конвейеров потоковых данных, которые могут эффективно использовать ввод-вывод и многоядерные процессоры. Эта статья будет основана на этом введении. В этом процессе мы столкнемся с некоторыми нештатными ситуациями, методы обработки которых также будут подробно описаны в этой статье.

Что такое трубопровод

В Go нет четкого определения того, что такое конвейер, это всего лишь один из многих стилей параллельного программирования. Неформально мы понимаем, что он состоит из ряда этапов, соединенных каналом, и каждый этап состоит из группы горутин, выполняющих одну и ту же функцию. Горутины каждого этапа обычно выполняют некоторые из следующих действий:

  • принимают данные из восходящего входного канала;
  • выполнить некоторую обработку полученных данных (обычно) и создать новые данные;
  • Выходные данные отправляются в нисходящий поток через канал;

За исключением первого и последнего каскада, каждый каскад содержит определенное количество входных и выходных каналов. Первый этап имеет только выход, который обычно называют «производителем», а последний этап имеет только вход, который обычно называют «потребителем».

Давайте сначала рассмотрим очень простой пример, чтобы объяснить концепции и технологии, связанные с конвейером, упомянутым выше. Поняв это, давайте посмотрим на другие более практические примеры.

Вычислить квадрат

Трубопровод, состоящий из трех стадий.

Первый этап, функция gen. Он отвечает за отправку серии целых чисел из параметра в указанный канал. Он запускает горутину для отправки данных, и когда все данные будут отправлены, канал будет закрыт.

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

Второй этап, функция sq. Он отвечает за получение данных из входного канала и будет возвращать новый канал, выходной канал, который отвечает за передачу данных в квадрате вниз по течению. Когда входной канал закрыт и все данные отправлены вниз по течению, выходной канал может быть закрыт.

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

Основная функция отвечает за создание пайплайна и выполнение задач последнего этапа. Он будет получать данные со второго этапа и печатать их до закрытия канала.

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

Поскольку типы входных и выходных каналов sq одинаковы, мы можем объединить их, чтобы сформировать несколько этапов. Например, мы могли бы переписать функцию main следующим образом:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

Разветвление и разветвление

Когда несколько функций считывают данные из канала до тех пор, пока канал не будет закрыт, это называется разветвлением. Используя его, мы можем реализовать распределенный способ работы и добиться параллельного ЦП и ввода-вывода через группу рабочих.

Когда функция считывает данные из нескольких каналов до тех пор, пока все каналы не будут закрыты, это называется разветвлением. Разветвление достигается путем слияния данных из нескольких входных каналов в один и тот же выходной канал, и когда все входные каналы закрыты, выходной канал также будет закрыт.

Давайте изменим конвейер в приведенном выше примере и попробуем запустить на нем две функции sq. Они оба будут считываться из одного и того же входного канала. Мы ввели новую функцию, merge, которая отвечает за результат обработки fan-in, то есть результат обработки слияния двух кв.

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    // 分布式处理来自 in channel 的数据
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    // 从 channel c1 和 c2 的合并后的 channel 中接收数据
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

Функция слияния отвечает за объединение данных, полученных из ряда входных каналов, в один канал. Он запускает горутину для каждого входного канала и отправляет полученные в них значения в уникальный выходной канал. После запуска всех горутин будет запущена еще одна горутина, и ее роль заключается в закрытии единственного выходного канала, когда закрыты все входные каналы.

Отправка данных на закрытый канал будет паниковать, поэтому важно убедиться, что все данные отправляются перед закрытием канала. Sync.Waitgroup предоставляет очень простой способ выполнить такой синхронизацию.

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    // 为每个输入 channel 启动一个 goroutine
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    // 启动一个 goroutine 负责在所有的输入 channel 关闭后,关闭这个唯一的输出 channel
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Остановиться на полпути

Функции в конвейере содержат шаблон:

  • При отправке данных каждый этап должен закрыть свой входной канал;
  • Пока входной канал не закрыт, каждый этап будет продолжать получать от него данные;

Мы можем написать цикл диапазона, чтобы гарантировать, что все горутины завершатся, когда все данные будут отправлены вниз по течению.

Но в реальном сценарии невозможно, чтобы каждый этап получил все данные в канале. Иногда наш дизайн таков, что получателю нужно получить только подмножество данных. Чаще всего, если в канале есть ошибка на восходящем этапе, текущий этап должен завершиться досрочно. В любом случае приемник не должен продолжать ждать приема оставшихся данных в канале, а восходящий поток должен в это время прекратить выдачу данных, ведь нисходящему они больше не нужны.

В нашем примере, даже если этап не использует все данные успешно, вышестоящий этап все равно попытается отправить данные нижестоящему, что приведет к блокировке программы навсегда.

    // Consume the first value from the output.
    // 从 output 中接收了第一个数据
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
    // 我们并没有从 out channel 中接收第二个数据,
    // 所以上游的其中一个 goroutine 在尝试向下游发送数据时将会被挂起。
}

Это утечка ресурсов, горутина — это потребность потреблять память и ресурсы времени выполнения, справочная информация о куче стека горутины — это не gc.

Нам нужно предоставить способ для успешного завершения восходящего потока, даже когда возникает исключение, когда нисходящий поток получает данные от восходящего потока. Один из способов — изменить канал на буферизованный, чтобы он мог передавать указанный объем данных, и, если в буферном канале есть место, передача данных будет завершена немедленно.

// 缓冲大小 2 buffer size 2 
c := make(chan int, 2)
// 发送立刻成功 succeeds immediately 
c <- 1
// 发送立刻成功 succeeds immediately
c <- 2 
//blocks until another goroutine does <-c and receives 1
// 阻塞,直到另一个 goroutine 从 c 中接收数据
c <- 3

Предыдущий код можно упростить, если мы уже знаем объем данных, которые будут отправлены при создании канала. Например, переписывание функции gen для отправки всех данных в буферный канал также позволяет избежать создания новых горутин.

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

Примечание переводчика: После закрытия канала нельзя больше записывать данные, иначе будет паника, но отправленные данные все еще можно прочитать, а значение 0 всегда можно прочитать.

Перейдите к нисходящему этапу, и он вернется к заблокированной горутине.Мы также можем рассмотреть возможность добавления некоторых буферов в выходной канал слияния.

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup

    // enough space for the unread inputs
    // 给未读的输入 channel 预留足够的空间
    out := make(chan int, 1)    
    // ... the rest is unchanged ...

Хотя с помощью этого метода мы можем решить проблему блокировки горутин, это не очень хороший дизайн. Например, размер буфера при слиянии равен 1, исходя из того, что мы уже знаем о размере следующих полученных данных и объеме, который нисходящий поток сможет потреблять. Очевидно, что этот дизайн очень хрупок: если восходящий поток отправит еще немного данных или нисходящий поток не получит столько данных, горутина снова будет заблокирована.

Поэтому, когда нисходящий поток больше не готов принимать данные восходящего потока, должен быть способ уведомить восходящий поток.

явная отмена

Если основная функция завершает работу, не получив всех данных, она должна уведомить восходящий поток о прекращении отправки данных. Как это сделать? Мы можем ввести новый канал между восходящим и нисходящим потоками, обычно называемый done.

В примере есть две горутины, которые могут блокироваться, поэтому done нужно отправить два значения, чтобы уведомить их.

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    // 通知发送方,我们已经停止接收数据了
    done <- struct{}{}
    done <- struct{}{}
}

Слияние отправителя заменяет предыдущую операцию отправки оператором select, который отвечает за отправку данных через выходной канал или получение данных от done. Значение, полученное done, не имеет смысла, оно просто означает, что out должен прекратить отправку данных, просто используйте пустую структуру. Функция вывода будет продолжать зацикливаться, потому что восходящий поток, sq , не блокируется. Позже мы обсудим, как выйти из этого круга.

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

У этого метода есть проблема: только когда нисходящий поток знает количество горутин, которые может заблокировать восходящий поток, он может послать сигнал готовности каждой горутине, чтобы убедиться, что все они могут успешно завершиться. Но поддержание дополнительного счетчика раздражает и чревато ошибками.

Нам нужен способ сообщить всем вышестоящим горутинам, чтобы они прекратили отправлять сообщения вниз по течению. В Go этого действительно можно добиться, закрыв канал, потому что получение данных по закрытому каналу вернет сразу с нулевым значением.

Это означает, что main может разблокировать всех отправителей, просто закрыв канал Done. Операция выключения эквивалентна широковещательному сигналу. Чтобы гарантировать успешные вызовы на любом обратном пути, мы можем закрыть done с помощью инструкции defer.

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    // 为每个输入 channel 启动一个 goroutine,将输入 channel 中的数据拷贝到
    // out channel 中,直到输入 channel,即 c,或 done 关闭。
    // 接着,退出循环并执行 wg.Done()
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...

Точно так же, как только done будет закрыто, sq также выйдет. sq также использует оператор defer, чтобы убедиться, что его выходной канал, а именно out, должен быть успешно закрыт и освобожден.

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

Здесь было представлено, как построить конвейер в Go.

Кратко о том, как правильно построить пайплайн.

  • Когда все отправки завершены, каскад должен закрыть выходной канал;
  • Этап должен продолжать получать данные из входного канала, если только канал не закрыт или отправитель активно не уведомлен о прекращении отправки.

В Pipeline есть несколько способов разблокировать отправителя: во-первых, отправитель создает канал с достаточным пространством для отправки данных, а во-вторых, когда получатель перестает получать данные, он четко информирует отправителя.

абстрактное дерево

Настоящий случай.

MD5, алгоритм дайджеста сообщения, может использоваться для вычисления контрольной суммы файла. Вывод ниже представляет собой сводную информацию о файле из инструмента командной строки md5sum.

$ md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

Наш пример похож на md5sum, за исключением того, что аргумент, передаваемый этой программе, является каталогом. Результатом работы программы является сводное значение для каждого файла в каталоге, отсортированное по имени файла.

$ go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

Основная функция на первом этапе вызывает MD5All, которая возвращает карту с именем файла в качестве ключа и значением дайджеста в качестве значения, а затем сортирует и печатает возвращенные результаты.

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

Функция MD5All будет в центре нашего следующего обсуждения.Серийная версияНет параллельной реализации, только данные, считанные из файла пересчета.

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

Параллельные вычисления

существуетПараллельная версия, мы разделим вычисление MD5All на конвейер с двумя этапами. Первый этап, sumFiles, отвечает за обход каталога и вычисление итогового значения файла.Вычисление сводки запустит выполнение горутины, а результат вычисления будет отправлен через канал типа result.

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles возвращает 2 канала, один для получения результата вычисления и один для получения ошибочного возврата filepath.Walk. walk запустит горутину для каждого файла для выполнения сводных вычислений и проверки выполнения. Если done выключено, прогулка немедленно остановится.

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        // 不需要使用 select,因为 errc 是带有 buffer 的 channel
        errc <- err
    }()
    return c, errc
}

MD5All получит результат расчета из канала c и закроется по отложенному, если произойдет ошибка.

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Ограничение параллелизма

существуетПараллельная версия, MD5All запускает горутину для каждого файла. Но если в каталоге слишком много файлов, это может привести к тому, что выделенная память станет настолько большой, что превысит лимит текущей машины.

Мы можем ограничить выделение памяти, ограничив количество файлов, читаемых параллельно. существуетПараллельная ограниченная версия, мы создаем фиксированное количество горутин для чтения файла. Наш конвейер теперь включает 3 этапа: обход каталога, чтение файла и вычисление дайджеста, а также сбор результатов.

Первый этап, пересекает каталоги и испускают файлы по каналу путей.

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

На втором этапе запускается фиксированное количество горутин, считывается имя файла из канала Paths, и результаты обработки отправляются в канал C.

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

В отличие от предыдущего примера, дайджестер не закроет канал c, потому что этот канал используют несколько горутин, и результаты вычислений будут отправляться на этот канал.

Соответственно, MD5All будет отвечать за закрытие канала c после завершения всех дайджестов.

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

Мы также могли бы создать отдельный канал для каждого дайджеста и передавать результаты по его собственному каналу. Но таким образом, мы должны запустить новую горутину для слияния результатов.

Последний этап отвечает за получение результата обработки от c и проверку наличия ошибки через errc. Эту проверку нельзя выполнить заблаговременно, потому что это заблокирует передачу данных walkFile вниз по течению.

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Суммировать

В этом посте описывается, как правильно создавать потоковые конвейеры данных в Go. Его обработка исключений очень сложна, каждый этап в конвейере может вызвать блокировку восходящего потока, а нижестоящий может больше не заботиться о следующих данных. Закрытие канала отправляет сигнал готовности всем работающим горутинам, что помогает нам успешно разблокировать. Как правильно построить конвейер потоковой передачи данных, в статье также приведены некоторые рекомендации.