Погружение в модель параллелизма Go: контекст

Go
Погружение в модель параллелизма Go: контекст

вводить

На сервере Go каждый входящий запрос обрабатывается собственной горутиной. Обработчики запросов обычно запускают другие горутины для доступа к бэкендам, таким как базы данных и службы RPC. Коллекции горутин, которые обрабатывают запросы, часто требуется доступ к значениям, специфичным для запроса, таким как идентификатор конечного пользователя, токен авторизации и дата истечения срока действия запроса. Когда запрос отменяется или истекает время ожидания, все горутины, обрабатывающие этот запрос, должны быстро завершиться, чтобы система могла восстановить любые ресурсы, которые они использовали.

Контекст специально используется для упрощения обработки одного запроса, обмена данными между несколькими горутинами, отмены сигналов, обработки времени ожидания и других связанных операций. Переведено сGo Concurrency Patterns: Context.

характеристика

  • Контекст безопасен для параллелизма.
  • Поддерживает древовидных вышестоящих для управления одним или несколькими подчиненными, но не поддерживает обратное управление и горизонтальное управление.
  • Горутина передает сигнал отмены, чтобы завершить жизнь дочерней горутины.
  • Когда goruntine инициализирует службу sub-goruntine, передайте крайний срок или тайм-аут для управления sub-goruntine.
  • Когда горунтины заканчиваются, заканчивается жизненный цикл всех соответствующих дочерних горунтин.

сцены, которые будут использоваться

  • В случае одновременных мультисервисных вызовов, например, когда приходит запрос, запускаются три горутины для вызова трех сервисов: RpcA, RpcB и RpcC. В это время, если в одной из служб есть ошибка, будет возвращена ошибка, а две другие службы Rpc ​​будут отменены одновременно. Этого можно добиться с помощью метода WithCancel.
  • Запросы тайм-аута, такие как ограничения тайм-аута для Http и Rpc, могут быть реализованы через WithDeadline и WithTimeout.
  • Перенося данные, такие как запрошенная информация о пользователе, в общих бизнес-сценариях у нас будет специальное промежуточное программное обеспечение для проверки информации о пользователе, а затем вставки информации о пользователе в контекст или обмена ею с несколькими производными горутинами.Реализация метода WithValue.

Официальный пример

package main
import (
	"context"
	"fmt"
)

func main() {
	gen := func(ctx context.Context) <-chan int {
		dst := make(chan int)
		n := 1
		go func() {
			for {
				select {
				case <-ctx.Done():
					return // returning not to leak the goroutine
				case dst <- n:
					n++
				}
			}
		}()
		return dst
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancel when we are finished consuming integers

	for n := range gen(ctx) {
		fmt.Println(n)
		if n == 5 {
			break
		}
	}
}

Context

context — это интерфейс, определяемый следующим образом:исходный код

type Context interface {
	Deadline() (deadline time.Time, ok bool)

	Done() <-chan struct{}

	Err() error

	Value(key interface{}) interface{}
}

Содержит следующие 3 функции:

  • время выживания
  • отменить сигнал
  • Общее значение между горутингом на основе запроса

emptyCtx — это реализация Context, которая реализует методы Deadline, Done, Err, Value и String соответственно.

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return 
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

cancelCtx

Из примера мы видим первую операцию инициализации, которую нужно сделать, чтобы использовать контекст

ctx, cancel := context.WithCancel(context.Background())

Здесь context.Background() возвращает указатель типа emptyCtx.

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

func Background() Context {
	return background
}

func TODO() Context {
	return todo
}

Давайте еще раз посмотрим, функция WithCancel получает фон в качестве параметра и создает экземпляр cancelCtx. В то же время Context используется как его анонимное поле, так что его можно рассматривать как Context.

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) } //什么意思,看下文您就明白了
}

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

Глядя на рисунок ниже, основная ответственность WithCancel заключается в создании cancelCtx, монтировании к родительскому узлу, а затем возвращении функций cancelCtx и cancel() для запуска события отмены.

CancelCtx реализует собственные интерфейсы Done(), Err(), String(). Стоит отметить, что поле done создается лениво, создается при первом вызове Done() и возвращает канал, доступный только для чтения.

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

