Али двойник 11 такой же, дорожный страж Sentinel go интерпретация исходного кода

облачный носитель

头图.png

Автор | Ю Ю руководитель проекта apache/dubbo-go

Автор этой статьи отвечает за проект apache/dubbo-go. Sentinel-go в настоящее время доступен в проекте dubbogo. Если вы хотите использовать его отдельно, вы можете обратиться к нему.Использование часового с dubbo-goЕсли у вас есть другие вопросы, войдите в сообщество dubbogo [Dingding Group 23331795] для связи.

Управляемое чтение: В этой статье в основном анализируется ПО промежуточного слоя для управления трафиком с открытым исходным кодом Sentinel от Alibaba Group, которое изначально поддерживает Java/Go/C++ и другие языки.В этой статье анализируется только его реализация на языке Go. Если иное не указано ниже, Sentinel относится к Sentinel-Go.

1 Базовые понятия Ресурс и правило

1.1 Resource

	// ResourceType represents classification of the resources
	type ResourceType int32

	const (
		ResTypeCommon ResourceType = iota
		ResTypeWeb
		ResTypeRPC
	)

	// TrafficType describes the traffic type: Inbound or Outbound
	type TrafficType int32

	const (
		// Inbound represents the inbound traffic (e.g. provider)
		Inbound TrafficType = iota
		// Outbound represents the outbound traffic (e.g. consumer)
		Outbound
	)

	// ResourceWrapper represents the invocation
	type ResourceWrapper struct {
		// global unique resource name
		name string
		// resource classification
		classification ResourceType
		// Inbound or Outbound
		flowType TrafficType
	}

Resource(ResourceWrapper) хранит сценарий приложения ResourceType и направление целевого управления потоком FlowType(TrafficType).

1.2 Entry

	// EntryOptions represents the options of a Sentinel resource entry.
	type EntryOptions struct {
		resourceType base.ResourceType
		entryType    base.TrafficType
		acquireCount uint32
		slotChain    *base.SlotChain
	}

	type EntryContext struct {
		entry *SentinelEntry

		// Use to calculate RT
		startTime uint64

		Resource *ResourceWrapper
		StatNode StatNode

		Input *SentinelInput
		// the result of rule slots check
		RuleCheckResult *TokenResult
	}

	type SentinelEntry struct {
		res *ResourceWrapper
		// one entry bounds with one context
		ctx *EntryContext

		sc *SlotChain
	}

Объект Entry SentinelEntry связан с Resource (ResourceWrapper) и его набором правил управления потоком SlotChain. Каждый объект Entry имеет контекстную среду EntryContext, в которой хранятся некоторые параметры управления потоком и результаты оценки управления потоком, используемые при каждом обнаружении правила.

Примечательно,SentinelEntry.scзначение исходит изEntryOptions.slotChain,EntryOptions.slotChainСохраняет глобальный объект SlotChainapi/slot_chain.go:globalSlotChain.

Что касается того, почемуSlotChain, который представляет собой набор всех компонентов управления потоком, предоставляемых Sentinel. Можно просто думать, что каждый компонент управления потоком — это слот. Подробный анализ см.[3.5 SlotChain].

Sentinel Некоторые имена переменных и функций менее читаемы, напримерEntryOptions.acquireCountЭто действительно не может заставить людей воспринимать это буквально, я видел функциюcore/api.go:WithAcquireCount()По комментариям понятно:EntryOptions.acquireCountколичество выполненных пакетных действий. Если в запросе RPC вызывается служебный интерфейс сервера, значение равно 1 [такжеEntryOptions.acquireCountЗначение по умолчанию], если вызываются три служебных интерфейса сервера, значение равно 3. Поэтому рекомендуется изменить имя наEntryOptions.batchCountЛучше, учитывая принцип минимальных изменений, можно зарезервироватьcore/api.go:WithAcquireCount()при добавлении функции с тем жеcore/api.go:WithBatchCount()интерфейс. Связанные улучшения отправленыpr 263.

