Основа языка Go (5) — параллельное программирование

Go

Предисловие:
Эта тема предназначена для документирования себя (647) изучение и накопление в направлении языка Go. Содержание серии относительно простое, и тем, кто хочет начать работу с языком Go, рекомендуется к прочтению.

Каталог выглядит следующим образом:
Языковая основа Go (1) — введение, конфигурация среды, HelloWorld
Основа языка Go (2) — базовый общий синтаксис
Основа языка Go (3) — объектно-ориентированное программирование
Основа языка Go (4) — качественная отказоустойчивость
Основа языка Go (5) — параллельное программирование
Основа языка Go (шесть) — тестирование, размышление, небезопасно
Go Language Foundation (7) — Архитектура и общие задачи
Основа языка Go (8) — настройка производительности


В этой статье будет представлено следующее:
1. Механизм сопрограммы (Groutine)
2. Механизм параллелизма с общей памятью (безопасность сопрограмм)
3. Механизм параллелизма CSP (channel)
4. Мультиплексирование и контроль времени ожидания (select)
5. Закрытие канала и трансляция (channel)
6. Отмена заданий
7. Контекст и связанная с этим отмена задачи
8. Общие одновременные задачи (боевые)

1. Механизм сопрограмм

Я уверен, что все знают"нить"и"процесс"Концепция чего-либо.

На языке Go «сопрограммы» можно понимать как более легкие потоки. Планируя «сопрограммы», можно максимизировать эффективность ядра системы.

С помощью таблицы сравним разницу между сопрограммой и потоком.

  • Поток против подпрограммы:
\ Размер стека по умолчанию (при создании) Переписка KSE (ядро Space Entity)
нить 1M 1 : 1
Корутина 2K M : N

Преимущества сопрограмм по сравнению с потоками:

  • Переключение между потоками будет включать системные потоки в ядре (kernel entity) переключение, что приведет к большей стоимости.
  • Хотя несколько сопрограмм выполняются в одном и том же системном потоке (kernel entity), можно уменьшить системный поток переключения (kernel entity)цена. (Как показано на фиг.1)

Использование сопрограмм:

грамматика:go + func

func TestGroutine(t *testing.T) {
	for i := 0; i < 10; i++ {
		go func(i int) {
			fmt.Println(i) // 正确案例,值传递。各个协程无竞争关系。
		}(i)

		// go func() {
		// 	fmt.Println(i) // 错误案例,共享变量。各个协程有竞争关系
		// }()
	}
	time.Sleep(time.Millisecond * 50)
}

2. Механизм параллелизма с общей памятью (безопасность сопрограмм)

Когда речь заходит о безопасности сопрограмм, первое, что приходит нам на ум, — это блокировки. Безопасность Coroutine гарантируется блокировкой.

То же самое и в языке Go, давайте посмотрим на пример.

  • Сопрограммы являются параллельными, что приводит к небезопасным сопрограммам:
// 协程不安全demo
func TestCounterThreadUnsafe(t *testing.T) {
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			counter++
		}()
	}
	time.Sleep(1 * time.Second)
	t.Logf("counter = %d", counter)
}

Результат выглядит следующим образом:

=== RUN   TestCounterThreadUnsafe
--- PASS: TestCounterThreadUnsafe (1.00s)
    share_mem_test.go:18: counter = 4765

В это время будет обнаружено, что расчет неправильный, потому что параллелизм вызывает значение утечки.

  • Решение первое: Обычная блокировка и добавление задержки для ожидания завершения выполнения сопрограммы (не рекомендуется)
// 协程等待demo(停1秒,不推荐)
func TestCounterThreadSafe(t *testing.T) {
	var mut sync.Mutex
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			defer func() {
				mut.Unlock() //函数调用完成后:解锁,保证协程安全
			}()
			mut.Lock() // 函数将要调用前:加锁,保证协程安全
			counter++
		}()
	}
	time.Sleep(1 * time.Second) // 等待一秒,等协程全部执行完
	t.Logf("counter = %d", counter)
}

Результат выглядит следующим образом:

