предисловие
Цель создания этой библиотеки заключается в том, что существующие библиотеки Go не имеют таких функций, как гибкое определение запусков заданий, пакетная отправка задач для уменьшения отправки небольших задач. В прошлом можно было создавать код, основанный на идее рабочих пулов, но проблемы с его использованием никогда не создавали полную библиотеку в целом.
go-zeroтолько что созданныйexecutors.
представлять
существуетgo-zeroсередина,executorsДействуйте как пул задач, выполняйте многозадачную буферизацию и используйте задачи для пакетной обработки. Такие как:clickhouseмногочисленныйinsert,sql batch insert. Также доступно вgo-queueтакже смexecutors【существуетqueueкоторый используется вChunkExecutor, ограничить размер байтов отправки задачи].
Поэтому, когда у вас есть следующие требования, вы можете использовать этот компонент:
- Отправляйте задачи пакетами
- Буферизируйте некоторые задачи и отправляйте их лениво
- Отложить отправку задачи
Перед конкретным объяснением давайте дадим общий обзор:
дизайн интерфейса
существуетexecutorsПод упаковкой находится следующееexecutor:
| Name | Margin value |
|---|---|
bulkexecutor |
достигатьmaxTasks【Максимальное количество заданий】 Отправить |
chunkexecutor |
достигатьmaxChunkSize【Максимальное количество байт】Отправить |
periodicalexecutor |
basic executor |
delayexecutor |
Задержка исполнения входящегоfn()
|
lessexecutor |
Вы увидите, что в дополнение к специальным функциямdelay,less, остальные триexecutor + containerКомбинированный дизайн:
func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
// 选项模式:在 go-zero 中多处出现。在多配置下,比较好的设计思路
// https://halls-of-valhalla.org/beta/articles/functional-options-pattern-in-go,54/
options := newBulkOptions()
for _, opt := range opts {
opt(&options)
}
// 1. task container: [execute 真正做执行的函数] [maxTasks 执行临界点]
container := &bulkContainer{
execute: execute,
maxTasks: options.cachedTasks,
}
// 2. 可以看出 bulkexecutor 底层依赖 periodicalexecutor
executor := &BulkExecutor{
executor: NewPeriodicalExecutor(options.flushInterval, container),
container: container,
}
return executor
}
и этоcontainerЯвляетсяinterface:
TaskContainer interface {
// 把 task 加入 container
AddTask(task interface{}) bool
// 实际上是去执行传入的 execute func()
Execute(tasks interface{})
// 达到临界值,移除 container 中全部的 task,通过 channel 传递到 execute func() 执行
RemoveAll() interface{}
}
Это показывает зависимости между:
-
bulkexecutor:periodicalexecutor+bulkContainer -
chunkexecutor:periodicalexecutor+chunkContainer
Итак, вы хотите закончить свой собственный
executor, можно реализоватьcontainerиз этих трех интерфейсов, а затем объединитьperiodicalexecutorпросто хорошо
Итак, вернемся к 👆 этой картинке, наше внимание сосредоточено наperiodicalexecutor, чтобы увидеть, как он был разработан?
как использовать
Сначала посмотрим, как использовать этот компонент в бизнесе:
Существует временная служба, которая выполняется в фиксированное время каждый день сmysqlприбытьclickhouseсинхронизация данных:
type DailyTask struct {
ckGroup *clickhousex.Cluster
insertExecutor *executors.BulkExecutor
mysqlConn sqlx.SqlConn
}
инициализацияbulkExecutor:
func (dts *DailyTask) Init() {
// insertIntoCk() 是真正insert执行函数【需要开发者自己编写具体业务逻辑】
dts.insertExecutor = executors.NewBulkExecutor(
dts.insertIntoCk,
executors.WithBulkInterval(time.Second*3), // 3s会自动刷一次container中task去执行
executors.WithBulkTasks(10240), // container最大task数。一般设为2的幂次
)
}
Дополнительное введение:
clickhouseОн подходит для крупномасштабных вставок, потому что скорость вставки очень высока, а крупномасштабные вставки могут в полной мере использовать Clickhouse.
Основная бизнес-логика написана:
func (dts *DailyTask) insertNewData(ch chan interface{}, sqlFromDb *model.Task) error {
for item := range ch {
if r, vok := item.(*model.Task); !vok {
continue
}
err := dts.insertExecutor.Add(r)
if err != nil {
r.Tag = sqlFromDb.Tag
r.TagId = sqlFromDb.Id
r.InsertId = genInsertId()
r.ToRedis = toRedis == constant.INCACHED
r.UpdateWay = sqlFromDb.UpdateWay
// 1⃣️
err := dts.insertExecutor.Add(r)
if err != nil {
logx.Error(err)
}
}
}
// 2⃣️
dts.insertExecutor.Flush()
// 3⃣️
dts.insertExecutor.Wait()
}
может задаться вопросом, почему
Flush(), Wait(), который будет проанализирован исходным кодом позже
Обычно используются 3 шага:
-
Add(): присоединиться к задаче -
Flush(): обновитьcontainerзадачи в -
Wait(): Дождитесь завершения выполнения всех задач.
Анализ исходного кода
Основной анализ здесь
periodicalexecutor, потому что два других обычно используютсяexecutorзависеть от этого
инициализация
func New...(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
executor := &PeriodicalExecutor{
commander: make(chan interface{}, 1),
interval: interval,
container: container,
confirmChan: make(chan lang.PlaceholderType),
newTicker: func(d time.Duration) timex.Ticker {
return timex.NewTicker(interval)
},
}
...
return executor
}
-
commander:передачаtasksканал -
container: склад временного храненияAdd()задание -
confirmChan:блокироватьAdd(), в начале этогоexecuteTasks()разблокирует -
ticker: таймер, предотвратитьAdd()При блокировке будет возможность регулярного выполнения, а временная задача будет вовремя выпущена
Add()
После инициализации первым шагом бизнес-логики является добавление задачиexecutor:
func (pe *PeriodicalExecutor) Add(task interface{}) {
if vals, ok := pe.addAndCheck(task); ok {
pe.commander <- vals
<-pe.confirmChan
}
}
func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
pe.lock.Lock()
defer func() {
// 一开始为 false
var start bool
if !pe.guarded {
// backgroundFlush() 会将 guarded 重新置反
pe.guarded = true
start = true
}
pe.lock.Unlock()
// 在第一条 task 加入的时候就会执行 if 中的 backgroundFlush()。后台协程刷task
if start {
pe.backgroundFlush()
}
}()
// 控制maxTask,>=maxTask 将container中tasks pop, return
if pe.container.AddTask(task) {
return pe.container.RemoveAll(), true
}
return nil, false
}
addAndCheck()серединаAddTask()Контролирует максимальное количество задач, если оно превышается, оно будет выполненоRemoveAll(), будет временно хранитьcontainerзадачи выскакивают, переходят кcommander, за которым следует цикл горутины для чтения и последующего выполнения задач.
backgroundFlush()
Запустить фоновую сопрограмму, даcontainerЗадача в , которая постоянно обновляется:
func (pe *PeriodicalExecutor) backgroundFlush() {
// 封装 go func(){}
threading.GoSafe(func() {
ticker := pe.newTicker(pe.interval)
defer ticker.Stop()
var commanded bool
last := timex.Now()
for {
select {
// 从channel拿到 []tasks
case vals := <-pe.commander:
commanded = true
// 实质:wg.Add(1)
pe.enterExecution()
// 放开 Add() 的阻塞,而且此时暂存区也为空。才开始新的 task 加入
pe.confirmChan <- lang.Placeholder
// 真正的执行 task 逻辑
pe.executeTasks(vals)
last = timex.Now()
case <-ticker.Chan():
if commanded {
// 由于select选择的随机性,如果同时满足两个条件同时执行完上面的,此处置反,并跳过本段执行
// https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
commanded = false
} else if pe.Flush() {
// 刷新完成,定时器清零。暂存区空了,开始下一次定时刷新
last = timex.Now()
} else if timex.Since(last) > pe.interval*idleRound {
// 既没到maxTask,Flush() err,并且 last->now 时间过长,会再次触发 Flush()
// 只有这置反,才会开启一个新的 backgroundFlush() 后台协程
pe.guarded = false
// 再次刷新,防止漏掉
pe.Flush()
return
}
}
}
})
}
Всего два процесса:
-
commanderполученоRemoveAll()Сдал задачи, потом выполняю, и отпускаюAdd(), продолжитьAdd() -
tickerКогда время истекло, если первый шаг не выполнен, то автоматическиFlush(), также выполнит задание
Wait()
существуетbackgroundFlush(), упоминая функцию:enterExecution():
func (pe *PeriodicalExecutor) enterExecution() {
pe.wgBarrier.Guard(func() {
pe.waitGroup.Add(1)
})
}
func (pe *PeriodicalExecutor) Wait() {
pe.wgBarrier.Guard(func() {
pe.waitGroup.Wait()
})
}
Таким образом, вы будете знать, почему вы должны принести его в конце.dts.insertExecutor.Wait(), конечно ждём всехgoroutine taskЗаканчивать.
думать
Глядя на исходный код, я подумал о некоторых других дизайнерских идеях. У вас есть похожие вопросы:
- в анализе
executors, вы обнаружите, что во многих местах естьlock
go testСуществует состояние гонки, используйте блокировку, чтобы избежать этой ситуации.
- в анализе
confirmChanобнаружил, что на этот разпредставитьОн только что появился, почему он разработан таким образом?
Это было раньше:
wg.Add(1)написано наexecuteTasks(); теперь это: первыйwg.Add(1), тогда отпустиconfirmChanблокироватьесли
executor funcвыполнить блокировку,Add taskВсе еще в процессе, так как блокировки нет, возможно, она будет выполнена в ближайшее время.Executor.Wait(), это появитсяwg.Wait()существуетwg.Add()перед выполнением это будетpanic
Подробности смотрите в последней версииTestPeriodicalExecutor_WaitFast(), возможно, вы захотите запустить эту версию, вы можете воспроизвести
Суммировать
несколько оставшихсяexecutorsДля анализа я оставлю всем возможность посмотреть исходный код.
Короче, общий дизайн:
- следовать дизайну, ориентированному на интерфейс
- Гибкое использование
channel,waitgroupКомпактный инструмент - Комбинация исполнительный блок + накопительный блок
существуетgo-zeroВ книге также много практических инструментов для компонентов. Правильное использование инструментов очень помогает повысить производительность службы и эффективность разработки. Я надеюсь, что эта статья принесет вам пользу.
адрес проекта
Если вы считаете, что статья хорошая, нажмите звездочку на github 🤝.
В то же время каждый может использоватьgo-zero,GitHub.com/them-specialty/go…