Публичный аккаунт WeChat: ночная столовая Ву Циньцяна
вводить
Я наткнулся на статью, написанную лет в 15. Честно говоря, меня очень привлекло название, но, прочитав несколько раз, она была действительно замечательной. Что касается этой статьи, я не буду ее напрямую переводить. Требование проекта — клиент отправляет запрос, а сервер получает запрос и обрабатывает данные (исходный текст — загрузить ресурс на ресурс Amazon S3). Это по сути так,
Я немного изменил бизнес-код исходного текста, но это не влияет на основной модуль. В первом издании каждый получилRequest
, открытьgoroutine
Обработка, быстрый ответ, очень рутинная работа.
код показывает, как показано ниже
первое издание
package main
import (
"fmt"
"log"
"net/http"
"time"
)
type Payload struct {
// 传啥不重要
}
func (p *Payload) UpdateToS3() error {
//存储逻辑,模拟操作耗时
time.Sleep(500 * time.Millisecond)
fmt.Println("上传成功")
return nil
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
// 业务过滤
// 请求body解析......
var p Payload
go p.UpdateToS3()
w.Write([]byte("操作成功"))
}
func main() {
http.HandleFunc("/payload", payloadHandler)
log.Fatal(http.ListenAndServe(":8099", nil))
}
В чем проблема сделать это? В общем, без проблем. Но если это сценарий с высокой степенью параллелизма, это неправильно.goroutine
номер для управления вашимCPU
Скорость использования резко возросла, использование памяти резко возросло, пока программа не рухнула.
Если эта операция попадает в базу данных, например.mysql
, то соответственно дисковый ввод-вывод сервера и пропускная способность сети вашей базы данных
,CPU
Нагрузка и потребление памяти достигнут очень высокой ситуации и сработают вместе. Поэтому, когда в программе происходит что-то неконтролируемое, это часто является сигналом опасности.
китайская версия
package main
import (
"fmt"
"log"
"net/http"
"time"
)
const MaxQueue = 400
var Queue chan Payload
func init() {
Queue = make(chan Payload, MaxQueue)
}
type Payload struct {
// 传啥不重要
}
func (p *Payload) UpdateToS3() error {
//存储逻辑,模拟操作耗时
time.Sleep(500 * time.Millisecond)
fmt.Println("上传成功")
return nil
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
// 业务过滤
// 请求body解析......
var p Payload
//go p.UpdateToS3()
Queue <- p
w.Write([]byte("操作成功"))
}
// 处理任务
func StartProcessor() {
for {
select {
case payload := <-Queue:
payload.UpdateToS3()
}
}
}
func main() {
http.HandleFunc("/payload", payloadHandler)
//单独开一个g接收与处理任务
go StartProcessor()
log.Fatal(http.ListenAndServe(":8099", nil))
}
В этом издании используетсяbuffered
изchannel
для завершения этой функции, тем самым контролируя неограниченноеgoroutine
, но так и не решил проблему.
Обработка запроса — синхронная операция, и одновременно обрабатывается только одна задача, однако скорость входящих запросов при высоком параллелизме намного превышает скорость обработки. В этом случае однаждыchannel
После его заполнения последующие запросы будут заблокированы и так далее. Затем вы обнаружите, что время отклика начнет резко увеличиваться, и даже ответа больше не будет.
Окончательный версия
package main
import (
"fmt"
"log"
"net/http"
"time"
)
const (
MaxWorker = 100 //随便设置值
MaxQueue = 200 // 随便设置值
)
// 一个可以发送工作请求的缓冲 channel
var JobQueue chan Job
func init() {
JobQueue = make(chan Job, MaxQueue)
}
type Payload struct{}
type Job struct {
PayLoad Payload
}
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool),
}
}
// Start 方法开启一个 worker 循环,监听退出 channel,可按需停止这个循环
func (w Worker) Start() {
go func() {
for {
// 将当前的 worker 注册到 worker 队列中
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// 真正业务的地方
// 模拟操作耗时
time.Sleep(500 * time.Millisecond)
fmt.Printf("上传成功:%v\n", job)
case <-w.quit:
return
}
}
}()
}
func (w Worker) stop() {
go func() {
w.quit <- true
}()
}
// 初始化操作
type Dispatcher struct {
// 注册到 dispatcher 的 worker channel 池
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// 开始运行 n 个 worker
for i := 0; i < MaxWorker; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
go func(job Job) {
// 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker
jobChannel := <-d.WorkerPool
// 分发任务到 worker job channel 中
jobChannel <- job
}(job)
}
}
}
// 接收请求,把任务筛入JobQueue。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
work := Job{PayLoad: Payload{}}
JobQueue <- work
_, _ = w.Write([]byte("操作成功"))
}
func main() {
// 通过调度器创建worker,监听来自 JobQueue的任务
d := NewDispatcher(MaxWorker)
d.Run()
http.HandleFunc("/payload", payloadHandler)
log.Fatal(http.ListenAndServe(":8099", nil))
}
Завершающий двухуровневыйchannel
, первый уровень заключается в том, чтобы поместить данные запроса пользователя вchan Job
, этоchannel job
Эквивалент очереди отложенных задач.
Другой уровень используется для хранения задач, которые могут быть обработаны.work
Очередь кэша, типchan chan Job
. Планировщик помещает отложенные задачи в незанятую очередь кэша,work
всегда будет обрабатывать свою кэшированную очередь. Таким образом,worker
бассейн. Нарисуйте схему примерно, чтобы помочь понять
Сначала после получения запроса создаемJob
задачу, поставь ее в очередь задач и ждиwork
Обработка пула.
func payloadHandler(w http.ResponseWriter, r *http.Request) {
work := Job{PayLoad: Payload{}}
JobQueue <- work
_, _ = w.Write([]byte("操作成功"))
}
Инициализация планировщикаwork
после бассейна, вdispatch
, как только мы получимJobQueue
задание, попробуйте получить доступныйworker
, распределять задачиworker
изjob channel
середина. Обратите внимание, что этот процесс не является синхронным, но каждый раз, когдаjob
, просто откройтеG
иметь дело с. Это гарантируетJobQueue
Нет необходимости блокировать, соответствующийJobQueue
Теоретически нет необходимости блокировать задачи записи.
func (d *Dispatcher) Run() {
// 开始运行 n 个 worker
for i := 0; i < MaxWorker; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
go func(job Job) {
// 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker
jobChannel := <-d.WorkerPool
// 分发任务到 worker job channel 中
jobChannel <- job
}(job)
}
}
}
"Неуправляемый" здесьG
Он отличается от вышеперечисленного. Заблокировано чтение только на очень короткое времяChan
состояние, когда есть холостойworker
пробуждается, а затем отправляет задачи, и весь жизненный цикл намного короче вышеописанных операций.
Наконец, настоятельно рекомендуется прочитать исходный текст, оригинальный адрес находится по адресу:Так ли это? IO/2015/07/Корейский…