=== RUN   TestCounterThreadSafe
--- PASS: TestCounterThreadSafe (1.01s)
    share_mem_test.go:35: counter = 5000

Результат правильный, но есть проблема. Поскольку существует 1-секундная задержка ожидания, гарантируется, что сопрограмма завершит работу до вызова результата. Итак, есть ли лучший способ справиться с этим? Далее оптимизируем.

  • Решение второе:рекомендовать!Используйте синхронную очередь ожидания (WaitGroup) гарантирует последовательное выполнение.
// 协程安全Demo
func TestCounterWaitGroup(t *testing.T) {
	var mut sync.Mutex    // 互斥锁
	var wg sync.WaitGroup // 等待队列
	counter := 0
	for i := 0; i < 5000; i++ {
		wg.Add(1) // 加个任务
		go func() {
			defer func() {
				mut.Unlock() //函数调用完成后:解锁,保证协程安全
			}()
			mut.Lock() // 函数将要调用前:加锁,保证协程安全
			counter++
			wg.Done() // 做完任务
		}()
	}
	wg.Wait() //等待所有任务执行完毕
	t.Logf("counter = %d", counter)
}

Результаты приведены ниже:

=== RUN   TestCounterWaitGroup
--- PASS: TestCounterWaitGroup (0.00s)
    share_mem_test.go:55: counter = 5000

В этом случае видно, что: MutexMutexи очередь ожиданияWaitGroupЭто не только обеспечивает безопасность сопрограммы, но и позволяет избежать предварительной печати результата. (✔️)


3. Механизм параллелизма CSP

1. CSP

CSP (Communicating sequential processes): Последовательные процессы связи (конвейерная связь). Проще говоря, CSP выполняется черезChannel(труба) для связи.

в ГоChannel(конвейер) имеет ограниченную мощность и не зависит от обработкиGroutine(корутина).

2. Channel

Общее в GoChannelЕсть два вида, соответствующиеChannel,Buffer Channel.

  • Первый: канал (небуферизованный)

Во-первых, отправитель и получатель должны стоять одновременноChannelДва конца взаимодействуют. Если один из них отсутствует, другой блокируется на одном конце и не взаимодействует до тех пор, пока не появятся оба конца.

Создать синтаксис:make(chan [type])

retChannel := make(chan string) // 创建无缓冲channel,并指明channel中的数据为string,双端等待

Синтаксис ввода:channel <-

channel <- object // channel输入

Получить синтаксис:<- channel

object <- channel // channel输出
  • Второй: Buffer Channel (с буферизацией)

Это немного более продвинутыйChannelспособ, (более слабосвязанный).

Во-первых, дайтеChannelУстановите размер емкости, который не требует одновременного присутствия отправителя и получателя на обоих концах. Затем отправитель начнет сBufferформа, непрерывноChannelОтправить сообщение. до того какChannelБлокируется, когда емкость заполнена. В это время, пока получатель получает сообщение (т.е.Channelоставшейся емкости), отправитель продолжит отправлять сообщения.

Создать синтаксис:make(chan [type], Int)

retChannel := make(chan string, 1) // 创建有缓冲channel,并指明channel中的数据为string

Синтаксис ввода:channel <-

channel <- object // channel输入

Получить синтаксис:<- channel

object <- channel // channel输出

Демонстрация: имитирует процесс вызова метода сетевого запроса черезChannelЧтобы управлять текущей сопрограммой для выполнения других задач во время ожидания сетевого запроса.

// 模拟网络请求
func serviceTask() string {
	fmt.Println("- start working on service task.")
	time.Sleep(time.Millisecond * 50)
	return "- service task is Done."
}

// 别的任务
func otherTask() {
	fmt.Println("start working on something else.")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("other task is Done.")
}

// csp异步管道
func AsyncService() chan string {
	retChannel := make(chan string) // 无缓冲channel,创建并指明channel中的数据为string,双端等待
	// retChannel := make(chan string, 1) // 有缓冲channel,创建并指明channel中的数据为string
	go func() {
		ret := serviceTask()
		fmt.Println("returned result.")
		retChannel <- ret // channel输入
		fmt.Println("service exited.")
	}()
	return retChannel
}

