Миллионы запросов в минуту с Go

задняя часть Go
Миллионы запросов в минуту с Go

Публичный аккаунт WeChat: ночная столовая Ву Циньцяна

вводить

Я наткнулся на статью, написанную лет в 15. Честно говоря, меня очень привлекло название, но, прочитав несколько раз, она была действительно замечательной. Что касается этой статьи, я не буду ее напрямую переводить. Требование проекта — клиент отправляет запрос, а сервер получает запрос и обрабатывает данные (исходный текст — загрузить ресурс на ресурс Amazon S3). Это по сути так,image

Я немного изменил бизнес-код исходного текста, но это не влияет на основной модуль. В первом издании каждый получилRequest, открытьgoroutineОбработка, быстрый ответ, очень рутинная работа.

код показывает, как показано ниже

первое издание


package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

type Payload struct {
	// 传啥不重要
}

func (p *Payload) UpdateToS3() error {
	//存储逻辑,模拟操作耗时
	time.Sleep(500 * time.Millisecond)
	fmt.Println("上传成功")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// 业务过滤
	// 请求body解析......
	var p Payload
	go p.UpdateToS3()
	w.Write([]byte("操作成功"))
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}

В чем проблема сделать это? В общем, без проблем. Но если это сценарий с высокой степенью параллелизма, это неправильно.goroutineномер для управления вашимCPUСкорость использования резко возросла, использование памяти резко возросло, пока программа не рухнула.

Если эта операция попадает в базу данных, например.mysql, то соответственно дисковый ввод-вывод сервера и пропускная способность сети вашей базы данных ,CPUНагрузка и потребление памяти достигнут очень высокой ситуации и сработают вместе. Поэтому, когда в программе происходит что-то неконтролируемое, это часто является сигналом опасности.

китайская версия

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const MaxQueue = 400

var Queue chan Payload

func init() {
	Queue = make(chan Payload, MaxQueue)
}

type Payload struct {
	// 传啥不重要
}

func (p *Payload) UpdateToS3() error {
	//存储逻辑,模拟操作耗时
	time.Sleep(500 * time.Millisecond)
	fmt.Println("上传成功")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// 业务过滤
	// 请求body解析......
	var p Payload
	//go p.UpdateToS3()
	Queue <- p
	w.Write([]byte("操作成功"))
}

// 处理任务
func StartProcessor() {
	for {
		select {
		case payload := <-Queue:
			payload.UpdateToS3()
		}
	}
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	//单独开一个g接收与处理任务
	go StartProcessor()
	log.Fatal(http.ListenAndServe(":8099", nil))
}

В этом издании используетсяbufferedизchannelдля завершения этой функции, тем самым контролируя неограниченноеgoroutine, но так и не решил проблему.

Обработка запроса — синхронная операция, и одновременно обрабатывается только одна задача, однако скорость входящих запросов при высоком параллелизме намного превышает скорость обработки. В этом случае однаждыchannelПосле его заполнения последующие запросы будут заблокированы и так далее. Затем вы обнаружите, что время отклика начнет резко увеличиваться, и даже ответа больше не будет.

Окончательный версия

package main

import (
"fmt"
"log"
"net/http"
"time"
)

const (
	MaxWorker = 100 //随便设置值
	MaxQueue  = 200 // 随便设置值
)

// 一个可以发送工作请求的缓冲 channel
var JobQueue chan Job

func init() {
	JobQueue = make(chan Job, MaxQueue)
}

type Payload struct{}

type Job struct {
	PayLoad Payload
}

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// Start 方法开启一个 worker 循环,监听退出 channel,可按需停止这个循环
func (w Worker) Start() {
	go func() {
		for {
			// 将当前的 worker 注册到 worker 队列中
			w.WorkerPool <- w.JobChannel
			select {
			case job := <-w.JobChannel:
				// 	真正业务的地方
				//	模拟操作耗时
				time.Sleep(500 * time.Millisecond)
				fmt.Printf("上传成功:%v\n", job)
			case <-w.quit:
				return
			}
		}
	}()
}

func (w Worker) stop() {
	go func() {
		w.quit <- true
	}()
}

// 初始化操作

type Dispatcher struct {
	// 注册到 dispatcher 的 worker channel 池
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
	// 开始运行 n 个 worker
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker
				jobChannel := <-d.WorkerPool
				// 分发任务到 worker job channel 中
				jobChannel <- job
			}(job)
		}
	}
}

// 接收请求,把任务筛入JobQueue。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("操作成功"))
}

func main() {
	// 通过调度器创建worker,监听来自 JobQueue的任务
	d := NewDispatcher(MaxWorker)
	d.Run()
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}


Завершающий двухуровневыйchannel, первый уровень заключается в том, чтобы поместить данные запроса пользователя вchan Job, этоchannel jobЭквивалент очереди отложенных задач.

Другой уровень используется для хранения задач, которые могут быть обработаны.workОчередь кэша, типchan chan Job. Планировщик помещает отложенные задачи в незанятую очередь кэша,workвсегда будет обрабатывать свою кэшированную очередь. Таким образом,workerбассейн. Нарисуйте схему примерно, чтобы помочь понятьimage

Сначала после получения запроса создаемJobзадачу, поставь ее в очередь задач и ждиworkОбработка пула.

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("操作成功"))
}

Инициализация планировщикаworkпосле бассейна, вdispatch, как только мы получимJobQueueзадание, попробуйте получить доступныйworker, распределять задачиworkerизjob channelсередина. Обратите внимание, что этот процесс не является синхронным, но каждый раз, когдаjob, просто откройтеGиметь дело с. Это гарантируетJobQueueНет необходимости блокировать, соответствующийJobQueueТеоретически нет необходимости блокировать задачи записи.

func (d *Dispatcher) Run() {
	// 开始运行 n 个 worker
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker
				jobChannel := <-d.WorkerPool
				// 分发任务到 worker job channel 中
				jobChannel <- job
			}(job)
		}
	}
}

"Неуправляемый" здесьGОн отличается от вышеперечисленного. Заблокировано чтение только на очень короткое времяChanсостояние, когда есть холостойworkerпробуждается, а затем отправляет задачи, и весь жизненный цикл намного короче вышеописанных операций.

Наконец, настоятельно рекомендуется прочитать исходный текст, оригинальный адрес находится по адресу:Так ли это? IO/2015/07/Корейский…