Перейти в режим параллелизма

Go

Базовые концепты

Понимание параллелизма и параллелизма

Параллелизм: акцент делается на выполнении нескольких действий в течение определенного периода времени.

Параллелизм: акцент делается на одновременном выполнении нескольких действий.

CSP против модели актера

Actor

Актерская модель — это общая модель параллельного программирования, которую можно использовать практически на любом языке программирования, обычно на Erlang. Несколько субъектов (процессов) могут работать одновременно, не обмениваться состоянием и обмениваться данными, асинхронно отправляя сообщения в связанные с процессом очереди сообщений (также известные как почтовые ящики).

Взаимодействие процессов субъекта-1 и субъекта-2 зависит от очереди сообщений, а очередь сообщений и процесс связаны и связаны друг с другом. После того, как актор-1 отправил сообщение, он может продолжать выполнять другие задачи без обработки сообщения актор-2, что означает, что связь между процессами актора является асинхронной.

преимущество
  • Обмен сообщениями и инкапсуляция, несколько акторов могут работать одновременно без совместного использования состояния, а события внутри одного актора выполняются последовательно (благодаря очередям)
  • Модель акторов поддерживает как модель разделяемой памяти, так и модель распределенной памяти.
недостаток
  • Хотя акторную модель легче отлаживать, чем программы, использующие модель потоков и блокировок, существуют также проблемы взаимоблокировок и необходимость беспокоиться о переполнении очередей в связанных процессах.
  • Прямой поддержки параллелизма нет, и параллельные решения необходимо создавать с помощью параллельной технологии.
CSP

CSP означает обмен последовательными процессами.Подобно модели Актера, эта модель также состоит из независимых, одновременно выполняющихся сущностей, которые взаимодействуют посредством отправки сообщений. модель csp в ходуchannelзаgoroutineявляется анонимным и не требуетgidсвязать, черезchannelЗаканчиватьgoroutineСвязь между. (КАНАЛ в концепции CSP представляет канал, обсуждается только переход, Канал эквивалентен Каналу в ГО)

преимущество
  • Самым большим преимуществом CSP перед Актерами является гибкость. Модель актора, носитель, отвечающий за коммуникацию, и исполнительный блок связаны между собой. В ЦСП,channelПервоклассный объект, который можно независимо создавать, записывать, оставлять в покое или передавать между исполнительными модулями.
недостаток
  • Модель CSP также подвержена взаимоблокировкам и не обеспечивает прямой поддержки параллелизма. Параллелизм должен быть построен на параллелизме, что вносит неопределенность.
разница
  • Модель акторов фокусируется на объектах, участвующих в обмене (т.е. процессах), а CSP фокусируется на каналах связи, таких как Gochannel
  • Модель CSP фокусируется не на процессе отправки сообщения, а на процессе, используемом для отправки сообщения.channelchannelПроцессы не так тесно связаны с очередями, как модель Актера. Вместо этого их можно создавать, читать и записывать по отдельности и передавать между процессами (горутинами).
Модель параллелизма в GO

Go использует идею SCP, а канал — это рекомендуемый метод коммуникации Go в параллельном программировании.Роб Пайк, разработчик Go, имеет классическую поговорку:

Do not communicate by sharing memory; instead, share memory by communicating.

Это предложение говорит о том, что «не используйте связь с общей памятью, но следует использовать связь для получения общей памяти». Язык Go рекомендует использовать связь для синхронизации сообщений между процессами. Это дает три преимущества, вытекающие изdravenessСообщение блога.

  1. Прежде всего, использование отправки сообщений для синхронизации информации является абстракцией более высокого уровня, чем непосредственное использование разделяемой памяти и блокировок мьютексов.Использование абстракций более высокого уровня может обеспечить лучшую инкапсуляцию при разработке программы и сделать логику программы более эффективной. ;
  2. Во-вторых, отправка сообщений также имеет определенные преимущества по сравнению с общей памятью с точки зрения развязки: мы можем разделить обязанности потоков на производителей и потребителей и развязать их через передачу сообщений, не полагаясь на разделяемую память;
  3. Наконец, язык Go выбирает способ отправки сообщений, гарантируя, что только один активный поток может получить доступ к данным в одно и то же время, что естественно позволяет избежать проблем конкуренции потоков и конфликтов данных по дизайну;

Шаблоны параллельного проектирования

