Gin Framework Practice Serial Extras | Запланированные задачи Cron

Go

введение

  • Много раз нам нужно регулярно обрабатывать некоторые задачи, давайте сегодня узнаем о доступе к фреймворку gin.
  • Причина выбора github.com/robfig/cron/v3 github star 6.7k Автор поддерживает его
  • Общее написание
  • Интерпретация исходного кода
  • Как убедиться, что при выключении/перезагрузке запущенные задачи должны быть завершены, прежде чем продолжить выполнение выключения/перезапуска
  • Адрес кода дела

1. Быстрое использование

go mod init github.com/18211167516/go-lib/cron/rebfig_cron

Создайте новый файл main.go

package main

import (
  "fmt"
  "time"

  "github.com/robfig/cron/v3"
)

type testJob struct{}

//实现了 type Job interface {Run()}
func (t testJob) Run() {
	fmt.Println("i.m test job")
}

func main() {
  c := cron.New()

  c.AddFunc("@every 1s", func() {
    fmt.Println("tick every 1 second")
  })
  
  c.AddJob("* * * * *", testJob{})

  c.Start()
  
  select {} 
}

  • Создайте объект кукурузы для управления запланированными задачами
  • Используйте addFunc для добавления задач на время
  • Метод Start запускает запланированную задачу. начать новую горутину
  • select{} предотвращает выход основной горутины

2. Новые () опции

На самом деле возвращает тип Option func(*Cron)

В настоящее время существует 5 встроенных опций.

2.1 WithLocation указывает часовой пояс

loc,_ := time.LoadLocation("America/Los_Angeles")
c := cron.New(cron.WithLocation(loc))

2.2 WithSeconds поддерживает детализацию до второго уровня (по умолчанию используется тот же уровень минут, что и в crontab)

На самом деле WithSeconds реализован с использованием WithParser.

c:=cron.New(cron.WithSeconds())
//每2秒执行一次
c.AddFunc("*/2 * * * * *", func() {
		file, _ := os.OpenFile("log.txt", os.O_APPEND|os.O_CREATE, 0755)
		defer file.Close()
		fmt.Println("test 11")
		file.Write([]byte("test 111\r\n"))
	})

2.3 WithParser использует собственный парсер

Реализовать тип интерфейса ScheduleParser interface {

Parse(spec string) (Schedule, error)

}

paeser:= cron.NewParser(
		cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
	)
    
c := cron.New(cron.WithParser(paeser))
c.AddFunc("1 * * * * *", func () {
  fmt.Println("every 1 second")
})

2.4 Оболочка задания WithChain (промежуточное программное обеспечение задания)

  • Промежуточное ПО по умолчанию 3
  Recover 捕获内部Job产生的 panic;
  DelayIfStillRunning 触发时,如果上一次任务还未执行完成(耗时太长),则等待上一次任务完成之后再执行
  SkipIfStillRunning 触发时,如果上一次任务还未完成,则跳过此次执行
  • использовать

type testJob struct{}

func (t testJob) Run() {
	panic("test job")
	//fmt.Println("i.m test job")
}

logger := cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))
//全局
c := cron.New(cron.WithChain(cron.Recover(logger)))

c.addFunc("* * * * * *",func(){
	panic("1232132")
})

//局部中间件
c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(testJob{}))

  • Основная реализация
//先执行Job中间件在执行具体Job
func (c Chain) Then(j Job) Job {
	for i := range c.wrappers {
		j = c.wrappers[len(c.wrappers)-i-1](j)
	}
	return j
}

2.5 Пользовательский журнал WithLogger Logger

Сначала посмотрите на Logger по умолчанию

var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))

func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
  return printfLogger{l, false}
}

func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
  return printfLogger{l, true}
}

type printfLogger struct {
  logger  interface{ Printf(string, ...interface{}) }
  logInfo bool
}

func (pl printfLogger) Info(msg string, keysAndValues ...interface{}) {
	if pl.logInfo {
		keysAndValues = formatTimes(keysAndValues)
		pl.logger.Printf(
			formatString(len(keysAndValues)),
			append([]interface{}{msg}, keysAndValues...)...)
	}
}

func (pl printfLogger) Error(err error, msg string, keysAndValues ...interface{}) {
	keysAndValues = formatTimes(keysAndValues)
	pl.logger.Printf(
		formatString(len(keysAndValues)+2),
		append([]interface{}{msg, "error", err}, keysAndValues...)...)
}

Вы можете настроить Logger Logger, если вы реализуете интерфейс Logger.

  type Logger interface {
      // Info logs routine messages about cron's operation.
      Info(msg string, keysAndValues ...interface{})
      // Error logs an error condition.
      Error(err error, msg string, keysAndValues ...interface{})
  }
    //实现控制台和文件日志双写
    f, _ := os.Create("cron.log")

	c := cron.New(cron.WithSeconds(), cron.WithLogger(
		cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))))

	c.AddFunc("*/2 * * * * *", func() {
		fmt.Println("test 11")
	})

3. Создайте задачу

