Говоря о дизайне пакетной обработки в go-zero | Семидневный пунш

Go

предисловие

Цель создания этой библиотеки заключается в том, что существующие библиотеки Go не имеют таких функций, как гибкое определение запусков заданий, пакетная отправка задач для уменьшения отправки небольших задач. В прошлом можно было создавать код, основанный на идее рабочих пулов, но проблемы с его использованием никогда не создавали полную библиотеку в целом.

go-zeroтолько что созданныйexecutors.

представлять

существуетgo-zeroсередина,executorsДействуйте как пул задач, выполняйте многозадачную буферизацию и используйте задачи для пакетной обработки. Такие как:clickhouseмногочисленныйinsert,sql batch insert. Также доступно вgo-queueтакже смexecutors【существуетqueueкоторый используется вChunkExecutor, ограничить размер байтов отправки задачи].

Поэтому, когда у вас есть следующие требования, вы можете использовать этот компонент:

  • Отправляйте задачи пакетами
  • Буферизируйте некоторые задачи и отправляйте их лениво
  • Отложить отправку задачи

Перед конкретным объяснением давайте дадим общий обзор:

дизайн интерфейса

существуетexecutorsПод упаковкой находится следующееexecutor:

Name Margin value
bulkexecutor достигатьmaxTasks【Максимальное количество заданий】 Отправить
chunkexecutor достигатьmaxChunkSize【Максимальное количество байт】Отправить
periodicalexecutor basic executor
delayexecutor Задержка исполнения входящегоfn()
lessexecutor

Вы увидите, что в дополнение к специальным функциямdelay,less, остальные триexecutor + containerКомбинированный дизайн:

func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
  // 选项模式:在 go-zero 中多处出现。在多配置下,比较好的设计思路
  // https://halls-of-valhalla.org/beta/articles/functional-options-pattern-in-go,54/
	options := newBulkOptions()
	for _, opt := range opts {
		opt(&options)
	}
  // 1. task container: [execute 真正做执行的函数] [maxTasks 执行临界点]
	container := &bulkContainer{
		execute:  execute,
		maxTasks: options.cachedTasks,
	}
  // 2. 可以看出 bulkexecutor 底层依赖 periodicalexecutor
	executor := &BulkExecutor{
		executor:  NewPeriodicalExecutor(options.flushInterval, container),
		container: container,
	}

	return executor
}

и этоcontainerЯвляетсяinterface:

TaskContainer interface {
  	// 把 task 加入 container
		AddTask(task interface{}) bool
    // 实际上是去执行传入的 execute func()
		Execute(tasks interface{})
		// 达到临界值,移除 container 中全部的 task,通过 channel 传递到 execute func() 执行
		RemoveAll() interface{}
	}

Это показывает зависимости между:

  • bulkexecutor:periodicalexecutor + bulkContainer
  • chunkexecutor:periodicalexecutor + chunkContainer

Итак, вы хотите закончить свой собственныйexecutor, можно реализоватьcontainerиз этих трех интерфейсов, а затем объединитьperiodicalexecutorпросто хорошо

Итак, вернемся к 👆 этой картинке, наше внимание сосредоточено наperiodicalexecutor, чтобы увидеть, как он был разработан?

как использовать

Сначала посмотрим, как использовать этот компонент в бизнесе:

Существует временная служба, которая выполняется в фиксированное время каждый день сmysqlприбытьclickhouseсинхронизация данных:

type DailyTask struct {
	ckGroup        *clickhousex.Cluster
	insertExecutor *executors.BulkExecutor
	mysqlConn      sqlx.SqlConn
}

инициализацияbulkExecutor:

func (dts *DailyTask) Init() {
  // insertIntoCk() 是真正insert执行函数【需要开发者自己编写具体业务逻辑】
	dts.insertExecutor = executors.NewBulkExecutor(
		dts.insertIntoCk,
		executors.WithBulkInterval(time.Second*3),	// 3s会自动刷一次container中task去执行
		executors.WithBulkTasks(10240),							// container最大task数。一般设为2的幂次
	)
}

Дополнительное введение:clickhouseОн подходит для крупномасштабных вставок, потому что скорость вставки очень высока, а крупномасштабные вставки могут в полной мере использовать Clickhouse.

Основная бизнес-логика написана:

func (dts *DailyTask) insertNewData(ch chan interface{}, sqlFromDb *model.Task) error {
	for item := range ch {
		if r, vok := item.(*model.Task); !vok {
			continue
		}
		err := dts.insertExecutor.Add(r)
		if err != nil {
			r.Tag = sqlFromDb.Tag
			r.TagId = sqlFromDb.Id
			r.InsertId = genInsertId()
			r.ToRedis = toRedis == constant.INCACHED
			r.UpdateWay = sqlFromDb.UpdateWay
      // 1⃣️
			err := dts.insertExecutor.Add(r)
			if err != nil {
				logx.Error(err)
			}
		}
	}
  // 2⃣️
	dts.insertExecutor.Flush()
  // 3⃣️
	dts.insertExecutor.Wait()
}

может задаться вопросом, почемуFlush(), Wait(), который будет проанализирован исходным кодом позже

Обычно используются 3 шага:

  • Add(): присоединиться к задаче
  • Flush(): обновитьcontainerзадачи в
  • Wait(): Дождитесь завершения выполнения всех задач.

Анализ исходного кода

Основной анализ здесьperiodicalexecutor, потому что два других обычно используютсяexecutorзависеть от этого

инициализация

