[Перевод] Golang обрабатывает миллионы запросов в минуту

Go

оригинал:medium.com/Все в порядке/Корейский…

Я работаю с антикраулерами, антивирусами, антивредоносными программами уже 15 лет в разных компаниях и знаю, что со временем эти системы станут очень сложными из-за большого объема данных, которые нужно обрабатывать и обрабатывать с ежедневно.

В настоящее время яsmsjunk.comгенеральный директор иKnowBe4Главный архитектор обеих компаний активно работает в сфере кибербезопасности.

Самое смешное, что за последние 10 лет работы инженером-программистом почти каждый проект по разработке бэкенда, над которым я работал, был выполнен с использованием Ruby on Rails. Но не поймите меня неправильно, я люблю Ruby on Rails и думаю, что это отличная среда разработки, но когда вы какое-то время проектируете и разрабатываете системы на Ruby, вы склонны забывать, что вы также можете используйте преимущества многопоточности, распараллеливания, высокоскоростного выполнения и меньшего объема памяти для разработки системы. Я был разработчиком C/C++, Delphi и C# в течение многих лет, и я пришел к пониманию того, что правильно использовать правильные инструменты, чтобы сделать систему проще и понятнее.

Индустрия программирования никогда не останавливалась на языке программирования и обсуждении фреймворка, и я не хочу в этом участвовать. Я считаю, что эффективность высока, производительность и производительность кода имеют большую часть удобства обслуживания в зависимости от того, достаточно ли проста спроектированная вами архитектура.


проблема, которую нужно решить

Когда мы разрабатываем анонимную систему телеметрии и анализа данных, одним из требований является способность обрабатывать и справляться с миллионами запросов POST.Обработчик сетевых запросов будет получать POST через JSON, который будет содержать множество элементов, которые необходимо написать в Amazon Сбор данных из S3, чтобы наша система сокращения карт могла обрабатывать эти данные позже.

В общем, мы рассмотрим построение иерархической структуры воркеров и использование промежуточного программного обеспечения, такого как:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • и т.д..

Затем настройте два разных кластера, один для веб-клиента, а другой для рабочих процессов, а затем мы сможем масштабировать рабочие процессы до количества, необходимого для работы.

Но с самого начала наша команда понимала, что все это можно реализовать на Go, потому что во время обсуждения мы думали, что это будет очень высокотрафиковая система. Я использую Go для разработки в течение двух лет, и я использовал его для разработки некоторых систем, но масштаб нагрузки на этот раз намного меньше, чем спрос.

Давайте определим некоторыеstructЧтобы указать запрошенное тело нашего POST и определить один метод, загруженный в корзину S3UploadToS3

type PayloadCollection struct {
	WindowsVersion  string    `json:"version"`
	Token           string    `json:"token"`
	Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
	// the storageFolder method ensures that there are no name collision in
	// case we get same timestamp in the key name
	storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

	bucket := S3Bucket

	b := new(bytes.Buffer)
	encodeErr := json.NewEncoder(b).Encode(payload)
	if encodeErr != nil {
		return encodeErr
	}

	// Everything we post to the S3 bucket should be marked 'private'
	var acl = s3.Private
	var contentType = "application/octet-stream"

	return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

Используйте Go runtines наивно

В начале мы наивно реализовали метод ловушки POST следующим образом и просто поместили действие загрузки каждого тела запроса в процедуры Go и позволили им выполняться параллельно:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	
	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {
		go payload.UploadToS3()   // <----- DON'T DO THIS
	}

	w.WriteHeader(http.StatusOK)
}

При умеренных нагрузках этот подход работал нормально для большинства людей, но мы быстро стали перегружены большими запросами. Когда мы развертываем эту версию кода в производственной среде, мы ожидаем, что будет поступать большое количество запросов, но на самом деле оно не может достигать порядка миллионов. Мы полностью недооценили объем трафика, который эта система собиралась обрабатывать.

Но в любом случае описанный выше метод неадекватен. Потому что у нас нет возможности контролировать количество запусков Go. Поэтому, когда наша система сталкивается с миллионами POST-запросов в минуту, она быстро выходит из строя.

бороться снова

Нам нужно найти другой путь. В самом начале мы обсуждали, как сохранить наши запросы обработчиков максимально короткими и загружать на S3, чтобы бежать на заднем плане или асинхронно. Конечно, в Ruby на рельсах вы должны сделать это, иначе вы заблокируете все другие обработчики сети. Используете ли вы Cougar, Unicorn или Passerby (пожалуйста, не участвуйте в JRuby обсуждений). Затем мы подумали об использовании более распространенного метода очереди сообщений для достижения нашей цели, таких как Resque, Sidekiq, SQS и т. Д., И бесчисленные инструменты, потому что существует слишком много способов достижения этой функции.

Итак, во второй итерации нам нужно создать буферную очередь, мы будем ставить задачи в очередь и загружать их в S3 одну за другой, но так как мы хотим достичь цели управления максимальной емкостью этой очереди, и мы у нас достаточно оперативной памяти, чтобы мы могли хранить тело запроса в памяти, поэтому мы думаем, что можем напрямую использовать канал, предоставляемый Go, а затем поставить наш запрос в очередь прямо в канал для обработки.

