[Перевод] 1 миллион запросов в минуту с Golang

Go

Я работал в сфере защиты от спама, вирусов и вредоносных программ более 15 лет в нескольких разных компаниях, и теперь я знаю, что эти системы со временем усложнятся из-за большого объема данных, которые мы обрабатываем каждый день.

В настоящее время я являюсь генеральным директором smsjunk.com и главным архитектором KnowBe4, обеих компаний, работающих в сфере кибербезопасности.

Интересно, что за последние 10 лет или около того большая часть разработки веб-сервера, над которой я работал как инженер-программист, выполнялась на Ruby on Rails. Не поймите меня неправильно, я люблю Ruby on Rails, я считаю, что это замечательная среда, но через некоторое время вы начинаете думать и проектировать системы на ruby, и если вы забудете об эффективности и простоте архитектуры программного обеспечения — можете воспользоваться многопоточности, распараллеливания, быстрого выполнения и небольших затрат памяти. Я был разработчиком C/C++, Delphi и C# в течение многих лет, и я только начинаю понимать, насколько сложным может быть использование правильных инструментов для работы.

Меня не слишком интересуют войны языков и фреймворков, о которых постоянно спорит Интернет.我相信效率,生产力和代码可维护性主要取决于您构建解决方案的简单程度。

проблема

При работе с нашей анонимной системой телеметрии и аналитики наша цель — иметь возможность обрабатывать большой объем запросов POST от миллионов конечных точек. Веб-обработчик получит документ JSON, который может содержать набор нескольких полезных данных, которые необходимо записать в Amazon S3, чтобы наша система сокращения карт могла работать с этими данными позже.

Традиционно мы бы рассмотрели возможность создания архитектуры рабочего уровня, которая использует такие вещи, как:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • ...

И настройте 2 разных кластера, один для веб-интерфейса и один для рабочих, чтобы мы могли масштабировать объем внутренней работы, с которой мы можем справиться.

Но с самого начала наша команда знала, что мы должны сделать это на Go, потому что на этапе обсуждения мы увидели, что это может быть очень большая транспортная система. Я использую Go уже около 2 лет, и мы разработали несколько систем для Go здесь, но ни одна из них не находится на таком уровне. Мы начинаем с создания пары структур для определения полезной нагрузки веб-запроса, которую мы получаем через вызов POST, и загружаем ее в хранилище S3.

type PayloadCollection struct {
	WindowsVersion  string    `json:"version"`
	Token           string    `json:"token"`
	Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
	// the storageFolder method ensures that there are no name collision in
	// case we get same timestamp in the key name
	storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

	bucket := S3Bucket

	b := new(bytes.Buffer)
	encodeErr := json.NewEncoder(b).Encode(payload)
	if encodeErr != nil {
		return encodeErr
	}

	// Everything we post to the S3 bucket should be marked 'private'
	var acl = s3.Private
	var contentType = "application/octet-stream"

	return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

Практика наива — использование рутины Go

Изначально у нас была очень наивная реализация обработчика POST, пытающаяся распараллелить обработку заданий в простой горутине:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	
	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {
		go payload.UploadToS3()   // <----- DON'T DO THIS
	}

	w.WriteHeader(http.StatusOK)
}

При умеренных нагрузках это может работать для большинства людей, но быстро оказывается, что это не работает в больших масштабах. Мы ожидали много запросов, но к тому времени, когда мы развернули первую версию в рабочей среде, порядков, которые мы начали видеть, не было. Мы игнорируем трафик.

Есть несколько проблем с вышеуказанным методом. Невозможно контролировать количество работающих процедур go. И, поскольку мы получали 1 миллион POST-запросов в минуту, система быстро дала сбой.

Снова

Нам нужно найти другой путь. С самого начала мы обсуждали, как сделать обработчики запросов очень недолговечными и генерировать обработку в фоновом режиме. Конечно, это то, что должен делать Ruby on Rails, иначе, независимо от того, используете ли вы пуму, единорога или пассажира, все ваши доступные веб-воркеры будут заблокированы.

Затем нам нужно использовать общие решения для выполнения этой работы, такие как Resque, Sidekiq, SQS и т. д. Список можно продолжать, так как существует множество способов добиться этого.

Итак, вторая итерация заключается в создании буферного канала, в котором мы можем ставить в очередь некоторые задания и загружать их на S3, поскольку мы можем контролировать максимальное количество элементов в очереди и иметь достаточно оперативной памяти для постановки заданий в очередь в памяти. Мы подумали, что этого достаточно. чтобы просто буферизовать задания в очереди каналов.

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

Затем, чтобы взять задачи из канала буфера и обработать их, мы используем что-то вроде этого:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- STILL NOT GOOD
        }
    }
}