func New...(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
	executor := &PeriodicalExecutor{
		commander:   make(chan interface{}, 1),
		interval:    interval,
		container:   container,
		confirmChan: make(chan lang.PlaceholderType),
		newTicker: func(d time.Duration) timex.Ticker {
			return timex.NewTicker(interval)
		},
	}
  ...
	return executor
}

  • commander:передачаtasksканал
  • container: склад временного храненияAdd()задание
  • confirmChan:блокироватьAdd(), в начале этогоexecuteTasks()разблокирует
  • ticker: таймер, предотвратитьAdd()При блокировке будет возможность регулярного выполнения, а временная задача будет вовремя выпущена

Add()

После инициализации первым шагом бизнес-логики является добавление задачиexecutor:

func (pe *PeriodicalExecutor) Add(task interface{}) {
	if vals, ok := pe.addAndCheck(task); ok {
		pe.commander <- vals
		<-pe.confirmChan
	}
}

func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
	pe.lock.Lock()
	defer func() {
    // 一开始为 false
		var start bool
		if !pe.guarded {
      // backgroundFlush() 会将 guarded 重新置反
			pe.guarded = true
			start = true
		}
		pe.lock.Unlock()
    // 在第一条 task 加入的时候就会执行 if 中的 backgroundFlush()。后台协程刷task
		if start {
			pe.backgroundFlush()
		}
	}()
	// 控制maxTask,>=maxTask 将container中tasks pop, return
	if pe.container.AddTask(task) {
		return pe.container.RemoveAll(), true
	}

	return nil, false
}

addAndCheck()серединаAddTask()Контролирует максимальное количество задач, если оно превышается, оно будет выполненоRemoveAll(), будет временно хранитьcontainerзадачи выскакивают, переходят кcommander, за которым следует цикл горутины для чтения и последующего выполнения задач.

backgroundFlush()

Запустить фоновую сопрограмму, даcontainerЗадача в , которая постоянно обновляется:

func (pe *PeriodicalExecutor) backgroundFlush() {
  // 封装 go func(){}
	threading.GoSafe(func() {
		ticker := pe.newTicker(pe.interval)
		defer ticker.Stop()

		var commanded bool
		last := timex.Now()
		for {
			select {
      // 从channel拿到 []tasks
			case vals := <-pe.commander:
				commanded = true
        // 实质:wg.Add(1)
				pe.enterExecution()
        // 放开 Add() 的阻塞,而且此时暂存区也为空。才开始新的 task 加入
				pe.confirmChan <- lang.Placeholder
        // 真正的执行 task 逻辑
				pe.executeTasks(vals)
				last = timex.Now()
			case <-ticker.Chan():
				if commanded {
          // 由于select选择的随机性,如果同时满足两个条件同时执行完上面的,此处置反,并跳过本段执行
          // https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
					commanded = false
				} else if pe.Flush() {
          // 刷新完成,定时器清零。暂存区空了,开始下一次定时刷新
					last = timex.Now()
				} else if timex.Since(last) > pe.interval*idleRound {
          // 既没到maxTask,Flush() err,并且 last->now 时间过长,会再次触发 Flush()
          // 只有这置反,才会开启一个新的 backgroundFlush() 后台协程
          pe.guarded = false
					// 再次刷新,防止漏掉
					pe.Flush()
					return
				}
			}
		}
	})
}

Всего два процесса:

  • commanderполученоRemoveAll()Сдал задачи, потом выполняю, и отпускаюAdd(), продолжитьAdd()
  • tickerКогда время истекло, если первый шаг не выполнен, то автоматическиFlush(), также выполнит задание

Wait()

существуетbackgroundFlush(), упоминая функцию:enterExecution():

func (pe *PeriodicalExecutor) enterExecution() {
	pe.wgBarrier.Guard(func() {
		pe.waitGroup.Add(1)
	})
}

func (pe *PeriodicalExecutor) Wait() {
	pe.wgBarrier.Guard(func() {
		pe.waitGroup.Wait()
	})
}

Таким образом, вы будете знать, почему вы должны принести его в конце.dts.insertExecutor.Wait(), конечно ждём всехgoroutine taskЗаканчивать.

думать

Глядя на исходный код, я подумал о некоторых других дизайнерских идеях. У вас есть похожие вопросы:

  • в анализеexecutors, вы обнаружите, что во многих местах естьlock

go testСуществует состояние гонки, используйте блокировку, чтобы избежать этой ситуации.

  • в анализеconfirmChanобнаружил, что на этот разпредставитьОн только что появился, почему он разработан таким образом?

Это было раньше:wg.Add(1)написано наexecuteTasks(); теперь это: первыйwg.Add(1), тогда отпустиconfirmChanблокировать

еслиexecutor funcвыполнить блокировку,Add taskВсе еще в процессе, так как блокировки нет, возможно, она будет выполнена в ближайшее время.Executor.Wait(), это появитсяwg.Wait()существуетwg.Add()перед выполнением это будетpanic

Подробности смотрите в последней версииTestPeriodicalExecutor_WaitFast(), возможно, вы захотите запустить эту версию, вы можете воспроизвести

Суммировать

несколько оставшихсяexecutorsДля анализа я оставлю всем возможность посмотреть исходный код.

Короче, общий дизайн:

  • следовать дизайну, ориентированному на интерфейс
  • Гибкое использованиеchannel,waitgroupКомпактный инструмент
  • Комбинация исполнительный блок + накопительный блок

существуетgo-zeroВ книге также много практических инструментов для компонентов. Правильное использование инструментов очень помогает повысить производительность службы и эффективность разработки. Я надеюсь, что эта статья принесет вам пользу.

адрес проекта

Если вы считаете, что статья хорошая, нажмите звездочку на github 🤝.

В то же время каждый может использоватьgo-zero,GitHub.com/them-specialty/go…