Модель параллелизма, используемая в Go, была представлена ​​выше и ниже этой модели параллелизма.channelявляется важной концепцией, и дизайн каждого из следующих шаблонов основан наchannel, надо выяснить.

Барьерный режим

Барьер Барьерный режим, как следует из названия, представляет собой барьер, который блокируется до тех пор, пока все горутины не вернут результаты агрегирования. можно использоватьchannelреализовать.

сцены, которые будут использоваться
  • Несколько сетевых запросов одновременно, агрегированные результаты
  • Грубые задачи разделяются и выполняются одновременно, а результаты агрегируются.

Код
/*
* Barrier
 */
type barrierResp struct {
    Err error
    Resp string
    Status int
}

// 构造请求
func makeRequest(out chan<- barrierResp, url string)  {
    res := barrierResp{}

    client := http.Client{
        Timeout: time.Duration(2*time.Microsecond),
    }

    resp, err := client.Get(url)
    if resp != nil {
        res.Status = resp.StatusCode
    }
    if err != nil {
        res.Err = err
        out <- res
        return
    }

    byt, err := ioutil.ReadAll(resp.Body)
    defer resp.Body.Close()
    if err != nil {
        res.Err = err
        out <- res
        return
    }

    res.Resp = string(byt)
    out <- res
}

// 合并结果
func barrier(endpoints ...string) {
    requestNumber := len(endpoints)

    in := make(chan barrierResp, requestNumber)
    response := make([]barrierResp, requestNumber)

    defer close(in)

    for _, endpoints := range endpoints {
        go makeRequest(in, endpoints)
    }

    var hasError bool
    for i := 0; i < requestNumber; i++ {
        resp := <-in
        if resp.Err != nil {
            fmt.Println("ERROR: ", resp.Err, resp.Status)
            hasError = true
        }
        response[i] = resp
    }
    if !hasError {
        for _, resp := range response {
            fmt.Println(resp.Status)
        }
    }
}

func main() {
    barrier([]string{"https://www.baidu.com", "http://www.sina.com", "https://segmentfault.com/"}...)
}
Tips

Также можно использовать барьерный режим.errgroupРасширьте библиотеку для достижения, это более просто и понятно. Этот пакет чем-то похож наsync.WaitGroup, но разница в том, что при возникновении ошибки в одной из задач эта ошибка может быть возвращена. И это также удовлетворяет потребности нашего шаблона барьера.

func barrier(endpoints ...string) {
    var g errgroup.Group
    var mu sync.Mutex
  
    response := make([]barrierResp, len(endpoints))

    for i, endpoint := range endpoints {
        i, endpoint := i, endpoint // create locals for closure below
        g.Go(func() error {
            res := barrierResp{}
            resp, err := http.Get(endpoint)
            if err != nil {
                return err
            }

            byt, err := ioutil.ReadAll(resp.Body)
            defer resp.Body.Close()
            if err != nil {
                return err
            }

            res.Resp = string(byt)
            mu.Lock()
            response[i] = res
            mu.Unlock()
            return err
        })
    }
    if err := g.Wait(); err != nil {
       fmt.Println(err)
    }
    for _, resp := range response {
        fmt.Println(resp.Status)
    }
}

Будущий образец

будущее есть будущее, режим из будущего (собачья ручная голова). Этот шаблон обычно используется в асинхронной обработке, также известной как шаблон Promise, с использованиемfire-and-forgetЭтот метод означает, что основная горутина возвращается напрямую, не дожидаясь завершения выполнения подпрограммы, а затем ждет завершения будущего выполнения, чтобы получить результат. Реализация этого шаблона в Go проста благодаря горутинам.

сцены, которые будут использоваться
  • асинхронный

Код
/*
* Future
 */
type Function func(string) (string, error)

type Future interface {
    SuccessCallback() error
    FailCallback()    error
    Execute(Function) (bool, chan struct{})
}

type AccountCache struct {
    Name string
}

func (a *AccountCache) SuccessCallback() error {
    fmt.Println("It's success~")
    return nil
}

func (a *AccountCache) FailCallback() error {
    fmt.Println("It's fail~")
    return nil
}

func (a *AccountCache) Execute(f Function) (bool, chan struct{}){
    done := make(chan struct{})
    go func(a *AccountCache) {
        _, err := f(a.Name)
        if err != nil {
            _ = a.FailCallback()
        } else {
            _ = a.SuccessCallback()
        }
        done <- struct{}{}
    }(a)
    return true, done
}