1.3 Rule

	type TokenCalculateStrategy int32
	const (
		Direct TokenCalculateStrategy = iota
		WarmUp
	)

	type ControlBehavior int32
	const (
		Reject ControlBehavior = iota
		Throttling
	)

	// Rule describes the strategy of flow control, the flow control strategy is based on QPS statistic metric
	type Rule struct {
		// Resource represents the resource name.
		Resource               string                 `json:"resource"`
		ControlBehavior        ControlBehavior        `json:"controlBehavior"`
		// Threshold means the threshold during StatIntervalInMs
		// If StatIntervalInMs is 1000(1 second), Threshold means QPS
		Threshold         float64          `json:"threshold"`
		MaxQueueingTimeMs uint32           `json:"maxQueueingTimeMs"`
		// StatIntervalInMs indicates the statistic interval and it's the optional setting for flow Rule.
		// If user doesn't set StatIntervalInMs, that means using default metric statistic of resource.
		// If the StatIntervalInMs user specifies can not reuse the global statistic of resource,
		// 		sentinel will generate independent statistic structure for this rule.
		StatIntervalInMs uint32 `json:"statIntervalInMs"`
	}

Правило записывает пороговое значение оценки текущего ограничения Threshold of a Resource, продолжительность временного окна текущего ограничения StatIntervalInMs и действие ControlBehavior оценки после срабатывания текущего ограничения.

Ядром вышеизложенного является интерфейс Rule RuleCheckSlot, а StatSlot используется для подсчета текущих показателей самого Sentinel.

1.4 Flow

В текущей главе в основном анализируется текущее ограничение (ядро/поток) в управлении потоком и сортируется общий скелет дозорного в соответствии с процессом управления потоком.

1.4.1 TrafficShapingController

так называемыйTrafficShapingController, как следует из названия, является контроллером формирования потока, который является конкретным реализатором управления потоком.

	// core/flow/traffic_shaping.go

	// TrafficShapingCalculator calculates the actual traffic shaping threshold
	// based on the threshold of rule and the traffic shaping strategy.
	type TrafficShapingCalculator interface {
		CalculateAllowedTokens(acquireCount uint32, flag int32) float64
	}

	type DirectTrafficShapingCalculator struct {
		threshold float64
	}

	func (d *DirectTrafficShapingCalculator) CalculateAllowedTokens(uint32, int32) float64 {
		return d.threshold
	}

TrafficShapingCalculatorИнтерфейс используется для расчета верхнего предела текущего лимита.Если вы не используете функцию прогрева, то можете пропустить ее реализацию.Одна из ее сущностей, DirectTrafficShapingCalculator, возвращаетRule.Threshold【Верхний предел ограничения тока, устанавливаемый пользователем】.

	// TrafficShapingChecker performs checking according to current metrics and the traffic
	// shaping strategy, then yield the token result.
	type TrafficShapingChecker interface {
		DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult
	}

	type RejectTrafficShapingChecker struct {
		rule  *Rule
	}

	func (d *RejectTrafficShapingChecker) DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult {
		metricReadonlyStat := d.BoundOwner().boundStat.readOnlyMetric
		if metricReadonlyStat == nil {
			return nil
		}
		curCount := float64(metricReadonlyStat.GetSum(base.MetricEventPass))
		if curCount+float64(acquireCount) > threshold {
			return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, "", d.rule, curCount)
		}
		return nil
	}

RejectTrafficShapingCheckerв соответствии сRule.ThresholdОпределить, превышает ли ресурс ограничение в текущем временном окне, и его текущий предельный результатTokenResultStatusЭто может быть только Pass или Blocked.

Sentinel Flow также имеет ограничитель постоянной скорости.ThrottlingChecker, его цель состоит в том, чтобы разрешить выполнение запросов с одинаковой скоростью, разделить временное окно [например, 1 с] на более мелкие микровременные окна в соответствии с пороговым значением и выполнить запрос не более одного раза в каждом микровременном окне, и его текущий предельный результатTokenResultStatusЭто может быть только Pass, Blocked или Wait, и его соответствующие значения:

  • Пройдено: если в микровременном окне нет ограничений, запрос пропускается;
  • Ожидание: лимит превышен в пределах микровременного окна, и выполнение задерживается на определенное временное окно, в течение которого запрос должен ждать;
  • Заблокировано: если лимит превышен в течение микровремени, а время ожидания превышает максимально допустимое время ожидания, установленное пользователем [Rule.MaxQueueingTimeMs], запрос отклоняется.
	type TrafficShapingController struct {
		flowCalculator TrafficShapingCalculator
		flowChecker    TrafficShapingChecker

		rule *Rule
		// boundStat is the statistic of current TrafficShapingController
		boundStat standaloneStatistic
	}

	func (t *TrafficShapingController) PerformChecking(acquireCount uint32, flag int32) *base.TokenResult {
		allowedTokens := t.flowCalculator.CalculateAllowedTokens(acquireCount, flag)
		return t.flowChecker.DoCheck(resStat, acquireCount, allowedTokens)
	}

