предисловие
Цель создания этой библиотеки заключается в том, что существующие библиотеки Go не имеют таких функций, как гибкое определение запусков заданий, пакетная отправка задач для уменьшения отправки небольших задач. В прошлом можно было создавать код, основанный на идее рабочих пулов, но проблемы с его использованием никогда не создавали полную библиотеку в целом.
go-zero
только что созданныйexecutors
.
представлять
существуетgo-zero
середина,executors
Действуйте как пул задач, выполняйте многозадачную буферизацию и используйте задачи для пакетной обработки. Такие как:clickhouse
многочисленныйinsert
,sql batch insert
. Также доступно вgo-queue
также смexecutors
【существуетqueue
который используется вChunkExecutor
, ограничить размер байтов отправки задачи].
Поэтому, когда у вас есть следующие требования, вы можете использовать этот компонент:
- Отправляйте задачи пакетами
- Буферизируйте некоторые задачи и отправляйте их лениво
- Отложить отправку задачи
Перед конкретным объяснением давайте дадим общий обзор:
дизайн интерфейса
существуетexecutors
Под упаковкой находится следующееexecutor
:
Name | Margin value |
---|---|
bulkexecutor |
достигатьmaxTasks 【Максимальное количество заданий】 Отправить |
chunkexecutor |
достигатьmaxChunkSize 【Максимальное количество байт】Отправить |
periodicalexecutor |
basic executor |
delayexecutor |
Задержка исполнения входящегоfn()
|
lessexecutor |
Вы увидите, что в дополнение к специальным функциямdelay
,less
, остальные триexecutor
+ container
Комбинированный дизайн:
func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
// 选项模式:在 go-zero 中多处出现。在多配置下,比较好的设计思路
// https://halls-of-valhalla.org/beta/articles/functional-options-pattern-in-go,54/
options := newBulkOptions()
for _, opt := range opts {
opt(&options)
}
// 1. task container: [execute 真正做执行的函数] [maxTasks 执行临界点]
container := &bulkContainer{
execute: execute,
maxTasks: options.cachedTasks,
}
// 2. 可以看出 bulkexecutor 底层依赖 periodicalexecutor
executor := &BulkExecutor{
executor: NewPeriodicalExecutor(options.flushInterval, container),
container: container,
}
return executor
}
и этоcontainer
Являетсяinterface
:
TaskContainer interface {
// 把 task 加入 container
AddTask(task interface{}) bool
// 实际上是去执行传入的 execute func()
Execute(tasks interface{})
// 达到临界值,移除 container 中全部的 task,通过 channel 传递到 execute func() 执行
RemoveAll() interface{}
}
Это показывает зависимости между:
-
bulkexecutor
:periodicalexecutor
+bulkContainer
-
chunkexecutor
:periodicalexecutor
+chunkContainer
Итак, вы хотите закончить свой собственный
executor
, можно реализоватьcontainer
из этих трех интерфейсов, а затем объединитьperiodicalexecutor
просто хорошо
Итак, вернемся к 👆 этой картинке, наше внимание сосредоточено наperiodicalexecutor
, чтобы увидеть, как он был разработан?
как использовать
Сначала посмотрим, как использовать этот компонент в бизнесе:
Существует временная служба, которая выполняется в фиксированное время каждый день сmysql
прибытьclickhouse
синхронизация данных:
type DailyTask struct {
ckGroup *clickhousex.Cluster
insertExecutor *executors.BulkExecutor
mysqlConn sqlx.SqlConn
}
инициализацияbulkExecutor
:
func (dts *DailyTask) Init() {
// insertIntoCk() 是真正insert执行函数【需要开发者自己编写具体业务逻辑】
dts.insertExecutor = executors.NewBulkExecutor(
dts.insertIntoCk,
executors.WithBulkInterval(time.Second*3), // 3s会自动刷一次container中task去执行
executors.WithBulkTasks(10240), // container最大task数。一般设为2的幂次
)
}
Дополнительное введение:
clickhouse
Он подходит для крупномасштабных вставок, потому что скорость вставки очень высока, а крупномасштабные вставки могут в полной мере использовать Clickhouse.
Основная бизнес-логика написана:
func (dts *DailyTask) insertNewData(ch chan interface{}, sqlFromDb *model.Task) error {
for item := range ch {
if r, vok := item.(*model.Task); !vok {
continue
}
err := dts.insertExecutor.Add(r)
if err != nil {
r.Tag = sqlFromDb.Tag
r.TagId = sqlFromDb.Id
r.InsertId = genInsertId()
r.ToRedis = toRedis == constant.INCACHED
r.UpdateWay = sqlFromDb.UpdateWay
// 1⃣️
err := dts.insertExecutor.Add(r)
if err != nil {
logx.Error(err)
}
}
}
// 2⃣️
dts.insertExecutor.Flush()
// 3⃣️
dts.insertExecutor.Wait()
}
может задаться вопросом, почему
Flush(), Wait()
, который будет проанализирован исходным кодом позже
Обычно используются 3 шага:
-
Add()
: присоединиться к задаче -
Flush()
: обновитьcontainer
задачи в -
Wait()
: Дождитесь завершения выполнения всех задач.
Анализ исходного кода
Основной анализ здесь
periodicalexecutor
, потому что два других обычно используютсяexecutor
зависеть от этого
инициализация
func New...(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
executor := &PeriodicalExecutor{
commander: make(chan interface{}, 1),
interval: interval,
container: container,
confirmChan: make(chan lang.PlaceholderType),
newTicker: func(d time.Duration) timex.Ticker {
return timex.NewTicker(interval)
},
}
...
return executor
}
-
commander
:передачаtasks
канал -
container
: склад временного храненияAdd()
задание -
confirmChan
:блокироватьAdd()
, в начале этогоexecuteTasks()
разблокирует -
ticker
: таймер, предотвратитьAdd()
При блокировке будет возможность регулярного выполнения, а временная задача будет вовремя выпущена
Add()
После инициализации первым шагом бизнес-логики является добавление задачиexecutor
:
func (pe *PeriodicalExecutor) Add(task interface{}) {
if vals, ok := pe.addAndCheck(task); ok {
pe.commander <- vals
<-pe.confirmChan
}
}
func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
pe.lock.Lock()
defer func() {
// 一开始为 false
var start bool
if !pe.guarded {
// backgroundFlush() 会将 guarded 重新置反
pe.guarded = true
start = true
}
pe.lock.Unlock()
// 在第一条 task 加入的时候就会执行 if 中的 backgroundFlush()。后台协程刷task
if start {
pe.backgroundFlush()
}
}()
// 控制maxTask,>=maxTask 将container中tasks pop, return
if pe.container.AddTask(task) {
return pe.container.RemoveAll(), true
}
return nil, false
}
addAndCheck()
серединаAddTask()
Контролирует максимальное количество задач, если оно превышается, оно будет выполненоRemoveAll()
, будет временно хранитьcontainer
задачи выскакивают, переходят кcommander
, за которым следует цикл горутины для чтения и последующего выполнения задач.
backgroundFlush()
Запустить фоновую сопрограмму, даcontainer
Задача в , которая постоянно обновляется:
func (pe *PeriodicalExecutor) backgroundFlush() {
// 封装 go func(){}
threading.GoSafe(func() {
ticker := pe.newTicker(pe.interval)
defer ticker.Stop()
var commanded bool
last := timex.Now()
for {
select {
// 从channel拿到 []tasks
case vals := <-pe.commander:
commanded = true
// 实质:wg.Add(1)
pe.enterExecution()
// 放开 Add() 的阻塞,而且此时暂存区也为空。才开始新的 task 加入
pe.confirmChan <- lang.Placeholder
// 真正的执行 task 逻辑
pe.executeTasks(vals)
last = timex.Now()
case <-ticker.Chan():
if commanded {
// 由于select选择的随机性,如果同时满足两个条件同时执行完上面的,此处置反,并跳过本段执行
// https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
commanded = false
} else if pe.Flush() {
// 刷新完成,定时器清零。暂存区空了,开始下一次定时刷新
last = timex.Now()
} else if timex.Since(last) > pe.interval*idleRound {
// 既没到maxTask,Flush() err,并且 last->now 时间过长,会再次触发 Flush()
// 只有这置反,才会开启一个新的 backgroundFlush() 后台协程
pe.guarded = false
// 再次刷新,防止漏掉
pe.Flush()
return
}
}
}
})
}
Всего два процесса:
-
commander
полученоRemoveAll()
Сдал задачи, потом выполняю, и отпускаюAdd()
, продолжитьAdd()
-
ticker
Когда время истекло, если первый шаг не выполнен, то автоматическиFlush()
, также выполнит задание
Wait()
существуетbackgroundFlush()
, упоминая функцию:enterExecution()
:
func (pe *PeriodicalExecutor) enterExecution() {
pe.wgBarrier.Guard(func() {
pe.waitGroup.Add(1)
})
}
func (pe *PeriodicalExecutor) Wait() {
pe.wgBarrier.Guard(func() {
pe.waitGroup.Wait()
})
}
Таким образом, вы будете знать, почему вы должны принести его в конце.dts.insertExecutor.Wait()
, конечно ждём всехgoroutine task
Заканчивать.
думать
Глядя на исходный код, я подумал о некоторых других дизайнерских идеях. У вас есть похожие вопросы:
- в анализе
executors
, вы обнаружите, что во многих местах естьlock
go test
Существует состояние гонки, используйте блокировку, чтобы избежать этой ситуации.
- в анализе
confirmChan
обнаружил, что на этот разпредставитьОн только что появился, почему он разработан таким образом?
Это было раньше:
wg.Add(1)
написано наexecuteTasks()
; теперь это: первыйwg.Add(1)
, тогда отпустиconfirmChan
блокироватьесли
executor func
выполнить блокировку,Add task
Все еще в процессе, так как блокировки нет, возможно, она будет выполнена в ближайшее время.Executor.Wait()
, это появитсяwg.Wait()
существуетwg.Add()
перед выполнением это будетpanic
Подробности смотрите в последней версииTestPeriodicalExecutor_WaitFast()
, возможно, вы захотите запустить эту версию, вы можете воспроизвести
Суммировать
несколько оставшихсяexecutors
Для анализа я оставлю всем возможность посмотреть исходный код.
Короче, общий дизайн:
- следовать дизайну, ориентированному на интерфейс
- Гибкое использование
channel
,waitgroup
Компактный инструмент - Комбинация исполнительный блок + накопительный блок
существуетgo-zero
В книге также много практических инструментов для компонентов. Правильное использование инструментов очень помогает повысить производительность службы и эффективность разработки. Я надеюсь, что эта статья принесет вам пользу.
адрес проекта
Если вы считаете, что статья хорошая, нажмите звездочку на github 🤝.
В то же время каждый может использоватьgo-zero
,GitHub.com/them-specialty/go…