введение
- Много раз нам нужно регулярно обрабатывать некоторые задачи, давайте сегодня узнаем о доступе к фреймворку gin.
- Причина выбора github.com/robfig/cron/v3 github star 6.7k Автор поддерживает его
- Общее написание
- Интерпретация исходного кода
- Как убедиться, что при выключении/перезагрузке запущенные задачи должны быть завершены, прежде чем продолжить выполнение выключения/перезапуска
- Адрес кода дела
1. Быстрое использование
go mod init github.com/18211167516/go-lib/cron/rebfig_cron
Создайте новый файл main.go
package main
import (
"fmt"
"time"
"github.com/robfig/cron/v3"
)
type testJob struct{}
//实现了 type Job interface {Run()}
func (t testJob) Run() {
fmt.Println("i.m test job")
}
func main() {
c := cron.New()
c.AddFunc("@every 1s", func() {
fmt.Println("tick every 1 second")
})
c.AddJob("* * * * *", testJob{})
c.Start()
select {}
}
- Создайте объект кукурузы для управления запланированными задачами
- Используйте addFunc для добавления задач на время
- Метод Start запускает запланированную задачу. начать новую горутину
- select{} предотвращает выход основной горутины
2. Новые () опции
На самом деле возвращает тип Option func(*Cron)
В настоящее время существует 5 встроенных опций.
2.1 WithLocation указывает часовой пояс
loc,_ := time.LoadLocation("America/Los_Angeles")
c := cron.New(cron.WithLocation(loc))
2.2 WithSeconds поддерживает детализацию до второго уровня (по умолчанию используется тот же уровень минут, что и в crontab)
На самом деле WithSeconds реализован с использованием WithParser.
c:=cron.New(cron.WithSeconds())
//每2秒执行一次
c.AddFunc("*/2 * * * * *", func() {
file, _ := os.OpenFile("log.txt", os.O_APPEND|os.O_CREATE, 0755)
defer file.Close()
fmt.Println("test 11")
file.Write([]byte("test 111\r\n"))
})
2.3 WithParser использует собственный парсер
Реализовать тип интерфейса ScheduleParser interface {
Parse(spec string) (Schedule, error)
}
paeser:= cron.NewParser(
cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)
c := cron.New(cron.WithParser(paeser))
c.AddFunc("1 * * * * *", func () {
fmt.Println("every 1 second")
})
2.4 Оболочка задания WithChain (промежуточное программное обеспечение задания)
- Промежуточное ПО по умолчанию 3
Recover 捕获内部Job产生的 panic;
DelayIfStillRunning 触发时,如果上一次任务还未执行完成(耗时太长),则等待上一次任务完成之后再执行
SkipIfStillRunning 触发时,如果上一次任务还未完成,则跳过此次执行
- использовать
type testJob struct{}
func (t testJob) Run() {
panic("test job")
//fmt.Println("i.m test job")
}
logger := cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))
//全局
c := cron.New(cron.WithChain(cron.Recover(logger)))
c.addFunc("* * * * * *",func(){
panic("1232132")
})
//局部中间件
c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(testJob{}))
- Основная реализация
//先执行Job中间件在执行具体Job
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
j = c.wrappers[len(c.wrappers)-i-1](j)
}
return j
}
2.5 Пользовательский журнал WithLogger Logger
Сначала посмотрите на Logger по умолчанию
var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
return printfLogger{l, false}
}
func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
return printfLogger{l, true}
}
type printfLogger struct {
logger interface{ Printf(string, ...interface{}) }
logInfo bool
}
func (pl printfLogger) Info(msg string, keysAndValues ...interface{}) {
if pl.logInfo {
keysAndValues = formatTimes(keysAndValues)
pl.logger.Printf(
formatString(len(keysAndValues)),
append([]interface{}{msg}, keysAndValues...)...)
}
}
func (pl printfLogger) Error(err error, msg string, keysAndValues ...interface{}) {
keysAndValues = formatTimes(keysAndValues)
pl.logger.Printf(
formatString(len(keysAndValues)+2),
append([]interface{}{msg, "error", err}, keysAndValues...)...)
}
Вы можете настроить Logger Logger, если вы реализуете интерфейс Logger.
type Logger interface {
// Info logs routine messages about cron's operation.
Info(msg string, keysAndValues ...interface{})
// Error logs an error condition.
Error(err error, msg string, keysAndValues ...interface{})
}
//实现控制台和文件日志双写
f, _ := os.Create("cron.log")
c := cron.New(cron.WithSeconds(), cron.WithLogger(
cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))))
c.AddFunc("*/2 * * * * *", func() {
fmt.Println("test 11")
})
3. Создайте задачу
3.1 addFunc()
c := cron.New()
c.AddFunc("@every 1s", func() {
fmt.Println("tick every 1 second")
})
c.Start()
3.2 addJob()
type testJob struct{}
//实现了 type Job interface {Run()}
func (t testJob) Run() {
fmt.Println("i.m test job")
}
c.AddJob("* * * * *", testJob{})
c.Start()
3.3 Интерпретация исходного кода
step1 AddFunc реализован на основе addJob
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
}
// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
//解析时间格式
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
}
step2
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
//并发锁
c.runningMu.Lock()
defer c.runningMu.Unlock()
//自增ID
c.nextID++
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
Job: cmd,
}
//在服务已经启动的情况下新增任务(不知道为什么有这种操作)
if !c.running {
c.entries = append(c.entries, entry)
} else {
c.add <- entry
}
return entry.ID
}
step3, наконец, запускает c.Start(), ядром является cron, метод run()
- Проверьте задачи и получите следующее время выполнения каждой задачи
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
- Сортировать по таймеру запуска
//无线循环
for {
// 任务排序
sort.Sort(byTime(c.entries))
var timer *time.Timer
//设置定时器(多少时间后执行)
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
timer = time.NewTimer(100000 * time.Hour)
} else {
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
}
- слушать канал
for {
select {
//定时器触发
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// Run every entry whose next time was less than now
//遍历全部任务
for _, e := range c.entries {
// 判断如果第一个执行时间少于当前时间或者时间零点,则跳出不执行
if e.Next.After(now) || e.Next.IsZero() {
break
}
//执行Job包装器
c.startJob(e.WrappedJob)
//更新
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
// 运行后新增任务
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
//
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
//服务停止
case <-c.stop:
//停止服务
timer.Stop()
c.logger.Info("stop")
return
//移除任务会影响服务停止
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
break
}
- time.Stop()
Используйте sync.WaitGroup для достижения
func (c *Cron) Stop() context.Context {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
c.stop <- struct{}{}
c.running = false
}
//后台阻塞阻塞代码
ctx, cancel := context.WithCancel(context.Background())
go func() {
//计数器为0
c.jobWaiter.Wait()
//取消阻塞
cancel()
}()
return ctx
}
- выполнять задачи
func (c *Cron) startJob(j Job) {
//计数器加一
c.jobWaiter.Add(1)
go func() {
defer c.jobWaiter.Done()
j.Run()
}()
}
4. Формат времени
- предопределенный формат
@yearly:也可以写作@annually,表示每年第一天的 0 点。等价于0 0 1 1 *;
@monthly:表示每月第一天的 0 点。等价于0 0 1 * *;
@weekly:表示每周第一天的 0 点,注意第一天为周日,即周六结束,周日开始的那个 0 点。等价于0 0 * * 0;
@daily:也可以写作@midnight,表示每天 0 点。等价于0 0 * * *;
@hourly:表示每小时的开始。等价于0 * * * *。
- формат фиксированного интервала
@every <duration> 每个duration触发
c.AddFunc("@every 1s", func() {
fmt.Println("test 111")
})
c.AddFunc(fmt.Sprint("@every ", time.Duration(1)*time.Second), func() {
fmt.Println("test 222")
})
- пользовательский формат времени
默认支持5位到分钟级(等同于crontab)
c.AddFunc("* * * * *", func () {
fmt.Println("every 1 分钟")
})
//支持秒
c := cron.New(cron.WithSeconds())
c.AddFunc("* * * * * *", func () {
fmt.Println("every 1 秒钟")
})
5. Резюме
Хотя воробей маленький и полноценный, из-за функции golang он естественным образом поддерживает потокобезопасность и обеспечивает целостность выполнения задачи. Если вы планируете отделить уровень сохранения задач, возможно, будет более выгодно расширить дистрибутив в будущем.Я лично считаю, что три промежуточных ПО должны быть добавлены по умолчанию (я думаю, никто не хочет завершать все задачи из-за отчета об ошибке задачи )
6. Ссылка
7. Цикл статей
- Сериализация среды golang для сборки
- Серийная вторая установка Джин
- Сериализация 3. Определите структуру каталогов
- Строительный кейс серии 4 API1
- Строительный кейс Series 5 API2
- Документация по интерфейсу Serial 6 Access swagger
- Серийный компонент семи бревен
- Изящный перезапуск и остановка Serial Eight
- Серийная сборка Fanwai Makefile
- Серийные запланированные задачи Fanwai Cron
- Serial Fanwai Создание инструмента командной строки
- Сериализация Fanwai 3 дня на создание эксклюзивного кэша (первый день)
- 3 дня сериализации для создания эксклюзивного кэша (второй день)
- 3 дня сериализации для создания эксклюзивного кэша (третий день)