Базовые концепты
Понимание параллелизма и параллелизма
Параллелизм: акцент делается на выполнении нескольких действий в течение определенного периода времени.
Параллелизм: акцент делается на одновременном выполнении нескольких действий.
CSP против модели актера
Actor
Актерская модель — это общая модель параллельного программирования, которую можно использовать практически на любом языке программирования, обычно на Erlang. Несколько субъектов (процессов) могут работать одновременно, не обмениваться состоянием и обмениваться данными, асинхронно отправляя сообщения в связанные с процессом очереди сообщений (также известные как почтовые ящики).
Взаимодействие процессов субъекта-1 и субъекта-2 зависит от очереди сообщений, а очередь сообщений и процесс связаны и связаны друг с другом. После того, как актор-1 отправил сообщение, он может продолжать выполнять другие задачи без обработки сообщения актор-2, что означает, что связь между процессами актора является асинхронной.
преимущество
- Обмен сообщениями и инкапсуляция, несколько акторов могут работать одновременно без совместного использования состояния, а события внутри одного актора выполняются последовательно (благодаря очередям)
- Модель акторов поддерживает как модель разделяемой памяти, так и модель распределенной памяти.
недостаток
- Хотя акторную модель легче отлаживать, чем программы, использующие модель потоков и блокировок, существуют также проблемы взаимоблокировок и необходимость беспокоиться о переполнении очередей в связанных процессах.
- Прямой поддержки параллелизма нет, и параллельные решения необходимо создавать с помощью параллельной технологии.
CSP
CSP означает обмен последовательными процессами.Подобно модели Актера, эта модель также состоит из независимых, одновременно выполняющихся сущностей, которые взаимодействуют посредством отправки сообщений. модель csp в ходуchannel
заgoroutine
является анонимным и не требуетgid
связать, черезchannel
Заканчиватьgoroutine
Связь между. (КАНАЛ в концепции CSP представляет канал, обсуждается только переход, Канал эквивалентен Каналу в ГО)
преимущество
- Самым большим преимуществом CSP перед Актерами является гибкость. Модель актора, носитель, отвечающий за коммуникацию, и исполнительный блок связаны между собой. В ЦСП,
channel
Первоклассный объект, который можно независимо создавать, записывать, оставлять в покое или передавать между исполнительными модулями.
недостаток
- Модель CSP также подвержена взаимоблокировкам и не обеспечивает прямой поддержки параллелизма. Параллелизм должен быть построен на параллелизме, что вносит неопределенность.
разница
- Модель акторов фокусируется на объектах, участвующих в обмене (т.е. процессах), а CSP фокусируется на каналах связи, таких как Go
channel
- Модель CSP фокусируется не на процессе отправки сообщения, а на процессе, используемом для отправки сообщения.
channel
,иchannel
Процессы не так тесно связаны с очередями, как модель Актера. Вместо этого их можно создавать, читать и записывать по отдельности и передавать между процессами (горутинами).
Модель параллелизма в GO
Go использует идею SCP, а канал — это рекомендуемый метод коммуникации Go в параллельном программировании.Роб Пайк, разработчик Go, имеет классическую поговорку:
Do not communicate by sharing memory; instead, share memory by communicating.
Это предложение говорит о том, что «не используйте связь с общей памятью, но следует использовать связь для получения общей памяти». Язык Go рекомендует использовать связь для синхронизации сообщений между процессами. Это дает три преимущества, вытекающие изdravenessСообщение блога.
- Прежде всего, использование отправки сообщений для синхронизации информации является абстракцией более высокого уровня, чем непосредственное использование разделяемой памяти и блокировок мьютексов.Использование абстракций более высокого уровня может обеспечить лучшую инкапсуляцию при разработке программы и сделать логику программы более эффективной. ;
- Во-вторых, отправка сообщений также имеет определенные преимущества по сравнению с общей памятью с точки зрения развязки: мы можем разделить обязанности потоков на производителей и потребителей и развязать их через передачу сообщений, не полагаясь на разделяемую память;
- Наконец, язык Go выбирает способ отправки сообщений, гарантируя, что только один активный поток может получить доступ к данным в одно и то же время, что естественно позволяет избежать проблем конкуренции потоков и конфликтов данных по дизайну;
Шаблоны параллельного проектирования
Модель параллелизма, используемая в Go, была представлена выше и ниже этой модели параллелизма.channel
является важной концепцией, и дизайн каждого из следующих шаблонов основан наchannel
, надо выяснить.
Барьерный режим
Барьер Барьерный режим, как следует из названия, представляет собой барьер, который блокируется до тех пор, пока все горутины не вернут результаты агрегирования. можно использоватьchannel
реализовать.
сцены, которые будут использоваться
- Несколько сетевых запросов одновременно, агрегированные результаты
- Грубые задачи разделяются и выполняются одновременно, а результаты агрегируются.
Код
/*
* Barrier
*/
type barrierResp struct {
Err error
Resp string
Status int
}
// 构造请求
func makeRequest(out chan<- barrierResp, url string) {
res := barrierResp{}
client := http.Client{
Timeout: time.Duration(2*time.Microsecond),
}
resp, err := client.Get(url)
if resp != nil {
res.Status = resp.StatusCode
}
if err != nil {
res.Err = err
out <- res
return
}
byt, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
res.Err = err
out <- res
return
}
res.Resp = string(byt)
out <- res
}
// 合并结果
func barrier(endpoints ...string) {
requestNumber := len(endpoints)
in := make(chan barrierResp, requestNumber)
response := make([]barrierResp, requestNumber)
defer close(in)
for _, endpoints := range endpoints {
go makeRequest(in, endpoints)
}
var hasError bool
for i := 0; i < requestNumber; i++ {
resp := <-in
if resp.Err != nil {
fmt.Println("ERROR: ", resp.Err, resp.Status)
hasError = true
}
response[i] = resp
}
if !hasError {
for _, resp := range response {
fmt.Println(resp.Status)
}
}
}
func main() {
barrier([]string{"https://www.baidu.com", "http://www.sina.com", "https://segmentfault.com/"}...)
}
Tips
Также можно использовать барьерный режим.errgroup
Расширьте библиотеку для достижения, это более просто и понятно. Этот пакет чем-то похож наsync.WaitGroup
, но разница в том, что при возникновении ошибки в одной из задач эта ошибка может быть возвращена. И это также удовлетворяет потребности нашего шаблона барьера.
func barrier(endpoints ...string) {
var g errgroup.Group
var mu sync.Mutex
response := make([]barrierResp, len(endpoints))
for i, endpoint := range endpoints {
i, endpoint := i, endpoint // create locals for closure below
g.Go(func() error {
res := barrierResp{}
resp, err := http.Get(endpoint)
if err != nil {
return err
}
byt, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return err
}
res.Resp = string(byt)
mu.Lock()
response[i] = res
mu.Unlock()
return err
})
}
if err := g.Wait(); err != nil {
fmt.Println(err)
}
for _, resp := range response {
fmt.Println(resp.Status)
}
}
Будущий образец
будущее есть будущее, режим из будущего (собачья ручная голова). Этот шаблон обычно используется в асинхронной обработке, также известной как шаблон Promise, с использованиемfire-and-forget
Этот метод означает, что основная горутина возвращается напрямую, не дожидаясь завершения выполнения подпрограммы, а затем ждет завершения будущего выполнения, чтобы получить результат. Реализация этого шаблона в Go проста благодаря горутинам.
сцены, которые будут использоваться
- асинхронный
Код
/*
* Future
*/
type Function func(string) (string, error)
type Future interface {
SuccessCallback() error
FailCallback() error
Execute(Function) (bool, chan struct{})
}
type AccountCache struct {
Name string
}
func (a *AccountCache) SuccessCallback() error {
fmt.Println("It's success~")
return nil
}
func (a *AccountCache) FailCallback() error {
fmt.Println("It's fail~")
return nil
}
func (a *AccountCache) Execute(f Function) (bool, chan struct{}){
done := make(chan struct{})
go func(a *AccountCache) {
_, err := f(a.Name)
if err != nil {
_ = a.FailCallback()
} else {
_ = a.SuccessCallback()
}
done <- struct{}{}
}(a)
return true, done
}
func NewAccountCache(name string) *AccountCache {
return &AccountCache{
name,
}
}
func testFuture() {
var future Future
future = NewAccountCache("Tom")
updateFunc := func(name string) (string, error){
fmt.Println("cache update:", name)
return name, nil
}
_, done := future.Execute(updateFunc)
defer func() {
<-done
}()
}
func main() {
var future Future
future = NewAccountCache("Tom")
updateFunc := func(name string) (string, error){
fmt.Println("cache update:", name)
return name, nil
}
_, done := future.Execute(updateFunc)
defer func() {
<-done
}()
// do something
}
Вот хитрость: зачем использовать
struct
введите какchannel
анонс?Многие программы с открытым исходным кодом используют этот метод в качестве сигнального механизма, главным образом потому, что пустой
struct
Это занимает наименьшее количество памяти в Go.
Конвейерный режим
сцены, которые будут использоваться
- Вы можете воспользоваться преимуществом многоядерности, чтобы разбить часть грубой логики на несколько горутин для выполнения.
Сам конвейер переводится как средний конвейер.Обратите внимание, что в отличие от режима Баррира, он является последовательным, похожим на конвейер.
Эта диаграмма не очень хорошо выражает концепцию параллелизма. На самом деле три горутины выполняются одновременно. Три горутины связаны друг с другом через буферный канал. Пока горутина предварительного заказа обрабатывает часть данных, он будет передан для достижения цели параллелизма.
Код
Реализуйте функцию, которая по заданному срезу суммирует квадраты его дочерних элементов.
Например, [1, 2, 3] -> 1^2 + 2^2 + 3^2 = 14.
Нормальная логика, пройтись по срезу, затем возвести в квадрат и аккумулировать. В конвейерном режиме суммирование и возведение в квадрат можно разделить на параллельные вычисления.
/*
* Pipeline 模式
*/
func generator(max int) <-chan int{
out := make(chan int, 100)
go func() {
for i := 1; i <= max; i++ {
out <- i
}
close(out)
}()
return out
}
func power(in <-chan int) <-chan int{
out := make(chan int, 100)
go func() {
for v := range in {
out <- v * v
}
close(out)
}()
return out
}
func sum(in <-chan int) <-chan int{
out := make(chan int, 100)
go func() {
var sum int
for v := range in {
sum += v
}
out <- sum
close(out)
}()
return out
}
func main() {
// [1, 2, 3]
fmt.Println(<-sum(power(generator(3))))
}
Режим пула рабочих
сцены, которые будут использоваться
- большое количество одновременных задач
Горутины достаточно легковесны в Go, дажеnet/http
Метод обработки сервера такжеgoroutine-per-connection
, поэтому сценариев может быть немного меньше, чем для других языков. Начальное потребление памяти каждой горутиной составляет 2 ~ 8 КБ. Когда у нас есть большой пакет задач, нам нужно запустить много горутин для обработки, что приведет к большим накладным расходам памяти и нагрузке на системный агент. время, мы можем рассмотреть пул сопрограмм.
Код
/*
* Worker pool
*/
type TaskHandler func(interface{})
type Task struct {
Param interface{}
Handler TaskHandler
}
type WorkerPoolImpl interface {
AddWorker() // 增加 worker
SendTask(Task) // 发送任务
Release() // 释放
}
type WorkerPool struct {
wg sync.WaitGroup
inCh chan Task
}
func (d *WorkerPool) AddWorker() {
d.wg.Add(1)
go func(){
for task := range d.inCh {
task.Handler(task.Param)
}
d.wg.Done()
}()
}
func (d *WorkerPool) Release() {
close(d.inCh)
d.wg.Wait()
}
func (d *WorkerPool) SendTask(t Task) {
d.inCh <- t
}
func NewWorkerPool(buffer int) WorkerPoolImpl {
return &WorkerPool{
inCh: make(chan Task, buffer),
}
}
func main() {
bufferSize := 100
var workerPool = NewWorkerPool(bufferSize)
workers := 4
for i := 0; i < workers; i++ {
workerPool.AddWorker()
}
var sum int32
testFunc := func (i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
}
var i, n int32
n = 1000
for ; i < n; i++ {
task := Task{
i,
testFunc,
}
workerPool.SendTask(task)
}
workerPool.Release()
fmt.Println(sum)
}
Пул сопрограмм использует отражение для получения выполняемых функций и параметров, что может немного сбивать с толку в Go. Однако, если известны функции, которые должны выполняться в пакетном режиме, их можно оптимизировать в пул сопрограмм, который выполняет только указанные функции, что может повысить производительность.
Режим публикации/подписки
Модель публикации-подписки — это модель уведомления о сообщениях, в которой издатели отправляют сообщения, а подписчики получают сообщения.
сцены, которые будут использоваться
- очередь сообщений
Код
/*
* Pub/Sub
*/
type Subscriber struct {
in chan interface{}
id int
topic string
stop chan struct{}
}
func (s *Subscriber) Close() {
s.stop <- struct{}{}
close(s.in)
}
func (s *Subscriber) Notify(msg interface{}) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("%#v", rec)
}
}()
select {
case s.in <-msg:
case <-time.After(time.Second):
err = fmt.Errorf("Timeout\n")
}
return
}
func NewSubscriber(id int) SubscriberImpl {
s := &Subscriber{
id: id,
in: make(chan interface{}),
stop: make(chan struct{}),
}
go func() {
for{
select {
case <-s.stop:
close(s.stop)
return
default:
for msg := range s.in {
fmt.Printf("(W%d): %v\n", s.id, msg)
}
}
}}()
return s
}
// 订阅者需要实现的方法
type SubscriberImpl interface {
Notify(interface{}) error
Close()
}
// sub 订阅 pub
func Register(sub Subscriber, pub *publisher){
pub.addSubCh <- sub
return
}
// pub 结果定义
type publisher struct {
subscribers []SubscriberImpl
addSubCh chan SubscriberImpl
removeSubCh chan SubscriberImpl
in chan interface{}
stop chan struct{}
}
// 实例化
func NewPublisher () *publisher{
return &publisher{
addSubCh: make(chan SubscriberImpl),
removeSubCh: make(chan SubscriberImpl),
in: make(chan interface{}),
stop: make(chan struct{}),
}
}
// 监听
func (p *publisher) start() {
for {
select {
// pub 发送消息
case msg := <-p.in:
for _, sub := range p.subscribers{
_ = sub.Notify(msg)
}
// 移除指定 sub
case sub := <-p.removeSubCh:
for i, candidate := range p.subscribers {
if candidate == sub {
p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
candidate.Close()
break
}
}
// 增加一个 sub
case sub := <-p.addSubCh:
p.subscribers = append(p.subscribers, sub)
// 关闭 pub
case <-p.stop:
for _, sub := range p.subscribers {
sub.Close()
}
close(p.addSubCh)
close(p.in)
close(p.removeSubCh)
return
}
}
}
func main() {
// 测试代码
pub := NewPublisher()
go pub.start()
sub1 := NewSubscriber(1)
Register(sub1, pub)
sub2 := NewSubscriber(2)
Register(sub2, pub)
commands:= []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
for _, c := range commands {
pub.in <- c
}
pub.stop <- struct{}{}
time.Sleep(time.Second*1)
}
Меры предосторожности
- Проблемы с синхронизацией, особенно примитивы синхронизации и
channel
При совместном использовании склонен к взаимоблокировке - Проблема сбоя горутины: если дочерняя горутина паникует без восстановления, основная горутина аварийно завершится
- Проблема с утечкой горутины, убедитесь, что горутину можно нормально закрыть
Ссылаться на
- книга шаблонов дизайна
- Книга "Семь семинедельная модель параллелизма"
- Зачем использовать общение, чтобы делиться памятью?Почему дизайн?
- advanced-go-concurrency