func NewAccountCache(name string) *AccountCache {
    return &AccountCache{
        name,
    }
}

func testFuture() {
    var future Future
    future = NewAccountCache("Tom")
    updateFunc := func(name string) (string, error){
        fmt.Println("cache update:", name)
        return name, nil
    }
    _, done := future.Execute(updateFunc)
    defer func() {
        <-done
    }()
}

func main() {
    var future Future
    future = NewAccountCache("Tom")
    updateFunc := func(name string) (string, error){
        fmt.Println("cache update:", name)
        return name, nil
    }
    _, done := future.Execute(updateFunc)
    defer func() {
        <-done
    }()
    // do something
}

Вот хитрость: зачем использоватьstructвведите какchannelанонс?

Многие программы с открытым исходным кодом используют этот метод в качестве сигнального механизма, главным образом потому, что пустойstructЭто занимает наименьшее количество памяти в Go.

Конвейерный режим

сцены, которые будут использоваться
  • Вы можете воспользоваться преимуществом многоядерности, чтобы разбить часть грубой логики на несколько горутин для выполнения.

Сам конвейер переводится как средний конвейер.Обратите внимание, что в отличие от режима Баррира, он является последовательным, похожим на конвейер.

Эта диаграмма не очень хорошо выражает концепцию параллелизма. На самом деле три горутины выполняются одновременно. Три горутины связаны друг с другом через буферный канал. Пока горутина предварительного заказа обрабатывает часть данных, он будет передан для достижения цели параллелизма.

Код

Реализуйте функцию, которая по заданному срезу суммирует квадраты его дочерних элементов.

Например, [1, 2, 3] -> 1^2 + 2^2 + 3^2 = 14.

Нормальная логика, пройтись по срезу, затем возвести в квадрат и аккумулировать. В конвейерном режиме суммирование и возведение в квадрат можно разделить на параллельные вычисления.

/*
* Pipeline 模式
 */