существуетDirect + RejectВ сценарии ограничения тока эти три интерфейса на самом деле не имеют особого смысла, а их основные функцииTrafficShapingController.PerformChecking()Основной процесс:

  • 1 Получите значение метрики [curCount] текущего ресурса из TrafficShapingController.boundStat;
  • 2 Если curCount + batchNum(acquireCount) > Rule.Threshold, пройти, в противном случае отклонить.

В сценарии ограничения токаTrafficShapingControllerЗначения четырех членов следующие:

  • flowCalculator вычисляет верхний предел текущего предела;
  • flowChecker выполняет действие проверки текущего лимита;
  • правило хранит текущие ограничивающие правила;
  • boundStat сохраняет результат проверки ограничения тока и параметры временного окна в качестве основы для определения следующего действия проверки ограничения тока.

1.4.2 TrafficControllerMap

При выполнении определения текущего лимита вам необходимо получить соответствующий ресурс в соответствии с именем Ресурса.TrafficShapingController.

   // TrafficControllerMap represents the map storage for TrafficShapingController.
   type TrafficControllerMap map[string][]*TrafficShapingController
	// core/flow/rule_manager.go
	tcMap        = make(TrafficControllerMap)

Глобальная частная переменная tcMap на уровне пакета хранит все правила, ключ — это имя ресурса, а значение — это TrafficShapingController, соответствующий ресурсу.

Функции пользовательского интерфейсаcore/flow/rule_manager.go:LoadRules()Он создаст соответствующее правило в соответствии с правилом, определенным пользователем.TrafficShapingControllerдепозитtcMap, этот интерфейс вызывает функциюgenerateStatFor(*Rule)структураTrafficShapingController.boundStat.

В сценариях ограничения тока функцияgenerateStatFor(*Rule)Основной код выглядит следующим образом:

	func generateStatFor(rule *Rule) (*standaloneStatistic, error) {
		resNode = stat.GetOrCreateResourceNode(rule.Resource, base.ResTypeCommon)

		// default case, use the resource's default statistic
		readStat := resNode.DefaultMetric()
		retStat.reuseResourceStat = true
		retStat.readOnlyMetric = readStat
		retStat.writeOnlyMetric = nil
		return &retStat, nil
	}

2 Metrics

Метрики Ресурса являются основой для суждения Правил.

2.1 Колесо атомного времени AtomicBucketWrapArray

Библиотека Sentinel богата функциями, но независимо от того, является ли она ограничением тока или слиянием, ее основой хранения является скользящее временное окно. Включены многочисленные оптимизации: например, длинные раунды без блокировки.

Существует много видов реализации скользящего окна, и алгоритм колеса времени является одной из относительно простых реализаций, и в алгоритме колеса времени могут быть реализованы различные методы ограничения тока. Общая блок-схема колеса времени выглядит следующим образом:

1.png

1 BucketWrap

Основной единицей колеса времени является ведро [временное окно].

	// BucketWrap represent a slot to record metrics
	// In order to reduce the usage of memory, BucketWrap don't hold length of BucketWrap
	// The length of BucketWrap could be seen in LeapArray.
	// The scope of time is [startTime, startTime+bucketLength)
	// The size of BucketWrap is 24(8+16) bytes
	type BucketWrap struct {
		// The start timestamp of this statistic bucket wrapper.
		BucketStart uint64
		// The actual data structure to record the metrics (e.g. MetricBucket).
		Value atomic.Value
	}

Пополнить: Причина, по которой здесь используются указатели, заключается в том, чтоBucketWrapна основеAtomicBucketWrapArrayбудет несколькоsentinelИспользуются компоненты управления потоком, и параметры управления потоком каждого компонента разные, например:

  • 1 core/circuitbreaker/circuit_breaker.go:slowRtCircuitBreakerв использованииslowRequestLeapArrayосновные параметрыslowRequestCounter
      // core/circuitbreaker/circuit_breaker.go
	type slowRequestCounter struct {
		slowCount  uint64
		totalCount uint64
	}
  • 2 core/circuitbreaker/circuit_breaker.go:errorRatioCircuitBreakerв использованииerrorCounterLeapArrayосновные параметрыerrorCounter
    // core/circuitbreaker/circuit_breaker.go
	type errorCounter struct {
		errorCount uint64
		totalCount uint64
	}

1.1 MetricBucket