var Queue chan Payload

func init() {
   Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
   ...
   // Go through each payload and queue items individually to be posted to S3
   for _, payload := range content.Payloads {
       Queue <- payload
   }
   ...
}

Мы будем получать задания с канала и выполнять их загрузку

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- STILL NOT GOOD
        }
    }
}

Но, честно говоря, я не знаю, что он делает. Должно быть, потому что было слишком поздно, и у нас было слишком много Red Bull. 😌😌

Это изменение никоим образом не улучшило наше затруднительное положение, мы поставили параллельные задачи в очередь для выполнения и просто решили проблему. Однако наша асинхронная программа будет загружать только одно тело запроса на S3 за раз, но количество наших запросов намного больше, чем количество, которое мы загружаем на S3 в это время. Вполне возможно, что наша буферная очередь скоро достигнет своего предела. ., то он блокирует постановку в очередь других сетевых запросов.

Как будто мы просто избежали проблемы и позволили нашей системе запустить обратный отсчет времени. После того, как наша версия этой ошибки была выпущена, скорость задержки всей системы продолжала увеличиваться с каждой минутой.

лучшее решение

Мы решили использовать совместный подход к улучшению наших каналов Go, создав систему обработки каналов с двумя каналами, один для постановки в очередь тела запроса и один для контроля.workerсуществуетJobQueueКоличество одновременных сред выполнения в .

Суть этой идеи заключается в том, чтобы загружать данные в S3 параллельно с относительно стабильной частотой, чтобы мы не сбивали наши серверы и не вызывали много ошибок соединения S3 из-за слишком большого количества соединений. Итак, мы начали с шаблона Job/Worker. Это знакомо тем, кто знаком с разработкой на Java и C#.Вы можете понять, что именно так Go использует каналы для реализации пулов рабочих потоков.

var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
	Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
	WorkerPool  chan chan Job
	JobChannel  chan Job
	quit    	chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// we have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}

			case <-w.quit:
				// we have received a signal to stop
				return
			}
		}
	}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

Затем измените функцию ловушки нашего сетевого запроса, которая отвечает за создание экземпляра структуры Job и размещение его в канале JobQueue для ожидания выполнения исполнителем.

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

    // Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {

        // let's create a job with the payload
        work := Job{Payload: payload}

        // Push the work onto the queue.
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

Создайте его, когда наш веб-сервис инициализируетсяDispatherи позвониRun()Создайте пул потоков с определенным количеством рабочих для получения и обработки сообщений отJobQueueизJob

dispatcher := NewDispatcher(MaxWorker) 
dispatcher.Run()

Ниже нашDispatherреализация

type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // starting n number of workers
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.pool)
		worker.Start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			// a job request has been received
			go func(job Job) {
				// try to obtain a worker job channel that is available.
				// this will block until a worker is idle
				jobChannel := <-d.WorkerPool

				// dispatch the job to the worker job channel
				jobChannel <- job
			}(job)
		}
	}
}

Обратите внимание, что мы ограничиваем максимальное количество рабочих процессов в пуле потоков. Наше приложение работает в докеризованной среде Go, развернутой на Amazon Elasticbeanstalk, и пытается следовать12 элементовПринципы настройки нашей производственной среды, получение соответствующих значений параметров в переменных среды, чтобы мы могли контролировать количество рабочих иJobQueueМаксимальная мощность путем изменения значений соответствует напрямую без необходимости повторного развертывания нашего приложения.

var ( 
  MaxWorker = os.Getenv("MAX_WORKERS") 
  MaxQueue  = os.Getenv("MAX_QUEUE") 
)

Когда мы освободим в производственной среде, которую мы задерживаем ставку сразу, значительно снизились, наша способность иметь дело с просьбой иметь качественный скачок.

После минуты ожидания, пока наш балансировщик нагрузки полностью заработает, мы видим, что сервер ElasticBeanstalk получает почти миллион запросов в минуту. Обычно у нас бывает несколько часов всплесков трафика утром, когда запросов даже больше миллиона в минуту.

А когда мы выпустили новый код, количество серверов упало со 100 до 20 и стабилизировалось.

После добавления соответствующей конфигурации в кластер и настройки автоматического масштабирования его можно даже сократить до 4 экземпляров EC2 c4.Large для выполнения повседневных задач. И кластер автоматически добавит новые экземпляры, когда загрузка ЦП достигнет 90% в течение 5 минут.

Суммировать

Простой дизайн — это всегда то, что мне нужно. Мы могли бы спроектировать сложную систему с большим количеством очередей, запущенными рабочими процессами в фоновом режиме, сложными развертываниями и т. д., но вместо этого мы решили воспользоваться мощными возможностями автоматического масштабирования Elasticbeanstalk и готовыми функциями параллелизма, которые предоставляет Go.

Всегда есть инструмент для вашей работы, и иногда, когда вашей системе Ruby on Rails требуется мощная возможность обработки сетевых запросов, попробуйте рассмотреть более мощную и лаконичную альтернативу экосистеме ruby.


Перед тем, как ты уйдешь

Если вы сможете подписаться на мой Twitter и поделиться им с друзьями, я буду очень благодарен! мой твиттерtwitter.com/mcastilho