func generator(max int) <-chan int{
    out := make(chan int, 100)
    go func() {
        for i := 1; i <= max; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

func power(in <-chan int) <-chan int{
    out := make(chan int, 100)
    go func() {
        for v := range in {
            out <- v * v
        }
        close(out)
    }()
    return out
}

func sum(in <-chan int) <-chan int{
    out := make(chan int, 100)
    go func() {
        var sum int
        for v := range in {
            sum += v
        }
        out <- sum
        close(out)
    }()
    return out
}

func main() {
    // [1, 2, 3]
    fmt.Println(<-sum(power(generator(3))))
}

Режим пула рабочих

сцены, которые будут использоваться
  • большое количество одновременных задач

Горутины достаточно легковесны в Go, дажеnet/httpМетод обработки сервера такжеgoroutine-per-connection, поэтому сценариев может быть немного меньше, чем для других языков. Начальное потребление памяти каждой горутиной составляет 2 ~ 8 КБ. Когда у нас есть большой пакет задач, нам нужно запустить много горутин для обработки, что приведет к большим накладным расходам памяти и нагрузке на системный агент. время, мы можем рассмотреть пул сопрограмм.

Код
/*
* Worker pool
 */
type TaskHandler func(interface{})

type Task struct {
    Param   interface{}
    Handler TaskHandler
}

type WorkerPoolImpl interface {
    AddWorker()                  // 增加 worker
    SendTask(Task)               // 发送任务
    Release()                    // 释放
}

type WorkerPool struct {
    wg   sync.WaitGroup
    inCh chan Task
}

func (d *WorkerPool) AddWorker() {
    d.wg.Add(1)
    go func(){
        for task := range d.inCh {
            task.Handler(task.Param)
        }
        d.wg.Done()
    }()
}

func (d *WorkerPool) Release() {
    close(d.inCh)
    d.wg.Wait()
}

func (d *WorkerPool) SendTask(t Task) {
    d.inCh <- t
}

func NewWorkerPool(buffer int) WorkerPoolImpl {
    return &WorkerPool{
        inCh: make(chan Task, buffer),
    }
}

func main() {
    bufferSize := 100
    var workerPool = NewWorkerPool(bufferSize)
    workers := 4
    for i := 0; i < workers; i++ {
        workerPool.AddWorker()
    }

    var sum int32
    testFunc := func (i interface{}) {
        n := i.(int32)
        atomic.AddInt32(&sum, n)
    }
    var i, n int32
    n = 1000
    for ; i < n; i++ {
        task := Task{
            i,
            testFunc,
        }
        workerPool.SendTask(task)
    }
    workerPool.Release()
    fmt.Println(sum)
}

Пул сопрограмм использует отражение для получения выполняемых функций и параметров, что может немного сбивать с толку в Go. Однако, если известны функции, которые должны выполняться в пакетном режиме, их можно оптимизировать в пул сопрограмм, который выполняет только указанные функции, что может повысить производительность.

Режим публикации/подписки

Модель публикации-подписки — это модель уведомления о сообщениях, в которой издатели отправляют сообщения, а подписчики получают сообщения.

сцены, которые будут использоваться
  • очередь сообщений

Код
/*
* Pub/Sub
 */
type Subscriber struct {
    in     chan interface{}
    id     int
    topic  string
    stop   chan struct{}
}

func (s *Subscriber) Close() {
    s.stop <- struct{}{}
    close(s.in)
}

func (s *Subscriber) Notify(msg interface{}) (err error) {
    defer func() {
        if rec := recover(); rec != nil {
            err = fmt.Errorf("%#v", rec)
        }
    }()
    select {
    case s.in <-msg:
    case <-time.After(time.Second):
        err = fmt.Errorf("Timeout\n")
    }
    return
}

func NewSubscriber(id int) SubscriberImpl {
    s := &Subscriber{
        id: id,
        in: make(chan interface{}),
        stop: make(chan struct{}),
    }
    go func() {
        for{
            select {
            case <-s.stop:
                close(s.stop)
                return
            default:
                for msg := range s.in {
                    fmt.Printf("(W%d): %v\n", s.id, msg)
                }
            }
    }}()
    return s
}

// 订阅者需要实现的方法
type SubscriberImpl interface {
    Notify(interface{}) error
    Close()
}

// sub 订阅 pub
func Register(sub Subscriber, pub *publisher){
    pub.addSubCh <- sub
    return
}

// pub 结果定义
type publisher struct {
    subscribers []SubscriberImpl          
    addSubCh    chan SubscriberImpl
    removeSubCh chan SubscriberImpl
    in          chan interface{}
    stop        chan struct{}
}

// 实例化
func NewPublisher () *publisher{
    return &publisher{
        addSubCh: make(chan SubscriberImpl),
        removeSubCh: make(chan SubscriberImpl),
        in: make(chan interface{}),
        stop: make(chan struct{}),
    }
}

// 监听
func (p *publisher) start() {
    for {
        select {
        // pub 发送消息
        case msg := <-p.in:
            for _, sub := range p.subscribers{
                _ = sub.Notify(msg)
            }
        // 移除指定 sub
        case sub := <-p.removeSubCh:
            for i, candidate := range p.subscribers {
                if candidate == sub {
                    p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
                    candidate.Close()
                    break
                }
            }
        // 增加一个 sub
        case sub := <-p.addSubCh:
            p.subscribers = append(p.subscribers, sub)
        // 关闭 pub
        case <-p.stop:
            for _, sub := range p.subscribers {
                sub.Close()
            }
            close(p.addSubCh)
            close(p.in)
            close(p.removeSubCh)
            return
        }
    }
}

func main() {
    // 测试代码
    pub := NewPublisher()
    go pub.start()

    sub1 := NewSubscriber(1)
    Register(sub1, pub)

    sub2 := NewSubscriber(2)
    Register(sub2, pub)

    commands:= []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
    for _, c := range commands {
        pub.in <- c
    }

    pub.stop <- struct{}{}
    time.Sleep(time.Second*1)
}

Меры предосторожности

  • Проблемы с синхронизацией, особенно примитивы синхронизации иchannelПри совместном использовании склонен к взаимоблокировке
  • Проблема сбоя горутины: если дочерняя горутина паникует без восстановления, основная горутина аварийно завершится
  • Проблема с утечкой горутины, убедитесь, что горутину можно нормально закрыть

Ссылаться на

  1. книга шаблонов дизайна
  2. Книга "Семь семинедельная модель параллелизма"
  3. Зачем использовать общение, чтобы делиться памятью?Почему дизайн?
  4. advanced-go-concurrency