func TestAsyncService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh) // channel输出
	time.Sleep(time.Second * 1)
}

4. Мультиплексирование и контроль времени ожидания

использоватьselectключевое слово для завершения "множественный выбор" и "управление временем ожидания".

  • Большой выбор: когда вернулсяchannelКогда их может быть несколько, можно использовать select для обработки нескольких событий ответа.

Примечание: здесь сswitchВроде как, но обратите внимание, что он не оценивается последовательно. то есть, еслиchannel1иchannel2Когда оба удовлетворены, возможный путьchannel1, также может бытьchannel2, не какswitchСделайте то же самое решение порядка.

Демо:

	select {
	case ret := <-channel1: 
		t.Log(ret)
	case ret:= <- channel2:
		t.Log(ret)
	case default:
		t.Error("No one returned.")
	}
  • Контроль времени ожидания:

В то же время мы также можем установить шунт для ожидания тайм-аута, когдаchannelКогда тайм-аут не вернулся, соответствующий код может быть выполнен.

Демо:

	select {
	case ret := <-AsyncService(): //正常返回
		t.Log(ret)
	case <-time.After(time.Millisecond * 100): // 超时等待
		t.Error("time out")
	}

5. Закрытие и трансляция каналов

Основные моменты заключаются в следующем:

  1. ужеcloseизchannelотправить сообщение, вызовет программуpanic.
  2. v, ok <- channel. в,okзаboolценность, какok==trueкогда, указываяchannelвopenгосударство. какok==falseкогда, указываяchannelвcloseгосударство.
  3. всеchannelполучательchannelПри закрытии он немедленно возвращается из ожидания блокировки, иokзначениеfalse. (PS: широковещательный механизм, обычно используемый для одновременной отправки сигналов нескольким подписчикам. Например, выходной сигнал.)

Демо:

// 消息生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}

		fmt.Println("channel close.")
		close(ch) // 关闭channel

		wg.Done()
	}()
}

// 消息接收者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if data, ok := <-ch; ok { // 有消息就打印,直到channel被close。
				fmt.Println(data)
			} else {
				fmt.Println("Receiver close.")
				break // channel被close
			}
		}
		wg.Done()
	}()
}

func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg) // 开启生产者
	wg.Add(1)
	dataReceiver(ch, &wg) // 开启消费者
	wg.Wait()
}

6. Отмена заданий

через вышеуказанноеclose channel(механизм вещания), мы можем расширить его,close channelуведомить всехchannelОтменить текущую задачу.

Демо выглядит следующим образом:

func isCancelled(cancelChan chan struct{}) bool {
	select {
	case <-cancelChan:
		return true
	default:
		return false
	}
}

// 只能取消单个channel
func cancel_1(cancelChan chan struct{}) {
	cancelChan <- struct{}{}
}

// 所有channel全部取消
func cancel_2(cancelChan chan struct{}) {
	close(cancelChan)
}

func TestCancel(t *testing.T) {
	cancelChan := make(chan struct{}, 0) // 创建了一个channal,通过它来控制事件取消
	for i := 0; i < 5; i++ {             // 开启5个协程
		go func(i int, chanclCh chan struct{}) { // 每个协程里面都有一个死循环,去等待取消消息
			for {
				if isCancelled(cancelChan) {
					break
				}
				time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
			}
			fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
		}(i, cancelChan)
	}
	cancel_2(cancelChan) // 通知所有channel关闭。
	time.Sleep(time.Second * 1)
}

Семь, Контекст и связанная с этим отмена задачи

только что мы прошлиclose channelотменить миссию, но будут некоторые проблемы. Например, при отмене задачи связанные с ней подзадачи также должны быть немедленно отменены.

Для решения этой проблемы,go 1.9.0после,golangприсоединилсяcontext, чтобы обеспечить отмену связанной задачи.

1. Context

contextЭто контекст, используемый для управления связанными задачами, включая доставку общих значений, тайм-ауты и уведомления об отмене.

