Ограничитель тока для высокой доступности распределенных систем (реализация версии Go)

задняя часть Go распределенный

Текущий ограничитель, как следует из названия, используется для ограничения потока одновременных запросов с большим количеством запросов.

Текущее ограничение включает текущее ограничение на уровне Nginx и текущее ограничение логики бизнес-кода. Ограничения трафика используются во многих микросервисах и сервисных сетках. Существует три основных алгоритма ограничения тока: семафор, алгоритм дырявого ведра и алгоритм ведра с маркером. Три алгоритма описаны по очереди ниже.

Все примеры авторских программ в этой статье реализованы на языке Go.

1. Описание проблемы

Быстрое увеличение числа пользователей, злонамеренные атаки, такие как популярные службы или поисковые роботы, вызывают внезапное увеличение количества запросов.Например, в системе управления образованием школы в день проверки результатов объем запросов увеличился до более чем 100 раз предыдущего, и интерфейс скоро станет почти непригодным для использования, и вызовет цепную реакцию, которая приведет к краху всей системы. Как справиться с этой ситуацией? Жизнь дала нам ответ: например, электрические выключатели старого образца снабжены предохранителями, стоит кому-то использовать сверхмощное оборудование, как предохранитель перегорает, чтобы защитить различные электроприборы от перегорания сильными токами. Точно так же наши интерфейсы также должны быть установлены с «предохранителями», чтобы предотвратить паралич системы, вызванный чрезмерной нагрузкой на систему неожиданными запросами.Когда трафик слишком велик, могут быть приняты такие механизмы, как отклонение или слив.

Из-за различий и сложности каждого бизнеса каждая серверная служба может иметь одно узкое место при развертывании контейнеров. Превышение узкого места приведет к узким местам памяти или процессора, что приведет к недоступности службы или непосредственному зависанию одного контейнера или перезагрузке.

2. Ограничение тока семафора

сигналВо многих языках разработки есть родственные конструкции семафоров. Например, Semaphore в Java — это счетный семафор. Он часто используется для ограничения количества потоков, которые получают ресурс, и может быть реализован на основе параллельного пакета Java.

Двумя важными методами семафора являются Acquire() и Release(). Разрешения приобретаются с помощью методаAcquire(), который блокируется до тех пор, пока разрешение не будет получено. Освободите лицензию с помощью метода release().

Прочитав некоторые реализации языков с открытым исходным кодом, автор пришел к выводу, что существует два основных типа семафоров: неблокирующие и блокирующие.

2.1 Метод блокировки

Используя блокировки или блокирующие очереди, язык Go используется в качестве примера следующим образом:

// 采用channel作为底层数据结构,从而达到阻塞的获取和使用信号量
type Semaphore struct {
	innerChan chan struct{}
}
// 初始化信号量,本质初始化一个channel,channel的初始化大小为 信号量数值
func NewSemaphore(num uint64) *Semaphore {
	return &Semaphore{
		innerChan: make(chan struct{}, num),
	}
}
// 获取信号量,本质是 向channel放入元素,如果同时有很多协程并发获取信号量,则channel则会full阻塞,从而达到控制并发协程数的目的,也即是信号量的控制
func (s *Semaphore) Acquire() {
	for {
		select {
		case s.innerChan <- struct{}{}:
			return
		default:
			log.Error("semaphore acquire is blocking")
			time.Sleep(100 * time.Millisecond)
		}
	}
}
// 释放信号量 本质是 从channel中获取元素,由于有acquire的放入元素,所以此处一定能回去到元素 也就能释放成功,default只要是出于安全编程的目的
func (s *Semaphore) Release() {
	select {
	case <-s.innerChan:
		return
	default:
		return
	}
}