BucketWrap можно рассматривать как шаблон сегмента времени. Конкретным объектом сегмента является MetricsBucket, который определяется следующим образом:

	// MetricBucket represents the entity to record metrics per minimum time unit (i.e. the bucket time span).
	// Note that all operations of the MetricBucket are required to be thread-safe.
	type MetricBucket struct {
		// Value of statistic
		counter [base.MetricEventTotal]int64
		minRt   int64
	}

MetricBucket хранит пять типов метрик:

	// There are five events to record
	// pass + block == Total
	const (
		// sentinel rules check pass
		MetricEventPass MetricEvent = iota
		// sentinel rules check block
		MetricEventBlock

		MetricEventComplete
		// Biz error, used for circuit breaker
		MetricEventError
		// request execute rt, unit is millisecond
		MetricEventRt
		// hack for the number of event
		MetricEventTotal
	)

2 AtomicBucketWrapArray

Каждое ведро записывает только свое время начала и значение метрики. Что касается общего значения длины временного окна каждого ведра, оно единообразно записывается в AtomicBucketWrapArray. AtomicBucketWrapArray определяется следующим образом:

	// atomic BucketWrap array to resolve race condition
	// AtomicBucketWrapArray can not append or delete element after initializing
	type AtomicBucketWrapArray struct {
		// The base address for real data array
		base unsafe.Pointer
		// The length of slice(array), it can not be modified.
		length int
		data   []*BucketWrap
	}

Значение AtomicBucketWrapArray.base — это первый указатель на область данных среза AtomicBucketWrapArray.data. Поскольку AtomicBucketWrapArray.data представляет собой срез фиксированной длины, AtomicBucketWrapArray.base напрямую сохраняет первый адрес области памяти данных для ускорения доступа.

Во-вторых, AtomicBucketWrapArray.data хранит указатель на BucketWrap, а не на BucketWrap.

Функция NewAtomicBucketWrapArrayWithTime() подготовится и сгенерирует все сегменты времени.

2.2 Колесо времени

1 leapArray

	// Give a diagram to illustrate
	// Suppose current time is 888, bucketLengthInMs is 200ms,
	// intervalInMs is 1000ms, LeapArray will build the below windows
	//   B0       B1      B2     B3      B4
	//   |_______|_______|_______|_______|_______|
	//  1000    1200    1400    1600    800    (1000)
	//                                        ^
	//                                      time=888
	type LeapArray struct {
		bucketLengthInMs uint32
		sampleCount      uint32
		intervalInMs     uint32
		array            *AtomicBucketWrapArray
		// update lock
		updateLock mutex
	}

Анализ каждого члена LeapArray:

  • BucketLengthInMs — длина дырявого ведра в миллисекундах;
  • sampleCount — количество сегментов утечки времени;
  • intervalInMs — длина временного окна в миллисекундах.

Диаграмма ASCII в своих комментариях хорошо объясняет значение каждого поля.

LeapArrayОсновная функцияLeapArray.currentBucketOfTime(), его функция состоит в том, чтобы получить соответствующий период времени в соответствии с определенным моментом времени.BucketWrap, код показан ниже:

	func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
		if now <= 0 {
			return nil, errors.New("Current time is less than 0.")
		}

		idx := la.calculateTimeIdx(now)
		bucketStart := calculateStartTime(now, la.bucketLengthInMs)

		for { //spin to get the current BucketWrap
			old := la.array.get(idx)
			if old == nil {
				// because la.array.data had initiated when new la.array
				// theoretically, here is not reachable
				newWrap := &BucketWrap{
					BucketStart: bucketStart,
					Value:       atomic.Value{},
				}
				newWrap.Value.Store(bg.NewEmptyBucket())
				if la.array.compareAndSet(idx, nil, newWrap) {
					return newWrap, nil
				} else {
					runtime.Gosched()
				}
			} else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
				return old, nil
			} else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
				// current time has been next cycle of LeapArray and LeapArray dont't count in last cycle.
				// reset BucketWrap
				if la.updateLock.TryLock() {
					old = bg.ResetBucketTo(old, bucketStart)
					la.updateLock.Unlock()
					return old, nil
				} else {
					runtime.Gosched()
				}
			} else if bucketStart < atomic.LoadUint64(&old.BucketStart) {
				// TODO: reserve for some special case (e.g. when occupying "future" buckets).
				return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
			}
		}
	}