Структура выглядит следующим образом:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
  1. Deadlineвернет тайм-аут,GoroutineНапример, после получения тайм-аута для некоторых операций ввода-вывода может быть установлен тайм-аут.
  2. Doneметод возвращает канал (channel),когдаContextПри отзыве или истечении срока действия канал закрывается, т. е. является представлениемContextОтключен ли сигнал.
  3. когдаDoneПосле закрытия канала метод Err указывает, почему контекст был отозван.
  4. Valueможет позволитьGoroutineСовместное использование некоторых данных и, конечно же, получение данных безопасны для сопрограммы. Но обратите внимание на синхронизацию при использовании этих данных, например, на возвратmap, и этоmapЧтение и запись будут заблокированы.

Ключевые моменты:

  • корневой контекст: черезcontext.Background()Создайте.
  • Дочерний контекст: пройтиcontext.WithCancel(parentContext)Создайте.
  • Когда текущий контекст отменяется, все дочерние контексты, основанные на нем, будут отменены.
  • Чтобы получать уведомления об отмене:<-ctx.Done

2. Отменить связанную задачу

Мы немного подкорректировали предыдущий пример и отменили все связанные задачи через контекст.

  • Сначала создайтеcontext:
ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context
  • Напишите метод отмены, который помещаетcontextкак параметр.
func isCancelled(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return true
    default:
        return false
    }
}
  • Откройте пять бесконечных циклов сопрограммы, каждая сопрограмма имеет бесконечный цикл, ожидающий сообщения об отмене задачи. позвони сноваcancelметод.
for i := 0; i < 5; i++ {                                // 开启5个协程
        go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息
            for {
                if isCancelled(ctx) {
                    break
                }
                time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
            }
            fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
        }(i, ctx)
    }
    cancel() // 取消ctx

Полный пример кода выглядит следующим образом:

func isCancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func TestCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context
	for i := 0; i < 5; i++ {                                // 开启5个协程
		go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息
			for {
				if isCancelled(ctx) {
					break
				}
				time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
			}
			fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
		}(i, ctx)
	}
	cancel() // 取消ctx
	time.Sleep(time.Second * 1)
}

Восемь, общие одновременные задачи (бой)

1. Выполнить только один раз (одноэлементный шаблон)

Сценарий: в случае с несколькими сопрограммами гарантируется, что определенный фрагмент кода будет выполнен только один раз.

type Singleton struct {
	data string
}

var singleInstance *Singleton
var once sync.Once

func GetSingletonObj() *Singleton {
	once.Do(func() {
		fmt.Println("Create Obj")
		singleInstance = new(Singleton)
	})
	return singleInstance
}

func TestGetSingletonObj(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			obj := GetSingletonObj()
			fmt.Printf("%p\n", obj)
			wg.Done()
		}()
	}
	wg.Wait()
}

2. Нужно только выполнить любую задачу

Используя механизм связи канала, мы можем отправить сообщение объекту, когда какая-либо сопрограмма завершит задачу.

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func firstResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // 创建bufferChannel。(如果用channel会导致协程泄漏,剩下9个channel会一直阻塞在系统中。)
	for i := 0; i < numOfRunner; i++ { // 开了10个协程
		go func(i int) {
			ret := runTask(i) // 每个协程去执行任务
			ch <- ret
		}(i)
	}
	return <-ch // 返回channel里的第一个Response。(因为channel是一个先进先出的管道)
}

func TestFirstResponse(t *testing.T) {
	t.Log(firstResponse()) // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。
}

3. Все задания выполнены

Только что мы ввели первый ответ, а затем давайте посмотрим, что делать с ответом all. Идея та же, просто получить всеchannelВозвращенные данные могут быть возвращены.

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func allResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // 创建bufferChannel。
	for i := 0; i < numOfRunner; i++ {   // 开了10个协程
		go func(i int) {
			ret := runTask(i) // 每个协程去执行任务
			ch <- ret
		}(i)
	}
	finalRet := ""
	for j := 0; j < numOfRunner; j++ {
		finalRet += <-ch + "\n"
	}
	return finalRet // 返回channel里的所有的Response。(因为channel是一个先进先出的管道)
}

func TestAllResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine()) // 打印一下当前的协程数量
	t.Log(allResponse())                     // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。
	t.Log("After:", runtime.NumGoroutine()) // 再打印一下当前的协程数量
}

4. Пул объектов

Мы можем использовать функцию конвейера буферного канала для создания пула объектов.

Демо:

type ReusableObj struct {
}

type ObjPool struct {
	bufChan chan *ReusableObj // 用于缓冲可重用对象
}

// 生产指定数量对象的对象池
func NewObjPool(numOfObj int) *ObjPool {
	ObjPool := ObjPool{}
	ObjPool.bufChan = make(chan *ReusableObj, numOfObj)
	for i := 0; i < numOfObj; i++ {
		ObjPool.bufChan <- &ReusableObj{}
	}
	return &ObjPool
}

// 从对象池中获得对象
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	select {
	case ret := <-p.bufChan:
		return ret, nil
	case <-time.After(timeout): // 超时控制
		return nil, errors.New("time out")
	}
}

// 释放对象池里的对象
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default:
		return errors.New("overflow")
	}
}

func TestObjPool(t *testing.T) {
	pool := NewObjPool(10) // 生产一个10容量大小的对象池
	for i := 0; i < 10; i++ {
		if v, err := pool.GetObj(time.Second * 1); err != nil { // 获取obj
			t.Error(err)
		} else {
			fmt.Printf("%T\n", v)                      // 获取成功,答应日志。
			if err := pool.ReleaseObj(v); err != nil { // 释放obj
				t.Error(err)
			}
		}
	}
	fmt.Println("Done.")
}

5. Кэш объектов sync.pool

Мы можем выполнять кэширование объектов (стратегия создания, получения, кэширования) через sync.pool.

Стратегия приобретения объекта:
  1. Во-первых, попробуйте получить от частного объекта.

  2. Во-вторых, если частный объект не существует, попробуйтеProcessобщего пула.

  3. если текущийProcessобщий пул пуст, просто попробуйте получитьProcessобщего пула.

  4. я упалProcessОбщий пул пуст, просто начните сsync.poolназначенныйNewметод“New”Возвращается новый объект.

Жизненный цикл объекта кэша sync.pool:
  • каждый разGC(сборка мусора) очистит объекты кэша sync.pool.

  • Таким образом, кеш объектов действителен для следующего разаGCДо.

Основное использование:

func TestSyncPool(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} { // 创建一个新的对象
			fmt.Println("Create a new object.")
			return 100
		},
	}

	v := pool.Get().(int) // 获取对象
	fmt.Println(v)
	pool.Put(3) // 放回对象
	// runtime.GC() // 触发GC,会清除sync.pool中缓存的对象
	v1, _ := pool.Get().(int)
	fmt.Println(v1)
}

Используйте под мульти-сопрограммами:

func TestSyncPoolInMultiGroutine(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Create a new object.")
			return 10
		},
	}

	pool.Put(100)
	pool.Put(100)
	pool.Put(100)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {// 创建10个协程
		wg.Add(1) 
		go func(id int) {
			fmt.Println(pool.Get()) // 获取对象
			wg.Done() 
		}(i)
	}
	wg.Wait()
}
Преимущества и недостатки sync.pool:
  • Достоинства: проходитsync.poolСокращение затрат на создание сложных объектов и сборку мусора.

  • проблема:sync.poolОн будет возвращен сборщиком мусора, и при одновременном использовании необходимо учитывать блокировку. Поэтому делайте компромиссы в программе. (Подумайте, высока ли стоимость создания объекта? Или стоимость использования sync.pool для блокировки кеша и его повторного использования?)


Наконец, в этой серии я в роли Учителя Цай Чао.обмен технологиямиСледующие резюме и фактический бой завершены, Спасибо, мистер Цай Чао.обмен технологиями.

PS: Прикрепил, поделитесь ссылкой:«Перейти от входа к реальному бою»Желаю всем успехов в учебе и успехов в работе. Спасибо!