func (c *cancelCtx) String() string {
	return fmt.Sprintf("%v.WithCancel", c.Context)
}

Мало того, cancelCtx наиболее важно реализует метод cancel(). Основной рабочий процесс выглядит следующим образом:

  1. Установить отмененное сообщение об ошибке
  2. закрыть канал:c.done
  3. Рекурсивно закрыть все дочерние узлы
  4. удалить себя из родительского узла
  5. Наконец, сигнал отмены передается всем дочерним узлам путем закрытия канала.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	// 已经被取消
	if c.err != nil {
		c.mu.Unlock()
		return 
	}
	// 设置 cancelCtx 错误信息
	c.err = err
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	//  递归地取消所有子节点
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	// 清空子节点
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		// 从父节点中移除自己 
		removeChild(c.Context, c)
	}
}

Есть еще одна ключевая функцияpropagateCancelнужно сосредоточиться на

func propagateCancel(parent Context, child canceler) {
	// 父节点是一个空节点,可以理解为本节点为根节点,不需要挂载
	if parent.Done() == nil {
		return // parent is never canceled
	}
	// 父节点是可取消类型的
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			// 父节点被取消了,本节点也需要取消
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			// 挂载到父节点
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		// 为了兼容,Context 内嵌到一个类型里的情况发生
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

Вот случай else в приведенном выше коде. Зачем вам нужно открывать горутину для отслеживания сигнала отмены? Давайте сначала рассмотрим первый случай:

case <-parent.Done():

Основная цель здесь состоит в том, чтобы избежать встраивания cancelCtx в тип в качестве анонимного поля, например:

type CancelContext struct {
    Context
}

В этот моментфункция parentCancelCtxНевозможно распознать, что CancelContext является ctx отменяемого типа.
Посмотрите на второй случай:

case <-child.Done():

Основная функция состоит в том, чтобы позволить оператору select нормально завершаться, когда дочерний узел отменяется, чтобы избежать утечки горутины.

С помощью следующего исходного кода parentCancelCtx мы определяем, что, как только родитель параметра является упакованным типом, parentCancelCtx не может правильно определить тип родителя.

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	for {
		switch c := parent.(type) {
		case *cancelCtx:
			return c, true
		case *timerCtx:
			return &c.cancelCtx, true
		case *valueCtx:
			parent = c.Context
		default:
			return nil, false
		}
	}
}

timerCtx

Из определения мы видим, что timerCtx реализован на основе cancelCtx с двумя дополнительными полями, таймером и крайним сроком. И timerCtx реализует собственный метод Deadline.

type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}

Итак, мы смотрим непосредственно на основную функцию WithDeadline.

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	// 判断父节点是否超时,(非timeCtx类型的Deadline()直接return的)
	// 如果父节点的超时时间比当前节点早,直接返回cancalCtx即可
	// 因为父节点超时会自动调用cancel,子节点随之取消,所以不需要单独处理子节点的超时问题
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
	propagateCancel(parent, c)
	dur := time.Until(d)
	// 直接取消
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	// 核心代码在这,如果当前节点没被取消,则通过time.AfterFunc在dur时间后调用cancel函数,自动取消。
	if c.err == nil {
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}
// 基于WithDeadline封装实现
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

valueCtx

Исходный код относительно прост.Контекст используется как анонимное поле для реализации интерфейса связанного списка типа, который передается слой за слоем.Для получения значения в основном проверяйте метод Value.Он будет судить о наличии слой за слоем, пока не найдет контекст верхнего уровня. Поэтому здесь тоже есть на что обратить внимание, то есть значение ключа дочернего узла перезапишет значение родительского узла, поэтому особое внимание нужно уделить при именовании значения ключа.

func WithValue(parent Context, key, val interface{}) Context {
	if key == nil {
		panic("nil key")
	}
	if !reflect.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val interface{}
}

func (c *valueCtx) String() string {
	return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

Разобрать

Метод Done возвращает

Заканчивать

Добро пожаловать, чтобы следовать за мнойGithub.

Ссылаться на

Go Concurrency Patterns: Context
Go context
Глубокая расшифровка контекста языка Go