Основная логика цикла for:

  • 1 Получить временную корзину, соответствующую моменту времени;
  • 2 Если старое пусто, создайте новое ведро времени, попытайтесь сохранить его в колесе времени временного окна в атомарной операции и повторите попытку, если не удастся сохранить;
  • 3 Если old — это период времени, в котором находится текущая точка времени, возврат;
  • 4. Если старая начальная точка времени меньше текущего времени, попробуйте сбросить значения параметров, таких как время запуска ведра, через оптимистическую блокировку и вернуться, если обновление блокировки прошло успешно;
  • 5 Если исходное время old больше, чем текущее время, система искажается по времени и возвращается ошибка.

2 BucketLeapArray

jumpArray реализует все объекты скользящего временного окна, а его внешний интерфейс — BucketLeapArray:

	// The implementation of sliding window based on LeapArray (as the sliding window infrastructure)
	// and MetricBucket (as the data type). The MetricBucket is used to record statistic
	// metrics per minimum time unit (i.e. the bucket time span).
	type BucketLeapArray struct {
		data     LeapArray
		dataType string
	}

Как видно из комментариев к этой структуре, сущностью ее временного окна BucketWrap является MetricBucket.

2.3 Чтение и запись метрических данных

SlidingWindowMetric

	// SlidingWindowMetric represents the sliding window metric wrapper.
	// It does not store any data and is the wrapper of BucketLeapArray to adapt to different internal bucket
	// SlidingWindowMetric is used for SentinelRules and BucketLeapArray is used for monitor
	// BucketLeapArray is per resource, and SlidingWindowMetric support only read operation.
	type SlidingWindowMetric struct {
		bucketLengthInMs uint32
		sampleCount      uint32
		intervalInMs     uint32
		real             *BucketLeapArray
	}

SlidingWindowMetric является инкапсуляцией BucketLeapArray и предоставляет только интерфейс только для чтения.

ResourceNode

	type BaseStatNode struct {
		sampleCount uint32
		intervalMs  uint32

		goroutineNum int32

		arr    *sbase.BucketLeapArray
		metric *sbase.SlidingWindowMetric
	}

	type ResourceNode struct {
		BaseStatNode

		resourceName string
		resourceType base.ResourceType
	}

	// core/stat/node_storage.go
	type ResourceNodeMap map[string]*ResourceNode
	var (
		inboundNode = NewResourceNode(base.TotalInBoundResourceName, base.ResTypeCommon)

		resNodeMap = make(ResourceNodeMap)
		rnsMux     = new(sync.RWMutex)
	)

BaseStatNode предоставляет интерфейс чтения и записи, и его данные записываются в BaseStatNode.arr, а интерфейс чтения зависит от BaseStatNode.metric.BaseStatNode.arrвNewBaseStatNode()создано в, указательSlidingWindowMetric.realтакже указать на это.

ResourceNodeКак следует из названия, он представляет ресурс и его хранилище метрик.ResourceNode.BaseStatNode.

глобальная переменнаяresNodeMapДанные метрик сохраняются для всех ресурсов.

3 Процесс ограничения тока

В этом разделе анализируется только самая основная функция формирования трафика, предоставляемая библиотекой Sentinel - ограничение тока.Существуют различные алгоритмы ограничения тока.Вы можете использовать его встроенные алгоритмы, а пользователи также могут их расширять.

Процесс ограничения тока состоит из трех этапов:

  • 1 Создайте свой EntryContext для определенного Ресурса, сохраните его Метрики, текущее время начала ограничения и т. д. Sentinel называет его StatPrepareSlot;
  • 2 В соответствии с текущим алгоритмом ограничения Ресурса определяется, следует ли его ограничивать, и выдается текущий результат суждения об ограничении Sentinel называет его RuleCheckSlot;
    • Дополнение: этот текущий алгоритм ограничения представляет собой набор из серии методов суждения (SlotChain);
  • 3 После оценки, в дополнение к выполнению пользователем соответствующего действия в соответствии с результатом оценки, Sentinel также должен выполнить свое собственное действие в соответствии с результатом оценки и сохранить время RT и другие индикаторы, используемые во всем процессе оценки, которые Sentinel вызывает StatSlot.

Общий процесс показан на следующем рисунке:

2.png

3.1 Slot