3.1 addFunc()

 c := cron.New()

  c.AddFunc("@every 1s", func() {
    fmt.Println("tick every 1 second")
  })
  
c.Start()

3.2 addJob()

type testJob struct{}

//实现了 type Job interface {Run()}
func (t testJob) Run() {
	fmt.Println("i.m test job")
}

c.AddJob("* * * * *", testJob{})

c.Start()

3.3 Интерпретация исходного кода

step1 AddFunc реализован на основе addJob


func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
	return c.AddJob(spec, FuncJob(cmd))
}

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
	//解析时间格式
	schedule, err := c.parser.Parse(spec)
	if err != nil {
		return 0, err
	}
	return c.Schedule(schedule, cmd), nil
}

step2

func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
	//并发锁
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
    //自增ID
	c.nextID++
	entry := &Entry{
		ID:         c.nextID,
		Schedule:   schedule,
		WrappedJob: c.chain.Then(cmd),
		Job:        cmd,
	}
    //在服务已经启动的情况下新增任务(不知道为什么有这种操作)
	if !c.running {
		c.entries = append(c.entries, entry)
	} else {
		c.add <- entry
	}
	return entry.ID
}

step3, наконец, запускает c.Start(), ядром является cron, метод run()

  • Проверьте задачи и получите следующее время выполнения каждой задачи
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
	}
  • Сортировать по таймеру запуска
//无线循环
	for {
		// 任务排序
		sort.Sort(byTime(c.entries))

		var timer *time.Timer
        //设置定时器(多少时间后执行)
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
			// If there are no entries yet, just sleep - it still handles new entries
			// and stop requests.
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}
       }
  • слушать канал
		for {
			select {
            //定时器触发
			case now = <-timer.C:
				now = now.In(c.location)
				c.logger.Info("wake", "now", now)

				// Run every entry whose next time was less than now
                //遍历全部任务
				for _, e := range c.entries {
                	// 判断如果第一个执行时间少于当前时间或者时间零点,则跳出不执行
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
                    //执行Job包装器
					c.startJob(e.WrappedJob)
                    //更新
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
					c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
				}
			// 运行后新增任务
			case newEntry := <-c.add:
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)
				c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
			// 
			case replyChan := <-c.snapshot:
				replyChan <- c.entrySnapshot()
				continue
                
			//服务停止
            
			case <-c.stop:
            	//停止服务
				timer.Stop()
				c.logger.Info("stop")
				return
			//移除任务会影响服务停止
			case id := <-c.remove:
				timer.Stop()
				now = c.now()
				c.removeEntry(id)
				c.logger.Info("removed", "entry", id)
			}

			break
		}

  • time.Stop()

Используйте sync.WaitGroup для достижения

   func (c *Cron) Stop() context.Context {
      c.runningMu.Lock()
      defer c.runningMu.Unlock()
      if c.running {
          c.stop <- struct{}{}
          c.running = false
      }
      //后台阻塞阻塞代码
      ctx, cancel := context.WithCancel(context.Background())
      go func() {
      		//计数器为0
          c.jobWaiter.Wait()
          //取消阻塞
          cancel()
      }()
      return ctx
  }
  • выполнять задачи
func (c *Cron) startJob(j Job) {
	//计数器加一
	c.jobWaiter.Add(1)
	go func() {
		defer c.jobWaiter.Done()
		j.Run()
	}()
}

4. Формат времени

  • предопределенный формат
  @yearly:也可以写作@annually,表示每年第一天的 0 点。等价于0 0 1 1 *;
  @monthly:表示每月第一天的 0 点。等价于0 0 1 * *;
  @weekly:表示每周第一天的 0 点,注意第一天为周日,即周六结束,周日开始的那个 0 点。等价于0 0 * * 0;
  @daily:也可以写作@midnight,表示每天 0 点。等价于0 0 * * *;
  @hourly:表示每小时的开始。等价于0 * * * *。
  • формат фиксированного интервала
	@every <duration>  每个duration触发
    c.AddFunc("@every 1s", func() {
		fmt.Println("test 111")
	})
    
	c.AddFunc(fmt.Sprint("@every ", time.Duration(1)*time.Second), func() {
		fmt.Println("test 222")
	})
  • пользовательский формат времени

默认支持5位到分钟级(等同于crontab)

c.AddFunc("* * * * *", func () {
  fmt.Println("every 1 分钟")
})

//支持秒
c := cron.New(cron.WithSeconds())
c.AddFunc("* * * * * *", func () {
  fmt.Println("every 1 秒钟")
})

5. Резюме

Хотя воробей маленький и полноценный, из-за функции golang он естественным образом поддерживает потокобезопасность и обеспечивает целостность выполнения задачи. Если вы планируете отделить уровень сохранения задач, возможно, будет более выгодно расширить дистрибутив в будущем.Я лично считаю, что три промежуточных ПО должны быть добавлены по умолчанию (я думаю, никто не хочет завершать все задачи из-за отчета об ошибке задачи )

6. Ссылка

  1. Go ежедневная библиотека cron
  2. стратегия использования пакета времени
  3. cron

7. Цикл статей