В реализации определяетсяSemaphoreструктура. Суть инициализации семафора заключается в инициализации канала, а инициализационный размер канала - это значение семафора, суть получения семафора в размещении элементов в канал.Если есть много сопрограмм, одновременно получающих семафор, канал будет полностью заблокирован, таким образом, для достижения цели контроля количества параллельных сопрограмм, то есть управления семафором, суть освобождения семафора заключается в получении элемента из канала. в элемент, он должен иметь возможность вернуться к элементу и успешно освободить его. , по умолчанию, если это необходимо для безопасного программирования.

2.2 Неблокирующий способ

Подсчет безопасным для параллелизма способом, таким как атомарное сложение и вычитание.

3. Алгоритм ограничения тока

Основные алгоритмы ограничения тока делятся на два алгоритма дырявого ведра и алгоритмы ведра токенов, Существует множество статей и документов по этим двум алгоритмам, которые дают подробные объяснения. В принципе, алгоритм ведра с токенами и алгоритм с дырявым ведром противоположны.进水,один漏水. Стоит отметить, что как Google Guava с открытым исходным кодом, так и Uber с открытым исходным кодом, ограничивающие текущие компоненты, используют алгоритм дырявого ведра.

3.1 Алгоритм дырявого ведра

Ведро (Leaky Bucket) алгоритм очень простая идея, вода (запрос) на слив в первое ведро, вода ведра с постоянной скоростью (скорость реагирует на интерфейс), скорость прилива при переливе воды через прямую сборку (частота доступа ответ выше скорости интерфейса), затем отклонить запрос. Видно, что алгоритм дырявого ведра может накладывать ограничения на скорость передачи данных. Схематическая диаграмма выглядит следующим образом:

Видно, что здесь есть две переменные: одна — размер ведра и сколько воды может храниться при увеличении трафика, а другая — размер уязвимости ведра (RATE).

Алгоритм дырявого ведра может быть реализован с использованием очереди Redis.Производитель проверяет, превышает ли длина очереди пороговое значение перед отправкой сообщения, и отбрасывает сообщение, если оно превышает пороговое значение.В противном случае сообщение отправляется в очередь Redis; потребитель извлекает сообщение из очереди Redis с фиксированной скоростью. Очередь Redis играет здесь роль буферного пула, а также сглаживания пиков и заполнения впадин, а также формирования трафика.

3.2 Алгоритм Token Bucket

Для многих сценариев приложений помимо возможности ограничения средней скорости передачи данных также требуется допускать некоторую степень пакетной передачи. В настоящее время алгоритм дырявого ведра может не подходить, а алгоритм ведра с токеном больше подходит. Принцип алгоритма ведра токенов заключается в том, что система будет помещать токены в ведро с постоянной скоростью, и если запрос необходимо обработать, то сначала нужно получить токен из ведра. ведро, затем отказ в обслуживании. Максимальное количество токенов, которое может храниться в корзине, — это допустимый объем пакетной передачи.

Действие по высвобождению токена непрерывное, если количество токенов в ведре достигает верхнего предела, токен будет сброшен, поэтому бывает такая ситуация, в ведре всегда большое количество доступных токенов, и входящий запрос может быть напрямую Получить токен и выполнить его.Например, установите qps равным 100. После инициализации ограничителя тока в течение одной секунды в ведре уже есть 100 токенов.Когда запуск завершен для предоставления внешних услуг, ограничитель тока может выдержать мгновенный 100 запрос. Следовательно, только когда в ведре нет токена, запрос будет ожидать, что эквивалентно выполнению с определенной скоростью.

Можно подготовить очередь для хранения токенов, а токены периодически генерируются и помещаются в очередь через пул потоков, для каждого запроса из очереди получается токен и выполнение продолжается.

3.3 Реализация алгоритма дырявого ведра

Итак, здесь я перехожу к делу и непосредственно показываю реализацию версии этого алгоритма на языке Go Код выглядит следующим образом:

// 此处截取自研的熔断器代码中的限流实现,这是非阻塞的实现
func (sp *servicePanel) incLimit() error {
	// 如果大于限制的条件则返回错误
	if sp.currentLimitCount.Load() > sp.currLimitFunc(nil) {
		return ErrCurrentLimit
	}
	sp.currentLimitCount.Inc()
	return nil
}