Для трех шагов проверки три соответствующих слота определяются следующим образом:

	// StatPrepareSlot is responsible for some preparation before statistic
	// For example: init structure and so on
	type StatPrepareSlot interface {
		// Prepare function do some initialization
		// Such as: init statistic structure、node and etc
		// The result of preparing would store in EntryContext
		// All StatPrepareSlots execute in sequence
		// Prepare function should not throw panic.
		Prepare(ctx *EntryContext)
	}

	// RuleCheckSlot is rule based checking strategy
	// All checking rule must implement this interface.
	type RuleCheckSlot interface {
		// Check function do some validation
		// It can break off the slot pipeline
		// Each TokenResult will return check result
		// The upper logic will control pipeline according to SlotResult.
		Check(ctx *EntryContext) *TokenResult
	}

	// StatSlot is responsible for counting all custom biz metrics.
	// StatSlot would not handle any panic, and pass up all panic to slot chain
	type StatSlot interface {
		// OnEntryPass function will be invoked when StatPrepareSlots and RuleCheckSlots execute pass
		// StatSlots will do some statistic logic, such as QPS、log、etc
		OnEntryPassed(ctx *EntryContext)
		// OnEntryBlocked function will be invoked when StatPrepareSlots and RuleCheckSlots fail to execute
		// It may be inbound flow control or outbound cir
		// StatSlots will do some statistic logic, such as QPS、log、etc
		// blockError introduce the block detail
		OnEntryBlocked(ctx *EntryContext, blockError *BlockError)
		// OnCompleted function will be invoked when chain exits.
		// The semantics of OnCompleted is the entry passed and completed
		// Note: blocked entry will not call this function
		OnCompleted(ctx *EntryContext)
	}

Отказавшись от Prepare и Stat, можно просто подумать, что так называемый слот — это компонент управления потоком, предоставляемый sentinel.

Стоит отметить, что согласно аннотации StatSlot.OnCompleted будет выполняться только при прохождении RuleCheckSlot.Check, который используется для расчета таких метрик, как RT, используемых от начала запроса до конца.

3.2 Prepare

	// core/base/slot_chain.go
	// StatPrepareSlot is responsible for some preparation before statistic
	// For example: init structure and so on
	type StatPrepareSlot interface {
		// Prepare function do some initialization
		// Such as: init statistic structure、node and etc
		// The result of preparing would store in EntryContext
		// All StatPrepareSlots execute in sequence
		// Prepare function should not throw panic.
		Prepare(ctx *EntryContext)
	}

	// core/stat/stat_prepare_slot.go
	type ResourceNodePrepareSlot struct {
	}

	func (s *ResourceNodePrepareSlot) Prepare(ctx *base.EntryContext) {
		node := GetOrCreateResourceNode(ctx.Resource.Name(), ctx.Resource.Classification())
		// Set the resource node to the context.
		ctx.StatNode = node
	}

Как объяснялось ранее, Prepare в основном создает ResourceNode, используемый для хранения метрик ресурсов. Все StatNode ресурса будут храниться в глобальной переменной на уровне пакета.core/stat/node_storage.go:resNodeMap [type: map[string]*ResourceNode]в функцииGetOrCreateResourceNodeИспользуется для преобразования имени ресурсаresNodeMapПолучите соответствующий StatNode из , если он не существует, создайте StatNode и сохраните его вresNodeMap.

3.3 Check