Честно говоря, я не знаю, о чем мы думали. Должно быть, это была раскаленная ночь по всем направлениям. Такой подход нам ничего не дал, мы променяли ущербный параллелизм на буферизованную очередь, что просто отсрочило проблему. Наш синхронный процессор загружает только одну полезную нагрузку в S3 за раз, и, поскольку скорость входящих запросов намного превышает способность одного процессора загружать в S3, наш буферный канал быстро достигает своего предела и не позволяет обработчику продолжать работу в нем. Добавьте больше данных запроса.

Мы просто избежали проблемы и закончили обратный отсчет, пока наша система не умерла. После того, как мы развернули эту версию с ошибками, уровень задержки продолжал расти с постоянной скоростью.

лучшее решение

Мы решили использовать общий шаблон для каналов Go, чтобы создать двухуровневую систему каналов: один для обработки заданий в очереди, а другой для управления количеством рабочих процессов, одновременно работающих в JobQueue.

Идея состоит в том, чтобы распараллелить загрузку в S3 с устойчивой скоростью, которая не приводит к сбою машины или ошибкам подключения к S3. Поэтому мы решили создать шаблон Работа/Рабочий. Для тех, кто знаком с Java, C# и т. д., подумайте об этом как о способе Golang реализовать пул рабочих потоков с использованием каналов.

var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
	Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
	WorkerPool  chan chan Job
	JobChannel  chan Job
	quit    	chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// we have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}

			case <-w.quit:
				// we have received a signal to stop
				return
			}
		}
	}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

Мы изменили наш обработчик веб-запросов, чтобы создать структуру Job с полезной нагрузкой и отправить ее в канал JobQueue, чтобы работники могли получить ее для обработки.

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

    // Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {

        // let's create a job with the payload
        work := Job{Payload: payload}

        // Push the work onto the queue.
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

Во время инициализации нашего веб-сервера мы создаем Dispatcher и вызываем Run(), чтобы создать рабочий пул и начать прослушивание заданий, появляющихся в JobQueue.

dispatcher := NewDispatcher(MaxWorker) 
dispatcher.Run()

Вот код реализации нашего планировщика:

type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // starting n number of workers
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.pool)
		worker.Start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			// a job request has been received
			go func(job Job) {
				// try to obtain a worker job channel that is available.
				// this will block until a worker is idle
				jobChannel := <-d.WorkerPool

				// dispatch the job to the worker job channel
				jobChannel <- job
			}(job)
		}
	}
}

Обратите внимание, что мы создаем максимальное количество рабочих процессов и сохраняем их в пуле рабочих процессов (то есть в указанном выше канале WorkerPool). Поскольку мы уже используем Amazon Elasticbeanstalk для наших проектов Dockerized Go и всегда стараемся следовать12 факторспособ настройки системы в продакшене, поэтому мы считываем эти значения из переменных окружения. Таким образом, мы можем контролировать количество и максимальный размер рабочей очереди, поэтому мы можем быстро настроить эти значения без необходимости повторного развертывания кластера.

var ( 
  MaxWorker = os.Getenv("MAX_WORKERS") 
  MaxQueue  = os.Getenv("MAX_QUEUE") 
)

Сразу же после того, как мы развернули его, мы увидели, что все наши показатели задержки упали до незначительных цифр, а наша способность обрабатывать запросы резко возросла.

Через несколько минут после того, как наш Elastic Load Balancer был полностью прогрет, мы увидели, что наше приложение ElasticBeanstalk обслуживает почти 1 миллион запросов в минуту. Обычно у нас есть несколько утренних часов, когда трафик достигает более 1 миллиона в минуту.

Как только мы развернули новый код, количество серверов резко сократилось со 100 до примерно 20 серверов.

После того, как мы правильно настроили наш кластер и параметры автомасштабирования, мы смогли снизить его до конфигурации только 4x EC2 c4. Большие экземпляры и Elastic Auto-Scaling настроены на создание нового экземпляра, когда загрузка ЦП превышает 90% в течение 5 минут подряд.

в заключении

В моей книге всегда побеждает простота. Мы могли бы спроектировать сложную систему с множеством очередей, фоновых рабочих процессов и сложных развертываний, но мы решили воспользоваться преимуществами автоматического масштабирования Elasticbeanstalk, а также эффективностью и простотой параллелизма, которые Golang дает нам из коробки.

Не каждый день кластер состоит всего из 4 машин, что, вероятно, далеко не так хорошо, как мой текущий MacBook Pro, способный обрабатывать 100 Вт запросов в минуту.

Всегда есть инструмент, который соответствует данной потребности. Иногда, когда вашей системе Ruby on Rails требуется очень мощный веб-обработчик, рассмотрите возможность поиска за пределами экосистемы Ruby более простых, но более мощных альтернативных решений.

Оригинальный адрес:medium.com/Все в порядке/Корейский…