func (sp *servicePanel) clearLimit() {
	// 定期每秒重置计数器,从而达到每秒限制的并发数
	// 比如限制1000req/s,在这里指每秒清理1000的计数值
// 令牌桶是定期放,这里是逆思维,每秒清空,实现不仅占用内存低而且效率高
	t := time.NewTicker(time.Second)
	for {
		select {
		case <-t.C:
			sp.currentLimitCount.Store(0)
		}
	}
}

Вышеупомянутая реализация на самом деле является относительно грубой реализацией.Он не следует строго определенной фиксированной скорости для каждого запрашивающего, а использует секунды в качестве единицы, а счетчик очищается в грубой манере.Количество текущих ограничителей в секунду, хотя это кажется не соответствующим требованиям, на самом деле это просто двойное значение в данный момент, и нормальные системы должны справляться с запросами, которые удваивают количество текущих ограничителей в одно мгновение.

Улучшать

Если вы хотите строго следовать определенному фиксированному значению для каждого запроса, вы можете улучшить грубость времени следующим образом:

func (sp *servicePanel) incLimit() error {
	// 如果大于1则返回错误
	if sp.currentLimitCount.Load() > 1 {
		return ErrCurrentLimit
	}
	sp.currentLimitCount.Inc()
	return nil
}

func (sp *servicePanel) clearLimit() {
	// 1s除以每秒限流个数
	t := time.NewTicker(time.Second/time.Duration(sp.currLimitFunc(nil)))
	for {
		select {
		case <-t.C:
			sp.currentLimitCount.Store(0)
		}
	}
}

Читатели могут сами попробовать улучшенный алгоритм воронки.

4. Углубленный анализ реализации RateLimit с открытым исходным кодом Uber.

uber открыл исходный код набора языковых библиотек go ratelimit для текущего лимита сервиса на Github.Этот компонент реализован на основе Leaky Bucket.

4.1 Метод введения

#第一版本
go get github.com/uber-go/ratelimit@v0.1.0
#改进版本
go get github.com/uber-go/ratelimit@master

4.2 Использование

В первую очередь подчеркивается, что самое большое отличие от разработанного автором ограничителя тока заключается в том, что это компонент ограничения тока, который блокирует звонящего. Частота регулирования обычно выражается как скорость/с, то есть частота запросов в секунду. Без лишних слов, вот пример использования:

func ExampleRatelimit() {
	rl := ratelimit.New(100) // per second

	prev := time.Now()
	for i := 0; i < 10; i++ {
		now := rl.Take()
		if i > 0 {
			fmt.Println(i, now.Sub(prev))
		}
		prev = now
	}
}

Ожидаемый результат выглядит следующим образом:

	// Output:
	// 1 10ms
	// 2 10ms
	// 3 10ms
	// 4 10ms
	// 5 10ms
	// 6 10ms
	// 7 10ms
	// 8 10ms
	// 9 10ms

Результаты испытаний полностью соответствовали ожиданиям. В этом примере мы можем передавать 100 запросов в секунду с учетом ограничителя, что составляет в среднем 10 мс между каждым запросом. Таким образом, вы печатаете строку данных каждые 10 мс.

4.3 Детали реализации

Построить ограничитель

Первый - построить Limiter.В нем есть perRequest.Это ключевая переменная, указывающая время интервала между каждым запросом.Это основная идея алгоритма этого компонента, то есть ставить в очередь запрос, и есть скорость в течение одной секунды. Запросите, поставьте эти запросы в очередь и приходите один за другим. Интервал между каждым запросом составляет 1 с / скорость, что никогда не достигает концепции запросов скорости в течение 1 с, чтобы достичь цели ограничение тока.

// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
	l := &limiter{
		perRequest: time.Second / time.Duration(rate),
		maxSlack:   -10 * time.Second / time.Duration(rate),
	}
	for _, opt := range opts {
		opt(l)
	}
	if l.clock == nil {
		l.clock = clock.New()
	}
	return l
}
Метод блокировки Limiter Take()