Процесс выполнения RuleCheckSlot.Check():

  • 1 Получить все его коллекции правил в соответствии с именем ресурса;
  • 2 Просмотрите набор правил и поочередно выполните проверку ресурса.Если какое-либо правило определяет, что ресурс необходимо ограничить [заблокировано], он будет возвращен, в противном случае он будет освобожден.
	type Slot struct {
	}

	func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
		res := ctx.Resource.Name()
		tcs := getTrafficControllerListFor(res)
		result := ctx.RuleCheckResult

		// Check rules in order
		for _, tc := range tcs {
			r := canPassCheck(tc, ctx.StatNode, ctx.Input.AcquireCount)
			if r == nil {
				// nil means pass
				continue
			}
			if r.Status() == base.ResultStatusBlocked {
				return r
			}
			if r.Status() == base.ResultStatusShouldWait {
				if waitMs := r.WaitMs(); waitMs > 0 {
					// Handle waiting action.
					time.Sleep(time.Duration(waitMs) * time.Millisecond)
				}
				continue
			}
		}
		return result
	}

	func canPassCheck(tc *TrafficShapingController, node base.StatNode, acquireCount uint32) *base.TokenResult {
		return canPassCheckWithFlag(tc, node, acquireCount, 0)
	}

	func canPassCheckWithFlag(tc *TrafficShapingController, node base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {
		return checkInLocal(tc, node, acquireCount, flag)
	}

	func checkInLocal(tc *TrafficShapingController, resStat base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {
		return tc.PerformChecking(resStat, acquireCount, flag)
	}

3.4 Exit

После того, как часовой проверит Ресурс, его последующая логическая последовательность выполнения будет следующей:

  • 1 Если RuleCheckSlot.Check() определяет, что проход пройден, выполняется StatSlot.OnEntryPassed(), в противном случае RuleCheckSlot.Check() принимает решение об отклонении и выполняет StatSlot.OnEntryBlocked();
  • 2 Если RuleCheckSlot.Check() определяет, что проход пройден, выполните это действие;
  • 3 Если RuleCheckSlot.Check() определяет, что проход пройден, выполните SentinelEntry.Exit() --> SlotChain.ext() --> StatSlot.OnCompleted().

Ссылка вызова третьего шага выглядит следующим образом:

StatSlot.OnCompleted()

	// core/flow/standalone_stat_slot.go
	type StandaloneStatSlot struct {
	}

	func (s StandaloneStatSlot) OnEntryPassed(ctx *base.EntryContext) {
		res := ctx.Resource.Name()
		for _, tc := range getTrafficControllerListFor(res) {
			if !tc.boundStat.reuseResourceStat {
				if tc.boundStat.writeOnlyMetric != nil {
					tc.boundStat.writeOnlyMetric.AddCount(base.MetricEventPass, int64(ctx.Input.AcquireCount))
				}
			}
		}
	}

	func (s StandaloneStatSlot) OnEntryBlocked(ctx *base.EntryContext, blockError *base.BlockError) {
		// Do nothing
	}

	func (s StandaloneStatSlot) OnCompleted(ctx *base.EntryContext) {
		// Do nothing
	}

SlotChain.exit()

	// core/base/slot_chain.go
	type SlotChain struct {
	}

	func (sc *SlotChain) exit(ctx *EntryContext) {
		// The OnCompleted is called only when entry passed
		if ctx.IsBlocked() {
			return
		}
		for _, s := range sc.stats {
			s.OnCompleted(ctx)
		}
	}

SentinelEntry.Exit()

	// core/base/entry.go
	type SentinelEntry struct {
		sc *SlotChain
		exitCtl sync.Once
	}

	func (e *SentinelEntry) Exit() {
		e.exitCtl.Do(func() {
			if e.sc != nil {
				e.sc.exit(ctx)
			}
		})
	}

Как видно из приведенного выше исполнения,StatSlot.OnCompleted()Он вызывается после завершения действия (например, вызова запроса-ответа RPC). Если некоторым компонентам необходимо рассчитать время действия и потреблять RT, это находится в соответствующемStatSlot.OnCompleted()Китайская основаEntryContext.startTimeЗавершение трудоемких расчетов.

3.5 SlotChain

Sentinel — это, по сути, пакет управления потоком, который не только обеспечивает функции ограничения тока, но также предоставляет множество других функциональных компонентов управления потоком, таких как адаптивная защита трафика, понижение уровня автоматического выключателя, холодный запуск, результаты глобальных показателей трафика и т. д. Пакет Sentinel-Go определяет аSlotChainСущность хранит все свои компоненты управления потоком.

   // core/base/slot_chain.go

	// SlotChain hold all system slots and customized slot.
	// SlotChain support plug-in slots developed by developer.
	type SlotChain struct {
		statPres   []StatPrepareSlot
		ruleChecks []RuleCheckSlot
		stats      []StatSlot
	}

	// The entrance of slot chain
	// Return the TokenResult and nil if internal panic.
	func (sc *SlotChain) Entry(ctx *EntryContext) *TokenResult {
		// execute prepare slot
		sps := sc.statPres
		if len(sps) > 0 {
			for _, s := range sps {
				s.Prepare(ctx)
			}
		}

		// execute rule based checking slot
		rcs := sc.ruleChecks
		var ruleCheckRet *TokenResult
		if len(rcs) > 0 {
			for _, s := range rcs {
				sr := s.Check(ctx)
				if sr == nil {
					// nil equals to check pass
					continue
				}
				// check slot result
				if sr.IsBlocked() {
					ruleCheckRet = sr
					break
				}
			}
		}
		if ruleCheckRet == nil {
			ctx.RuleCheckResult.ResetToPass()
		} else {
			ctx.RuleCheckResult = ruleCheckRet
		}

		// execute statistic slot
		ss := sc.stats
		ruleCheckRet = ctx.RuleCheckResult
		if len(ss) > 0 {
			for _, s := range ss {
				// indicate the result of rule based checking slot.
				if !ruleCheckRet.IsBlocked() {
					s.OnEntryPassed(ctx)
				} else {
					// The block error should not be nil.
					s.OnEntryBlocked(ctx, ruleCheckRet.blockErr)
				}
			}
		}
		return ruleCheckRet
	}

	func (sc *SlotChain) exit(ctx *EntryContext) {
		if ctx == nil || ctx.Entry() == nil {
			logging.Error(errors.New("nil EntryContext or SentinelEntry"), "")
			return
		}
		// The OnCompleted is called only when entry passed
		if ctx.IsBlocked() {
			return
		}
		for _, s := range sc.stats {
			s.OnCompleted(ctx)
		}
		// relieve the context here
	}

предположение: пакет Sentinel не может определить, какой компонент он использует для ресурса, и будет выполнять правила всех компонентов по очереди для EntryContext ресурса во время выполнения. Почему Sentinel-golang не предоставляет пользователям интерфейс для установки используемых ими компонентов управления потоком для сокращения следующих функцийSlotChain.Entry()выполнить вRuleCheckSlot.Check()количество казней? Связанные улучшения были представленыpr 264[Кроме того, был слит код. По словам ответственного лица, общая эффективность sentinel-go увеличилась на 15% после стресс-теста.]

globalSlotChain

Sentinel-Go определяет глобальную приватную переменную на уровне пакета SlotChain.globalSlotChainИспользуется для хранения всех объектов компонентов управления потоком. Соответствующий пример кода выглядит следующим образом. Поскольку эта статья посвящена только токоограничивающим компонентам, ниже приводится только регистрационный код токоограничивающих компонентов.

   // api/slot_chain.go

	func BuildDefaultSlotChain() *base.SlotChain {
		sc := base.NewSlotChain()
		sc.AddStatPrepareSlotLast(&stat.ResourceNodePrepareSlot{})

		sc.AddRuleCheckSlotLast(&flow.Slot{})

		sc.AddStatSlotLast(&flow.StandaloneStatSlot{})

		return sc
	}

	var globalSlotChain = BuildDefaultSlotChain()

Entry

Самая важная функция входа вне Sentinel-Goapi/api.go:Entry()середина,globalSlotChainБудет использоваться как параметр SlotChain для EntryOptions.

	// api/api.go

	// Entry is the basic API of Sentinel.
	func Entry(resource string, opts ...EntryOption) (*base.SentinelEntry, *base.BlockError) {
		options := entryOptsPool.Get().(*EntryOptions)
		options.slotChain = globalSlotChain

		return entry(resource, options)
	}

Эволюция Sentinel неотделима от вклада сообщества. Версия Sentinel Go 1.0 GA будет выпущена в ближайшем будущем и принесет больше облачных функций. Мы очень приветствуем заинтересованных разработчиков, которые вносят свой вклад и вместе возглавят эволюцию будущих версий. Мы приветствуем вклад любого рода, включая, помимо прочего:

• исправление ошибок • новые функции/улучшения • панель приборов • документ/веб-сайт • тестовые примеры

Разработчики могут выбирать интересные проблемы из списка хороших первых проблем на GitHub, чтобы участвовать в обсуждениях и вносить свой вклад. Мы сосредоточимся на разработчиках, которые активно участвуют во вкладах, а основные участники будут назначены коммиттерами, чтобы вместе вести развитие сообщества. Мы также приветствуем любые вопросы и предложения, вы можете общаться через выпуск GitHub, группу Gitter или Dingding (номер группы: 30150716) и другие каналы. Теперь приступайте к взлому!

• Репозиторий Sentinel Go:GitHub.com/Alibaba/Сен…• Корпоративным пользователям предлагается зарегистрироваться:GitHub.com/alibaba/s ru…

об авторе

Ю Ю (github @AlexStocks), руководитель проекта apache/dubbo-go, программист с более чем десятилетним опытом работы на передовой в области исследований и разработок серверной инфраструктуры, в настоящее время работает над оркестровкой контейнеров и сервисной сеткой в ​​Trusted Native Department of Ant Financial. Я люблю открытый исходный код.С тех пор, как я добавил код для Redis в 2015 году, я последовательно улучшал известные проекты, такие как Muduo/Pika/Dubbo/Dubbo-go.

"Облачная нативная платформа AlibabaСосредоточьтесь на микросервисах, бессерверных технологиях, контейнерах, Service Mesh и других технических областях, сосредоточьтесь на популярных тенденциях облачных технологий и практиках крупномасштабного внедрения облачных технологий, а также станьте официальной учетной записью самых облачных разработчиков. "