Автор | Ю Ю руководитель проекта apache/dubbo-go
Автор этой статьи отвечает за проект apache/dubbo-go. Sentinel-go в настоящее время доступен в проекте dubbogo. Если вы хотите использовать его отдельно, вы можете обратиться к нему.Использование часового с dubbo-goЕсли у вас есть другие вопросы, войдите в сообщество dubbogo [Dingding Group 23331795] для связи.
Управляемое чтение: В этой статье в основном анализируется ПО промежуточного слоя для управления трафиком с открытым исходным кодом Sentinel от Alibaba Group, которое изначально поддерживает Java/Go/C++ и другие языки.В этой статье анализируется только его реализация на языке Go. Если иное не указано ниже, Sentinel относится к Sentinel-Go.
1 Базовые понятия Ресурс и правило
1.1 Resource
// ResourceType represents classification of the resources
type ResourceType int32
const (
ResTypeCommon ResourceType = iota
ResTypeWeb
ResTypeRPC
)
// TrafficType describes the traffic type: Inbound or Outbound
type TrafficType int32
const (
// Inbound represents the inbound traffic (e.g. provider)
Inbound TrafficType = iota
// Outbound represents the outbound traffic (e.g. consumer)
Outbound
)
// ResourceWrapper represents the invocation
type ResourceWrapper struct {
// global unique resource name
name string
// resource classification
classification ResourceType
// Inbound or Outbound
flowType TrafficType
}
Resource(ResourceWrapper) хранит сценарий приложения ResourceType и направление целевого управления потоком FlowType(TrafficType).
1.2 Entry
// EntryOptions represents the options of a Sentinel resource entry.
type EntryOptions struct {
resourceType base.ResourceType
entryType base.TrafficType
acquireCount uint32
slotChain *base.SlotChain
}
type EntryContext struct {
entry *SentinelEntry
// Use to calculate RT
startTime uint64
Resource *ResourceWrapper
StatNode StatNode
Input *SentinelInput
// the result of rule slots check
RuleCheckResult *TokenResult
}
type SentinelEntry struct {
res *ResourceWrapper
// one entry bounds with one context
ctx *EntryContext
sc *SlotChain
}
Объект Entry SentinelEntry связан с Resource (ResourceWrapper) и его набором правил управления потоком SlotChain. Каждый объект Entry имеет контекстную среду EntryContext, в которой хранятся некоторые параметры управления потоком и результаты оценки управления потоком, используемые при каждом обнаружении правила.
Примечательно,SentinelEntry.sc
значение исходит изEntryOptions.slotChain
,EntryOptions.slotChain
Сохраняет глобальный объект SlotChainapi/slot_chain.go:globalSlotChain
.
Что касается того, почемуSlotChain
, который представляет собой набор всех компонентов управления потоком, предоставляемых Sentinel. Можно просто думать, что каждый компонент управления потоком — это слот. Подробный анализ см.[3.5 SlotChain].
Sentinel Некоторые имена переменных и функций менее читаемы, напримерEntryOptions.acquireCount
Это действительно не может заставить людей воспринимать это буквально, я видел функциюcore/api.go:WithAcquireCount()
По комментариям понятно:EntryOptions.acquireCount
количество выполненных пакетных действий. Если в запросе RPC вызывается служебный интерфейс сервера, значение равно 1 [такжеEntryOptions.acquireCount
Значение по умолчанию], если вызываются три служебных интерфейса сервера, значение равно 3. Поэтому рекомендуется изменить имя наEntryOptions.batchCount
Лучше, учитывая принцип минимальных изменений, можно зарезервироватьcore/api.go:WithAcquireCount()
при добавлении функции с тем жеcore/api.go:WithBatchCount()
интерфейс. Связанные улучшения отправленыpr 263.
1.3 Rule
type TokenCalculateStrategy int32
const (
Direct TokenCalculateStrategy = iota
WarmUp
)
type ControlBehavior int32
const (
Reject ControlBehavior = iota
Throttling
)
// Rule describes the strategy of flow control, the flow control strategy is based on QPS statistic metric
type Rule struct {
// Resource represents the resource name.
Resource string `json:"resource"`
ControlBehavior ControlBehavior `json:"controlBehavior"`
// Threshold means the threshold during StatIntervalInMs
// If StatIntervalInMs is 1000(1 second), Threshold means QPS
Threshold float64 `json:"threshold"`
MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"`
// StatIntervalInMs indicates the statistic interval and it's the optional setting for flow Rule.
// If user doesn't set StatIntervalInMs, that means using default metric statistic of resource.
// If the StatIntervalInMs user specifies can not reuse the global statistic of resource,
// sentinel will generate independent statistic structure for this rule.
StatIntervalInMs uint32 `json:"statIntervalInMs"`
}
Правило записывает пороговое значение оценки текущего ограничения Threshold of a Resource, продолжительность временного окна текущего ограничения StatIntervalInMs и действие ControlBehavior оценки после срабатывания текущего ограничения.
Ядром вышеизложенного является интерфейс Rule RuleCheckSlot, а StatSlot используется для подсчета текущих показателей самого Sentinel.
1.4 Flow
В текущей главе в основном анализируется текущее ограничение (ядро/поток) в управлении потоком и сортируется общий скелет дозорного в соответствии с процессом управления потоком.
1.4.1 TrafficShapingController
так называемыйTrafficShapingController
, как следует из названия, является контроллером формирования потока, который является конкретным реализатором управления потоком.
// core/flow/traffic_shaping.go
// TrafficShapingCalculator calculates the actual traffic shaping threshold
// based on the threshold of rule and the traffic shaping strategy.
type TrafficShapingCalculator interface {
CalculateAllowedTokens(acquireCount uint32, flag int32) float64
}
type DirectTrafficShapingCalculator struct {
threshold float64
}
func (d *DirectTrafficShapingCalculator) CalculateAllowedTokens(uint32, int32) float64 {
return d.threshold
}
TrafficShapingCalculator
Интерфейс используется для расчета верхнего предела текущего лимита.Если вы не используете функцию прогрева, то можете пропустить ее реализацию.Одна из ее сущностей, DirectTrafficShapingCalculator, возвращаетRule.Threshold
【Верхний предел ограничения тока, устанавливаемый пользователем】.
// TrafficShapingChecker performs checking according to current metrics and the traffic
// shaping strategy, then yield the token result.
type TrafficShapingChecker interface {
DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult
}
type RejectTrafficShapingChecker struct {
rule *Rule
}
func (d *RejectTrafficShapingChecker) DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult {
metricReadonlyStat := d.BoundOwner().boundStat.readOnlyMetric
if metricReadonlyStat == nil {
return nil
}
curCount := float64(metricReadonlyStat.GetSum(base.MetricEventPass))
if curCount+float64(acquireCount) > threshold {
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, "", d.rule, curCount)
}
return nil
}
RejectTrafficShapingChecker
в соответствии сRule.Threshold
Определить, превышает ли ресурс ограничение в текущем временном окне, и его текущий предельный результатTokenResultStatus
Это может быть только Pass или Blocked.
Sentinel Flow также имеет ограничитель постоянной скорости.ThrottlingChecker
, его цель состоит в том, чтобы разрешить выполнение запросов с одинаковой скоростью, разделить временное окно [например, 1 с] на более мелкие микровременные окна в соответствии с пороговым значением и выполнить запрос не более одного раза в каждом микровременном окне, и его текущий предельный результатTokenResultStatus
Это может быть только Pass, Blocked или Wait, и его соответствующие значения:
- Пройдено: если в микровременном окне нет ограничений, запрос пропускается;
- Ожидание: лимит превышен в пределах микровременного окна, и выполнение задерживается на определенное временное окно, в течение которого запрос должен ждать;
- Заблокировано: если лимит превышен в течение микровремени, а время ожидания превышает максимально допустимое время ожидания, установленное пользователем [Rule.MaxQueueingTimeMs], запрос отклоняется.
type TrafficShapingController struct {
flowCalculator TrafficShapingCalculator
flowChecker TrafficShapingChecker
rule *Rule
// boundStat is the statistic of current TrafficShapingController
boundStat standaloneStatistic
}
func (t *TrafficShapingController) PerformChecking(acquireCount uint32, flag int32) *base.TokenResult {
allowedTokens := t.flowCalculator.CalculateAllowedTokens(acquireCount, flag)
return t.flowChecker.DoCheck(resStat, acquireCount, allowedTokens)
}
существуетDirect + Reject
В сценарии ограничения тока эти три интерфейса на самом деле не имеют особого смысла, а их основные функцииTrafficShapingController.PerformChecking()
Основной процесс:
- 1 Получите значение метрики [curCount] текущего ресурса из TrafficShapingController.boundStat;
- 2 Если curCount + batchNum(acquireCount) > Rule.Threshold, пройти, в противном случае отклонить.
В сценарии ограничения токаTrafficShapingController
Значения четырех членов следующие:
- flowCalculator вычисляет верхний предел текущего предела;
- flowChecker выполняет действие проверки текущего лимита;
- правило хранит текущие ограничивающие правила;
- boundStat сохраняет результат проверки ограничения тока и параметры временного окна в качестве основы для определения следующего действия проверки ограничения тока.
1.4.2 TrafficControllerMap
При выполнении определения текущего лимита вам необходимо получить соответствующий ресурс в соответствии с именем Ресурса.TrafficShapingController
.
// TrafficControllerMap represents the map storage for TrafficShapingController.
type TrafficControllerMap map[string][]*TrafficShapingController
// core/flow/rule_manager.go
tcMap = make(TrafficControllerMap)
Глобальная частная переменная tcMap на уровне пакета хранит все правила, ключ — это имя ресурса, а значение — это TrafficShapingController, соответствующий ресурсу.
Функции пользовательского интерфейсаcore/flow/rule_manager.go:LoadRules()
Он создаст соответствующее правило в соответствии с правилом, определенным пользователем.TrafficShapingController
депозитtcMap
, этот интерфейс вызывает функциюgenerateStatFor(*Rule)
структураTrafficShapingController.boundStat
.
В сценариях ограничения тока функцияgenerateStatFor(*Rule)
Основной код выглядит следующим образом:
func generateStatFor(rule *Rule) (*standaloneStatistic, error) {
resNode = stat.GetOrCreateResourceNode(rule.Resource, base.ResTypeCommon)
// default case, use the resource's default statistic
readStat := resNode.DefaultMetric()
retStat.reuseResourceStat = true
retStat.readOnlyMetric = readStat
retStat.writeOnlyMetric = nil
return &retStat, nil
}
2 Metrics
Метрики Ресурса являются основой для суждения Правил.
2.1 Колесо атомного времени AtomicBucketWrapArray
Библиотека Sentinel богата функциями, но независимо от того, является ли она ограничением тока или слиянием, ее основой хранения является скользящее временное окно. Включены многочисленные оптимизации: например, длинные раунды без блокировки.
Существует много видов реализации скользящего окна, и алгоритм колеса времени является одной из относительно простых реализаций, и в алгоритме колеса времени могут быть реализованы различные методы ограничения тока. Общая блок-схема колеса времени выглядит следующим образом:
1 BucketWrap
Основной единицей колеса времени является ведро [временное окно].
// BucketWrap represent a slot to record metrics
// In order to reduce the usage of memory, BucketWrap don't hold length of BucketWrap
// The length of BucketWrap could be seen in LeapArray.
// The scope of time is [startTime, startTime+bucketLength)
// The size of BucketWrap is 24(8+16) bytes
type BucketWrap struct {
// The start timestamp of this statistic bucket wrapper.
BucketStart uint64
// The actual data structure to record the metrics (e.g. MetricBucket).
Value atomic.Value
}
Пополнить: Причина, по которой здесь используются указатели, заключается в том, чтоBucketWrap
на основеAtomicBucketWrapArray
будет несколькоsentinel
Используются компоненты управления потоком, и параметры управления потоком каждого компонента разные, например:
- 1
core/circuitbreaker/circuit_breaker.go:slowRtCircuitBreaker
в использованииslowRequestLeapArray
основные параметрыslowRequestCounter
// core/circuitbreaker/circuit_breaker.go
type slowRequestCounter struct {
slowCount uint64
totalCount uint64
}
- 2
core/circuitbreaker/circuit_breaker.go:errorRatioCircuitBreaker
в использованииerrorCounterLeapArray
основные параметрыerrorCounter
// core/circuitbreaker/circuit_breaker.go
type errorCounter struct {
errorCount uint64
totalCount uint64
}
1.1 MetricBucket
BucketWrap можно рассматривать как шаблон сегмента времени. Конкретным объектом сегмента является MetricsBucket, который определяется следующим образом:
// MetricBucket represents the entity to record metrics per minimum time unit (i.e. the bucket time span).
// Note that all operations of the MetricBucket are required to be thread-safe.
type MetricBucket struct {
// Value of statistic
counter [base.MetricEventTotal]int64
minRt int64
}
MetricBucket хранит пять типов метрик:
// There are five events to record
// pass + block == Total
const (
// sentinel rules check pass
MetricEventPass MetricEvent = iota
// sentinel rules check block
MetricEventBlock
MetricEventComplete
// Biz error, used for circuit breaker
MetricEventError
// request execute rt, unit is millisecond
MetricEventRt
// hack for the number of event
MetricEventTotal
)
2 AtomicBucketWrapArray
Каждое ведро записывает только свое время начала и значение метрики. Что касается общего значения длины временного окна каждого ведра, оно единообразно записывается в AtomicBucketWrapArray. AtomicBucketWrapArray определяется следующим образом:
// atomic BucketWrap array to resolve race condition
// AtomicBucketWrapArray can not append or delete element after initializing
type AtomicBucketWrapArray struct {
// The base address for real data array
base unsafe.Pointer
// The length of slice(array), it can not be modified.
length int
data []*BucketWrap
}
Значение AtomicBucketWrapArray.base — это первый указатель на область данных среза AtomicBucketWrapArray.data. Поскольку AtomicBucketWrapArray.data представляет собой срез фиксированной длины, AtomicBucketWrapArray.base напрямую сохраняет первый адрес области памяти данных для ускорения доступа.
Во-вторых, AtomicBucketWrapArray.data хранит указатель на BucketWrap, а не на BucketWrap.
Функция NewAtomicBucketWrapArrayWithTime() подготовится и сгенерирует все сегменты времени.
2.2 Колесо времени
1 leapArray
// Give a diagram to illustrate
// Suppose current time is 888, bucketLengthInMs is 200ms,
// intervalInMs is 1000ms, LeapArray will build the below windows
// B0 B1 B2 B3 B4
// |_______|_______|_______|_______|_______|
// 1000 1200 1400 1600 800 (1000)
// ^
// time=888
type LeapArray struct {
bucketLengthInMs uint32
sampleCount uint32
intervalInMs uint32
array *AtomicBucketWrapArray
// update lock
updateLock mutex
}
Анализ каждого члена LeapArray:
- BucketLengthInMs — длина дырявого ведра в миллисекундах;
- sampleCount — количество сегментов утечки времени;
- intervalInMs — длина временного окна в миллисекундах.
Диаграмма ASCII в своих комментариях хорошо объясняет значение каждого поля.
LeapArray
Основная функцияLeapArray.currentBucketOfTime()
, его функция состоит в том, чтобы получить соответствующий период времени в соответствии с определенным моментом времени.BucketWrap
, код показан ниже:
func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
if now <= 0 {
return nil, errors.New("Current time is less than 0.")
}
idx := la.calculateTimeIdx(now)
bucketStart := calculateStartTime(now, la.bucketLengthInMs)
for { //spin to get the current BucketWrap
old := la.array.get(idx)
if old == nil {
// because la.array.data had initiated when new la.array
// theoretically, here is not reachable
newWrap := &BucketWrap{
BucketStart: bucketStart,
Value: atomic.Value{},
}
newWrap.Value.Store(bg.NewEmptyBucket())
if la.array.compareAndSet(idx, nil, newWrap) {
return newWrap, nil
} else {
runtime.Gosched()
}
} else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
return old, nil
} else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
// current time has been next cycle of LeapArray and LeapArray dont't count in last cycle.
// reset BucketWrap
if la.updateLock.TryLock() {
old = bg.ResetBucketTo(old, bucketStart)
la.updateLock.Unlock()
return old, nil
} else {
runtime.Gosched()
}
} else if bucketStart < atomic.LoadUint64(&old.BucketStart) {
// TODO: reserve for some special case (e.g. when occupying "future" buckets).
return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
}
}
}
Основная логика цикла for:
- 1 Получить временную корзину, соответствующую моменту времени;
- 2 Если старое пусто, создайте новое ведро времени, попытайтесь сохранить его в колесе времени временного окна в атомарной операции и повторите попытку, если не удастся сохранить;
- 3 Если old — это период времени, в котором находится текущая точка времени, возврат;
- 4. Если старая начальная точка времени меньше текущего времени, попробуйте сбросить значения параметров, таких как время запуска ведра, через оптимистическую блокировку и вернуться, если обновление блокировки прошло успешно;
- 5 Если исходное время old больше, чем текущее время, система искажается по времени и возвращается ошибка.
2 BucketLeapArray
jumpArray реализует все объекты скользящего временного окна, а его внешний интерфейс — BucketLeapArray:
// The implementation of sliding window based on LeapArray (as the sliding window infrastructure)
// and MetricBucket (as the data type). The MetricBucket is used to record statistic
// metrics per minimum time unit (i.e. the bucket time span).
type BucketLeapArray struct {
data LeapArray
dataType string
}
Как видно из комментариев к этой структуре, сущностью ее временного окна BucketWrap является MetricBucket.
2.3 Чтение и запись метрических данных
SlidingWindowMetric
// SlidingWindowMetric represents the sliding window metric wrapper.
// It does not store any data and is the wrapper of BucketLeapArray to adapt to different internal bucket
// SlidingWindowMetric is used for SentinelRules and BucketLeapArray is used for monitor
// BucketLeapArray is per resource, and SlidingWindowMetric support only read operation.
type SlidingWindowMetric struct {
bucketLengthInMs uint32
sampleCount uint32
intervalInMs uint32
real *BucketLeapArray
}
SlidingWindowMetric является инкапсуляцией BucketLeapArray и предоставляет только интерфейс только для чтения.
ResourceNode
type BaseStatNode struct {
sampleCount uint32
intervalMs uint32
goroutineNum int32
arr *sbase.BucketLeapArray
metric *sbase.SlidingWindowMetric
}
type ResourceNode struct {
BaseStatNode
resourceName string
resourceType base.ResourceType
}
// core/stat/node_storage.go
type ResourceNodeMap map[string]*ResourceNode
var (
inboundNode = NewResourceNode(base.TotalInBoundResourceName, base.ResTypeCommon)
resNodeMap = make(ResourceNodeMap)
rnsMux = new(sync.RWMutex)
)
BaseStatNode предоставляет интерфейс чтения и записи, и его данные записываются в BaseStatNode.arr, а интерфейс чтения зависит от BaseStatNode.metric.BaseStatNode.arr
вNewBaseStatNode()
создано в, указательSlidingWindowMetric.real
также указать на это.
ResourceNode
Как следует из названия, он представляет ресурс и его хранилище метрик.ResourceNode.BaseStatNode
.
глобальная переменнаяresNodeMap
Данные метрик сохраняются для всех ресурсов.
3 Процесс ограничения тока
В этом разделе анализируется только самая основная функция формирования трафика, предоставляемая библиотекой Sentinel - ограничение тока.Существуют различные алгоритмы ограничения тока.Вы можете использовать его встроенные алгоритмы, а пользователи также могут их расширять.
Процесс ограничения тока состоит из трех этапов:
- 1 Создайте свой EntryContext для определенного Ресурса, сохраните его Метрики, текущее время начала ограничения и т. д. Sentinel называет его StatPrepareSlot;
- 2 В соответствии с текущим алгоритмом ограничения Ресурса определяется, следует ли его ограничивать, и выдается текущий результат суждения об ограничении Sentinel называет его RuleCheckSlot;
- Дополнение: этот текущий алгоритм ограничения представляет собой набор из серии методов суждения (SlotChain);
- 3 После оценки, в дополнение к выполнению пользователем соответствующего действия в соответствии с результатом оценки, Sentinel также должен выполнить свое собственное действие в соответствии с результатом оценки и сохранить время RT и другие индикаторы, используемые во всем процессе оценки, которые Sentinel вызывает StatSlot.
Общий процесс показан на следующем рисунке:
3.1 Slot
Для трех шагов проверки три соответствующих слота определяются следующим образом:
// StatPrepareSlot is responsible for some preparation before statistic
// For example: init structure and so on
type StatPrepareSlot interface {
// Prepare function do some initialization
// Such as: init statistic structure、node and etc
// The result of preparing would store in EntryContext
// All StatPrepareSlots execute in sequence
// Prepare function should not throw panic.
Prepare(ctx *EntryContext)
}
// RuleCheckSlot is rule based checking strategy
// All checking rule must implement this interface.
type RuleCheckSlot interface {
// Check function do some validation
// It can break off the slot pipeline
// Each TokenResult will return check result
// The upper logic will control pipeline according to SlotResult.
Check(ctx *EntryContext) *TokenResult
}
// StatSlot is responsible for counting all custom biz metrics.
// StatSlot would not handle any panic, and pass up all panic to slot chain
type StatSlot interface {
// OnEntryPass function will be invoked when StatPrepareSlots and RuleCheckSlots execute pass
// StatSlots will do some statistic logic, such as QPS、log、etc
OnEntryPassed(ctx *EntryContext)
// OnEntryBlocked function will be invoked when StatPrepareSlots and RuleCheckSlots fail to execute
// It may be inbound flow control or outbound cir
// StatSlots will do some statistic logic, such as QPS、log、etc
// blockError introduce the block detail
OnEntryBlocked(ctx *EntryContext, blockError *BlockError)
// OnCompleted function will be invoked when chain exits.
// The semantics of OnCompleted is the entry passed and completed
// Note: blocked entry will not call this function
OnCompleted(ctx *EntryContext)
}
Отказавшись от Prepare и Stat, можно просто подумать, что так называемый слот — это компонент управления потоком, предоставляемый sentinel.
Стоит отметить, что согласно аннотации StatSlot.OnCompleted будет выполняться только при прохождении RuleCheckSlot.Check, который используется для расчета таких метрик, как RT, используемых от начала запроса до конца.
3.2 Prepare
// core/base/slot_chain.go
// StatPrepareSlot is responsible for some preparation before statistic
// For example: init structure and so on
type StatPrepareSlot interface {
// Prepare function do some initialization
// Such as: init statistic structure、node and etc
// The result of preparing would store in EntryContext
// All StatPrepareSlots execute in sequence
// Prepare function should not throw panic.
Prepare(ctx *EntryContext)
}
// core/stat/stat_prepare_slot.go
type ResourceNodePrepareSlot struct {
}
func (s *ResourceNodePrepareSlot) Prepare(ctx *base.EntryContext) {
node := GetOrCreateResourceNode(ctx.Resource.Name(), ctx.Resource.Classification())
// Set the resource node to the context.
ctx.StatNode = node
}
Как объяснялось ранее, Prepare в основном создает ResourceNode, используемый для хранения метрик ресурсов. Все StatNode ресурса будут храниться в глобальной переменной на уровне пакета.core/stat/node_storage.go:resNodeMap [type: map[string]*ResourceNode]
в функцииGetOrCreateResourceNode
Используется для преобразования имени ресурсаresNodeMap
Получите соответствующий StatNode из , если он не существует, создайте StatNode и сохраните его вresNodeMap
.
3.3 Check
Процесс выполнения RuleCheckSlot.Check():
- 1 Получить все его коллекции правил в соответствии с именем ресурса;
- 2 Просмотрите набор правил и поочередно выполните проверку ресурса.Если какое-либо правило определяет, что ресурс необходимо ограничить [заблокировано], он будет возвращен, в противном случае он будет освобожден.
type Slot struct {
}
func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
res := ctx.Resource.Name()
tcs := getTrafficControllerListFor(res)
result := ctx.RuleCheckResult
// Check rules in order
for _, tc := range tcs {
r := canPassCheck(tc, ctx.StatNode, ctx.Input.AcquireCount)
if r == nil {
// nil means pass
continue
}
if r.Status() == base.ResultStatusBlocked {
return r
}
if r.Status() == base.ResultStatusShouldWait {
if waitMs := r.WaitMs(); waitMs > 0 {
// Handle waiting action.
time.Sleep(time.Duration(waitMs) * time.Millisecond)
}
continue
}
}
return result
}
func canPassCheck(tc *TrafficShapingController, node base.StatNode, acquireCount uint32) *base.TokenResult {
return canPassCheckWithFlag(tc, node, acquireCount, 0)
}
func canPassCheckWithFlag(tc *TrafficShapingController, node base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {
return checkInLocal(tc, node, acquireCount, flag)
}
func checkInLocal(tc *TrafficShapingController, resStat base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {
return tc.PerformChecking(resStat, acquireCount, flag)
}
3.4 Exit
После того, как часовой проверит Ресурс, его последующая логическая последовательность выполнения будет следующей:
- 1 Если RuleCheckSlot.Check() определяет, что проход пройден, выполняется StatSlot.OnEntryPassed(), в противном случае RuleCheckSlot.Check() принимает решение об отклонении и выполняет StatSlot.OnEntryBlocked();
- 2 Если RuleCheckSlot.Check() определяет, что проход пройден, выполните это действие;
- 3 Если RuleCheckSlot.Check() определяет, что проход пройден, выполните SentinelEntry.Exit() --> SlotChain.ext() --> StatSlot.OnCompleted().
Ссылка вызова третьего шага выглядит следующим образом:
StatSlot.OnCompleted()
// core/flow/standalone_stat_slot.go
type StandaloneStatSlot struct {
}
func (s StandaloneStatSlot) OnEntryPassed(ctx *base.EntryContext) {
res := ctx.Resource.Name()
for _, tc := range getTrafficControllerListFor(res) {
if !tc.boundStat.reuseResourceStat {
if tc.boundStat.writeOnlyMetric != nil {
tc.boundStat.writeOnlyMetric.AddCount(base.MetricEventPass, int64(ctx.Input.AcquireCount))
}
}
}
}
func (s StandaloneStatSlot) OnEntryBlocked(ctx *base.EntryContext, blockError *base.BlockError) {
// Do nothing
}
func (s StandaloneStatSlot) OnCompleted(ctx *base.EntryContext) {
// Do nothing
}
SlotChain.exit()
// core/base/slot_chain.go
type SlotChain struct {
}
func (sc *SlotChain) exit(ctx *EntryContext) {
// The OnCompleted is called only when entry passed
if ctx.IsBlocked() {
return
}
for _, s := range sc.stats {
s.OnCompleted(ctx)
}
}
SentinelEntry.Exit()
// core/base/entry.go
type SentinelEntry struct {
sc *SlotChain
exitCtl sync.Once
}
func (e *SentinelEntry) Exit() {
e.exitCtl.Do(func() {
if e.sc != nil {
e.sc.exit(ctx)
}
})
}
Как видно из приведенного выше исполнения,StatSlot.OnCompleted()
Он вызывается после завершения действия (например, вызова запроса-ответа RPC). Если некоторым компонентам необходимо рассчитать время действия и потреблять RT, это находится в соответствующемStatSlot.OnCompleted()
Китайская основаEntryContext.startTime
Завершение трудоемких расчетов.
3.5 SlotChain
Sentinel — это, по сути, пакет управления потоком, который не только обеспечивает функции ограничения тока, но также предоставляет множество других функциональных компонентов управления потоком, таких как адаптивная защита трафика, понижение уровня автоматического выключателя, холодный запуск, результаты глобальных показателей трафика и т. д. Пакет Sentinel-Go определяет аSlotChain
Сущность хранит все свои компоненты управления потоком.
// core/base/slot_chain.go
// SlotChain hold all system slots and customized slot.
// SlotChain support plug-in slots developed by developer.
type SlotChain struct {
statPres []StatPrepareSlot
ruleChecks []RuleCheckSlot
stats []StatSlot
}
// The entrance of slot chain
// Return the TokenResult and nil if internal panic.
func (sc *SlotChain) Entry(ctx *EntryContext) *TokenResult {
// execute prepare slot
sps := sc.statPres
if len(sps) > 0 {
for _, s := range sps {
s.Prepare(ctx)
}
}
// execute rule based checking slot
rcs := sc.ruleChecks
var ruleCheckRet *TokenResult
if len(rcs) > 0 {
for _, s := range rcs {
sr := s.Check(ctx)
if sr == nil {
// nil equals to check pass
continue
}
// check slot result
if sr.IsBlocked() {
ruleCheckRet = sr
break
}
}
}
if ruleCheckRet == nil {
ctx.RuleCheckResult.ResetToPass()
} else {
ctx.RuleCheckResult = ruleCheckRet
}
// execute statistic slot
ss := sc.stats
ruleCheckRet = ctx.RuleCheckResult
if len(ss) > 0 {
for _, s := range ss {
// indicate the result of rule based checking slot.
if !ruleCheckRet.IsBlocked() {
s.OnEntryPassed(ctx)
} else {
// The block error should not be nil.
s.OnEntryBlocked(ctx, ruleCheckRet.blockErr)
}
}
}
return ruleCheckRet
}
func (sc *SlotChain) exit(ctx *EntryContext) {
if ctx == nil || ctx.Entry() == nil {
logging.Error(errors.New("nil EntryContext or SentinelEntry"), "")
return
}
// The OnCompleted is called only when entry passed
if ctx.IsBlocked() {
return
}
for _, s := range sc.stats {
s.OnCompleted(ctx)
}
// relieve the context here
}
предположение: пакет Sentinel не может определить, какой компонент он использует для ресурса, и будет выполнять правила всех компонентов по очереди для EntryContext ресурса во время выполнения. Почему Sentinel-golang не предоставляет пользователям интерфейс для установки используемых ими компонентов управления потоком для сокращения следующих функцийSlotChain.Entry()
выполнить вRuleCheckSlot.Check()
количество казней? Связанные улучшения были представленыpr 264[Кроме того, был слит код. По словам ответственного лица, общая эффективность sentinel-go увеличилась на 15% после стресс-теста.]
globalSlotChain
Sentinel-Go определяет глобальную приватную переменную на уровне пакета SlotChain.globalSlotChain
Используется для хранения всех объектов компонентов управления потоком. Соответствующий пример кода выглядит следующим образом. Поскольку эта статья посвящена только токоограничивающим компонентам, ниже приводится только регистрационный код токоограничивающих компонентов.
// api/slot_chain.go
func BuildDefaultSlotChain() *base.SlotChain {
sc := base.NewSlotChain()
sc.AddStatPrepareSlotLast(&stat.ResourceNodePrepareSlot{})
sc.AddRuleCheckSlotLast(&flow.Slot{})
sc.AddStatSlotLast(&flow.StandaloneStatSlot{})
return sc
}
var globalSlotChain = BuildDefaultSlotChain()
Entry
Самая важная функция входа вне Sentinel-Goapi/api.go:Entry()
середина,globalSlotChain
Будет использоваться как параметр SlotChain для EntryOptions.
// api/api.go
// Entry is the basic API of Sentinel.
func Entry(resource string, opts ...EntryOption) (*base.SentinelEntry, *base.BlockError) {
options := entryOptsPool.Get().(*EntryOptions)
options.slotChain = globalSlotChain
return entry(resource, options)
}
Эволюция Sentinel неотделима от вклада сообщества. Версия Sentinel Go 1.0 GA будет выпущена в ближайшем будущем и принесет больше облачных функций. Мы очень приветствуем заинтересованных разработчиков, которые вносят свой вклад и вместе возглавят эволюцию будущих версий. Мы приветствуем вклад любого рода, включая, помимо прочего:
• исправление ошибок • новые функции/улучшения • панель приборов • документ/веб-сайт • тестовые примеры
Разработчики могут выбирать интересные проблемы из списка хороших первых проблем на GitHub, чтобы участвовать в обсуждениях и вносить свой вклад. Мы сосредоточимся на разработчиках, которые активно участвуют во вкладах, а основные участники будут назначены коммиттерами, чтобы вместе вести развитие сообщества. Мы также приветствуем любые вопросы и предложения, вы можете общаться через выпуск GitHub, группу Gitter или Dingding (номер группы: 30150716) и другие каналы. Теперь приступайте к взлому!
• Репозиторий Sentinel Go:GitHub.com/Alibaba/Сен…• Корпоративным пользователям предлагается зарегистрироваться:GitHub.com/alibaba/s ru…
об авторе
Ю Ю (github @AlexStocks), руководитель проекта apache/dubbo-go, программист с более чем десятилетним опытом работы на передовой в области исследований и разработок серверной инфраструктуры, в настоящее время работает над оркестровкой контейнеров и сервисной сеткой в Trusted Native Department of Ant Financial. Я люблю открытый исходный код.С тех пор, как я добавил код для Redis в 2015 году, я последовательно улучшал известные проекты, такие как Muduo/Pika/Dubbo/Dubbo-go.
"Облачная нативная платформа AlibabaСосредоточьтесь на микросервисах, бессерверных технологиях, контейнерах, Service Mesh и других технических областях, сосредоточьтесь на популярных тенденциях облачных технологий и практиках крупномасштабного внедрения облачных технологий, а также станьте официальной учетной записью самых облачных разработчиков. "