практика параллелизма goroutine (пул сопрограмм + тайм-аут + быстрый возврат ошибки)

Go

когда мы используемgoroutineКогда функция выполняется одновременно, вы можете использоватьsync.WaitGroup{}возможность, где код выглядит следующим образом:

func testGoroutine() {
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			wg.Done()
			fmt.Println("hello world")
		}()
	}
	wg.Wait()
}

После прочтения приведенного выше кода нам нужно рассмотреть, предположим,goroutineпотому что некоторыеrpcЗапрос слишком медленныйhangживи, в это времяgoroutineостанется застрявшимwg.Wait(), что в конечном итоге приводит к сбою запроса

Если используемый вами фреймворк не предоставляет возможность тайм-аута, или если выgoвнеrpcЗапрос имеет возможность отключения с таймаутом

Так как же нам сделать так, чтобы код не былhangГде жить?

Самое простое решение — увеличить время ожидания!

На самом деле, есть много решений тайм-аута

  • на основеctxизcontext.WithTimeOut()выполнить
  • на основеselectвыполнить

Здесь я выбираю на основеselectРеализуйте тайм-аут, чтобы показать вам, как реализован код

func testWithGoroutineTimeOut() {
	var wg sync.WaitGroup
	done := make(chan struct{})
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
		}()
	}
	// wg.Wait()此时也要go出去,防止在wg.Wait()出堵住
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	// 正常结束完成
	case <-done:
	// 超时	
	case <-time.After(500 * time.Millisecond):
	}
}

Вы можете видеть, что приведенный выше код был основан наselectОчень просто реализовать тайм-аут?

Но у нас более высокие требования к этому интерфейсу.

  • goroutineнет обработки ошибок,
  • В настоящее времяgoвнеgoroutineколичество зависитforколичество петель, считаяforЦикл 100 Вт раз, в результате чегоgoroutineслишком много вопросов

Может написать пул сопрограмм для решенияgoroutineСлишком много, так как же реализовать пул сопрограмм?

мы можем использоватьsync waitGroup+ неблокирующийchannelвыполнить код показывает, как показано ниже:

package ezgopool

import "sync"

// goroutine pool
type GoroutinePool struct {
	c  chan struct{}
	wg *sync.WaitGroup
}

// 采用有缓冲channel实现,当channel满的时候阻塞
func NewGoroutinePool(maxSize int) *GoroutinePool {
	if maxSize <= 0 {
		panic("max size too small")
	}
	return &GoroutinePool{
		c:  make(chan struct{}, maxSize),
		wg: new(sync.WaitGroup),
	}
}

// add
func (g *GoroutinePool) Add(delta int) {
	g.wg.Add(delta)
	for i := 0; i < delta; i++ {
		g.c <- struct{}{}
	}

}

// done
func (g *GoroutinePool) Done() {
	<-g.c
	g.wg.Done()
}

// wait
func (g *GoroutinePool) Wait() {
	g.wg.Wait()
}

Выше приведена реализация пула сопрограмм, который на самом деле очень прост, Мой блог также записал еще одинgolangРеализация пула сопрограмм с открытым исходным кодом, подробности см.nuggets.capable/post/684490…

Наконец, наша модель тайм-аута + возврата ошибки + пула сопрограмм завершена~

func testGoroutineWithTimeOut() {
	 wg :=sync.WaitGroup{}
	done := make(chan struct{})
	// 新增阻塞chan
	errChan := make(chan error)

	pool.NewGoroutinePool(10)
	for i := 0; i < 10; i++ {
		pool.Add(1)
		go func() {
			pool.Done()
			if err!=nil{
				errChan<-errors.New("error")
			}
		}()
	}

	go func() {
		pool.Wait()
		close(done)
	}()

	select {
	// 错误快返回,适用于get接口
	case err := <-errChan:
		return nil, err
	case <-done:
	case <-time.After(500 * time.Millisecond):
	}
}

Благодарность