Метод Take() Используется перед каждым запросом на получение утверждения Возвращает время момента утверждения.

Первая версия

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
	t.Lock()
	defer t.Unlock()

	now := t.clock.Now()

	// If this is our first request, then we allow it.
	if t.last.IsZero() {
		t.last = now
		return t.last
	}

	// sleepFor calculates how much time we should sleep based on
	// the perRequest budget and how long the last request took.
	// Since the request may take longer than the budget, this number
	// can get negative, and is summed across requests.
	t.sleepFor += t.perRequest - now.Sub(t.last)

	// We shouldn't allow sleepFor to get too negative, since it would mean that
	// a service that slowed down a lot for a short period of time would get
	// a much higher RPS following that.
	if t.sleepFor < t.maxSlack {
		t.sleepFor = t.maxSlack
	}

	// If sleepFor is positive, then we should sleep now.
	if t.sleepFor > 0 {
		t.clock.Sleep(t.sleepFor)
		t.last = now.Add(t.sleepFor)
		t.sleepFor = 0
	} else {
		t.last = now
	}

	return t.last
}

С точки зрения реализации видно, что первая версия принимает блокировку Go, а затем встает в очередь для сна.После завершения сна интервал между запросами является постоянным, и для достижения цель ограничения тока.

второе издание

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
	newState := state{}
	taken := false
	for !taken {
		now := t.clock.Now()

		previousStatePointer := atomic.LoadPointer(&t.state)
		oldState := (*state)(previousStatePointer)

		newState = state{}
		newState.last = now

		// If this is our first request, then we allow it.
		if oldState.last.IsZero() {
			taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
			continue
		}

		// sleepFor calculates how much time we should sleep based on
		// the perRequest budget and how long the last request took.
		// Since the request may take longer than the budget, this number
		// can get negative, and is summed across requests.
		newState.sleepFor += t.perRequest - now.Sub(oldState.last)
		// We shouldn't allow sleepFor to get too negative, since it would mean that
		// a service that slowed down a lot for a short period of time would get
		// a much higher RPS following that.
		if newState.sleepFor < t.maxSlack {
			newState.sleepFor = t.maxSlack
		}
		if newState.sleepFor > 0 {
			newState.last = newState.last.Add(newState.sleepFor)
		}
		taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
	}
	t.clock.Sleep(newState.sleepFor)
	return newState.last
}

Вторая версия использует атомарную операцию + для операции вращения вместо операции блокировки, цель которой — уменьшить конкуренцию блокировок сопрограммы. Две версии, независимо от того, используют ли они блокировки или атомарные операции, по существу ставят запросы в очередь. В первой версии есть конкуренция за блокировку, а затем ставится в очередь на ожидание. Вторая версия избегает конкуренции за блокировку, но все сопрограммы могут быстро выйти из цикла for и затем все заснуть. на месте спать.

5. Резюме

Три мощных инструмента для обеспечения стабильной работы: переход на более раннюю версию автоматического выключателя, ограничение рабочего тока и имитация неисправности. В этой статье в основном объясняется общая стратегия обеспечения высокой доступности в распределенных системах: ограничение тока. Обычно существует три реализации ограничения тока: семафор (счетчик), дырявое ведро и ведро с маркерами. В этой статье реализован виджет ограничения тока на основе алгоритма дырявого ведра. Наконец, проанализируйте uber с открытым исходным кодомuber-go, вторая версия метода блокировки ограничителя Take() более дружелюбна к конфликтам блокировки сопрограммы.

Рекомендации по качеству книг

Добро пожаловать на покупку книги автора, которая теперь опубликована и включена в список:

Нелегко быть оригинальным, я надеюсь, что все это поддержат, и я с нетерпением жду общения и обучения с вами.

Ссылаться на

Ограничение тока в системе с высокой степенью параллелизма — алгоритм дырявого ведра и алгоритм ведра токена