что такое таймер
Инструмент, который можно использовать для выполнения некоторых запланированных задач или периодических задач в собственном пакете времени Golang.
Эта статья основана на Go 1.14. Если что-то не так со следующими статьями или проблемами, добро пожаловать на обсуждение и изучение
Ежедневное использование таймера
Связанный таймер
func NewTimer(d Duration) *Timer
func (t *Timer) Reset(d Duration) bool
func (t *Timer) Stop() bool
func After(d Duration) <-chan Time
func AfterFunc(d Duration, f func()) *Timer
func main() {
timer := time.NewTimer(3 * time.Second)
select {
case <-timer.C:
fmt.Println("3秒执行任务")
}
timer.Stop() // 这里来提高 timer 的回收
}
func main() {
tChannel := time.After(3 * time.Second) // 其内部其实是生成了一个 timer
select {
case <-tChannel:
fmt.Println("3秒执行任务")
}
}
func main() {
timer := time.NewTimer(3 * time.Second)
for {
timer.Reset(4 * time.Second) // 这样来复用 timer 和修改执行时间
select {
case <-timer.C:
fmt.Println("每隔4秒执行任务")
}
}
}
Меры предосторожности:
Неправильное использование: time.After здесь будет продолжать генерировать таймер, хотя в конечном итоге он будет переработан, но это приведет к бессмысленному потреблению ресурсов процессора.
func main() {
for {
select {
case <-time.After(3 * time.Second):
fmt.Println("每隔3秒执行一次")
}
}
}
использовать правильно:
func main() {
timer := time.NewTimer(3 * time.Second)
for {
timer.Reset(3 * time.Second) // 这里复用了 timer
select {
case <-timer.C:
fmt.Println("每隔3秒执行一次")
}
}
}
Связанный с тикером
func NewTicker(d Duration) *Ticker
func Tick(d Duration) <-chan Time
func (t *Ticker) Stop()
func main() {
ticker := time.NewTicker(3 * time.Second)
for range ticker.C {
fmt.Print("每隔3秒执行任务")
}
ticker.Stop()
}
неправильное использование:
func main() {
for {
select {
case <-time.Tick(3 * time.Second): // 这里会不断生成 ticker,而且 ticker 会进行重新调度,造成泄漏(后面源码会有解析)
fmt.Println("每隔3秒执行一次")
}
}
}
Анализ исходного кода таймера
Статус первого таймера с заданной картой цикла, вы можете посмотреть статьи, изображения, исходный код, чтобы узнать (общедоступный номер ответа «карта состояния таймера» получить изображение)
Сначала я привожу соответствующие структуры, участвующие в процессе (! ! ! Обратите внимание на разницу между таймером и таймером)
type Timer struct {
C <-chan Time
r runtimeTimer
}
// Ticker 的结构与 Timer 一致
type Ticker struct {
C <-chan Time // 这里就是返回的 channel
r runtimeTimer
}
// If this struct changes,
// adjust ../time/sleep.go:/runtimeTimer.
// 这里是与 runtimeTimer 对应的
type timer struct {
pp puintptr // 对应的当前 P 的指针
when int64 // 需要执行的时间
period int64 // 周期,Ticker 会使用
f func(interface{}, uintptr) // 给 channel 推送信息的方式
arg interface{} // 与 f 相关的第一个参数,可以看下面 Ticker 的例子
seq uintptr // 与 f 相关的第二个参数(后续我们可以看到)
nextwhen int64 // 下次执行的时候
status uint32 // 当前状态
}
// P 结构体中的相关 timer 的字段
type p struct {
...
timersLock mutex // 一个 P 中保证 timers 同步锁
timers []*timer // timers 是四叉小顶堆(后续代码会有说明)
numTimers uint32 // timer 的数量
adjustTimers uint32 // 需要调整的 timer 的数量
deletedTimers uint32 // 需要删除的 timer 的数量
...
}
Начнем с Тикера
func NewTicker(d Duration) *Ticker {
if d <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),//当前时间+d的时间,可看下面
period: int64(d),//执行周期
f: sendTime,
arg: c, // 就是 f 中第一个参数
},
}
startTimer(&t.r)
return t
}
func when(d Duration) int64 {
if d <= 0 {
return runtimeNano()
}
t := runtimeNano() + int64(d) //当前时间加上需要等待的时间
if t < 0 {
t = 1<<63 - 1 // math.MaxInt64
}
return t
}
func sendTime(c interface{}, seq uintptr) {
select {
case c.(chan Time) <- Now():
default:
}
}
Из NewTicker мы видим, что выполнение начинается с startTimer(), давайте зайдем и посмотрим
addtimer
// startTimer adds t to the timer heap.
// 这里已经说明了 timers 是一种堆的数据结构,由于是定时器,
// 最近的最先执行,所以猜测以 when 来判断的小顶堆
func startTimer(t *timer) {
addtimer(t)
}
func addtimer(t *timer) {
if t.when < 0 {
t.when = maxWhen //maxWhen 是 1<<63 - 1
}
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
t.status = timerWaiting
when := t.when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
cleantimers(pp) // 根据 timer 删除和修改状态进行操作,可以看下面源码相关
doaddtimer(pp, t)// 添加 timer 的到 timers 堆
unlock(&pp.timersLock)
wakeNetPoller(when)
}
// 清理 timers 的源码部分
func cleantimers(pp *p) {
for {
if len(pp.timers) == 0 {
return
}
t := pp.timers[0]// 从 0 开始,即最小的堆顶开始
if t.pp.ptr() != pp {
throw("cleantimers: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerDeleted:
if !atomic.Cas(&t.status, s, timerRemoving) {// status 变更为 timerRemoving
continue
}
dodeltimer0(pp) // 这里是删除 timer 的关键部分,删除堆顶的部分并调整
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) { // stauts 变更为 timerRemoved
badTimer() // 这里就是 throw 一个异常
}
atomic.Xadd(&pp.deletedTimers, -1)
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) { // stauts 变更为 timerMoving
continue
}
t.when = t.nextwhen // 将执行时间设置为其下次执行的时候
// -----删除堆顶位置,并按照其新的执行时间加入到对应的位置
dodeltimer0(pp)
doaddtimer(pp, t) // 添加 timer 的关键部分
// ------------
if s == timerModifiedEarlier {
atomic.Xadd(&pp.adjustTimers, -1)
}
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
default:
return
}
}
}
// timer 删除的源码部分
//(扩充:func dodeltimer(pp *p, i int) 意思就是删除指定所索引
// 的位置,然后恢复小顶堆的结构,可以看源码,就不解释了)
func dodeltimer0(pp *p) {
if t := pp.timers[0]; t.pp.ptr() != pp {
throw("dodeltimer0: wrong P")
} else {
t.pp = 0 // 这里将指针情况
}
// --- 将堆的最后一位 timer 放到堆顶,然后清空最后一位的空间,然后向下调整---
last := len(pp.timers) - 1
if last > 0 {
pp.timers[0] = pp.timers[last]
}
pp.timers[last] = nil
pp.timers = pp.timers[:last]
if last > 0 {
siftdownTimer(pp.timers, 0)//向下调整的核心部分
}
// ---------------------
updateTimer0When(pp) //更新当前 p 的最先执行 timer 的执行时间
atomic.Xadd(&pp.numTimers, -1)
}
func updateTimer0When(pp *p) {
if len(pp.timers) == 0 {
atomic.Store64(&pp.timer0When, 0)
} else {
atomic.Store64(&pp.timer0When, uint64(pp.timers[0].when))
}
}
// timer 增加的源码部分
func doaddtimer(pp *p, t *timer) {
...
if t.pp != 0 {
throw("doaddtimer: P already set in timer")
}
t.pp.set(pp)
// --- 将 timer 放置到堆的最后一位,然后向上调整 ---
i := len(pp.timers)
pp.timers = append(pp.timers, t)
siftupTimer(pp.timers, i)// 向上调整的核心部分
// ---------------------------
if t == pp.timers[0] {
atomic.Store64(&pp.timer0When, uint64(t.when))
}
atomic.Xadd(&pp.numTimers, 1)
}
Когда мы знаем, что таймеры — это структура данных малой верхней кучи (она удовлетворяет «значение текущей позиции меньше или равно значению родительской позиции», метод реализации использует массив, и следующий код может Знайте, что это небольшая верхняя куча с четырьмя зубцами, структура выглядит следующим образом) После ситуации давайте посмотрим на детали регулировки кучи вверх или вниз.
// timers 堆的向上调整
func siftupTimer(t []*timer, i int) {
...
when := t[i].when
tmp := t[i]
for i > 0 {
p := (i - 1) / 4 // 由这里可以看出,堆的节点长度是4
if when >= t[p].when {
break
}
// --- 向上进行调整,即父节点下移,当前节点上移 ---
t[i] = t[p]
i = p
//向上进行调整
}
if tmp != t[i] {
t[i] = tmp
}
}
//timers 堆的向下调整
func siftdownTimer(t []*timer, i int) {
n := len(t)
if i >= n {
badTimer()
}
when := t[i].when
tmp := t[i]
for {
// --- 以下部分就是找到当前4个节点中最小的那个值和在数组的位置 -----
c := i*4 + 1 // 这里是子节点最左边的节点
c3 := c + 2 // 这里是子节点第三个节点
if c >= n {
break
}
w := t[c].when
if c+1 < n && t[c+1].when < w {
w = t[c+1].when
c++
}
if c3 < n {
w3 := t[c3].when
if c3+1 < n && t[c3+1].when < w3 {
w3 = t[c3+1].when
c3++
}
if w3 < w {
w = w3
c = c3
}
}
//---------------------------------
if w >= when {
break
}
// --- 向下进行调整,即子节点上移,当前节点下移 ---
t[i] = t[c]
i = c
// ---------------
}
if tmp != t[i] {
t[i] = tmp
}
}
Теперь, когда известно, что таймер находится в небольшой верхней куче с четырьмя зубцами, как он работает? Далее идет основная часть записи таймера runtimer().
runtimer
// 这里执行的前提是当前 P 的 timesLock 已经锁了,所以不用担心并发问题
func runtimer(pp *p, now int64) int64 {
for {
t := pp.timers[0] //找到 timers 堆的堆顶,为最先执行的 timer
if t.pp.ptr() != pp {
throw("runtimer: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now { //如果还没到时间,则返回调用的时间
return t.when
}
if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
runOneTimer(pp, t, now)// 这里是执行timer的核心
return 0
case timerDeleted:
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
dodeltimer0(pp) //删除 timers 堆顶的 timer
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
if len(pp.timers) == 0 {
return -1
}
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
//删除堆顶的位置,调整 timer 到最新的时间,以及进行重新调整
t.when = t.nextwhen
dodeltimer0(pp)
doaddtimer(pp, t)
if s == timerModifiedEarlier {
atomic.Xadd(&pp.adjustTimers, -1)
}
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
case timerModifying:
osyield()
case timerNoStatus, timerRemoved:
badTimer()
case timerRunning, timerRemoving, timerMoving:
badTimer()
default:
badTimer()
}
}
}
Итак, мы знаем, что основным процессом выполнения является runOneTimer().
runOneTimer
// 由于是 runtimer 进行调用,因此也线程安全
func runOneTimer(pp *p, t *timer, now int64) {
...
f := t.f
arg := t.arg
seq := t.seq
if t.period > 0 { //如果有周期,则算出下次 timer 执行的时间,并加入到对应的位置(这里就是 Ticker 和 Timer 的区别)
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(pp.timers, 0)// 将四叉小顶堆向下调整
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)//更新当前 P 的最先的 timer 的执行时间
} else {
// 从堆顶位置上删除 timer,并调整
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}
...
unlock(&pp.timersLock)
f(arg, seq) // 执行对应的 f,这里就是我们 Timer.C 来的地方
lock(&pp.timersLock)
...
}
Из звонка Runtimer мы знаем, что ввод выполнения - Checktimers (). Давайте подробно посмотрим на это.
checkTimers
Мы можем увидеть фигуру, видимую с следующего рисунка, это путем планирования истечения для поиска исполняемого таймера
Давайте посмотрим, что делает checkTimers
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
if atomic.Load(&pp.adjustTimers) == 0 {// 如果没有需要可调整的,则直接返回最先执行 timer 的时间
next := int64(atomic.Load64(&pp.timer0When))
if next == 0 {
return now, 0, false
}
if now == 0 {
now = nanotime()
}
if now < next { // 表示还没有到执行时间
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) { //且要删除的 Timer数量小于 Timer总数的1/4
return now, next, false
}
}
}
lock(&pp.timersLock)
adjusttimers(pp)// 可以看下面的源码解析,当前 p 上的所有 timers 的状态,该删除的删了,该调整的调整
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 {
if tw := runtimer(pp, rnow); tw != 0 { // 通过 runtimer(可以看上面的源码解析) 开始调用
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
// 如果可删除的 Timers 大于 Timer总数量的1/4,则进行删除(因为上面执行了 runtimer)
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}
unlock(&pp.timersLock)
return rnow, pollUntil, ran
}
adjusttimers
func adjusttimers(pp *p) {
if len(pp.timers) == 0 {
return
}
if atomic.Load(&pp.adjustTimers) == 0 { // 如果需要调整的 Timer 为 0,则直接返回
...
return
}
var moved []*timer
loop:
for i := 0; i < len(pp.timers); i++ {
t := pp.timers[i]
if t.pp.ptr() != pp {
throw("adjusttimers: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerDeleted: // 这里就是将部分需要删除的 Timer 给清理掉
if atomic.Cas(&t.status, s, timerRemoving) {
dodeltimer(pp, i)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
i--
}
case timerModifiedEarlier, timerModifiedLater: // 把需要调整 Timer 放到 moved 中,然后删除当前堆的数据进行堆调整,后续将 moved 通过 addAdjustedTimers 添加
if atomic.Cas(&t.status, s, timerMoving) {
t.when = t.nextwhen
dodeltimer(pp, i)
moved = append(moved, t)
if s == timerModifiedEarlier {
if n := atomic.Xadd(&pp.adjustTimers, -1); int32(n) <= 0 {
break loop
}
}
i--
}
case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving:
badTimer()
case timerWaiting:
case timerModifying:
osyield()
i--
default:
badTimer()
}
}
if len(moved) > 0 {
addAdjustedTimers(pp, moved) // 这里就是将需要调整的 timer 重新添加进来
}
...
}
addAdjustedTimers
func addAdjustedTimers(pp *p, moved []*timer) {
for _, t := range moved {
doaddtimer(pp, t)// 上文有源码解析
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
}
}
clearDeletedTimers
func clearDeletedTimers(pp *p) {
cdel := int32(0)
cearlier := int32(0)
to := 0
changedHeap := false
timers := pp.timers
nextTimer:
for _, t := range timers {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if changedHeap {
timers[to] = t
siftupTimer(timers, to)
}
to++
continue nextTimer
case timerModifiedEarlier, timerModifiedLater: // 将 timer 状态调整成 timeWaiting,将其放至其正确的执行时间位置
if atomic.Cas(&t.status, s, timerMoving) {
t.when = t.nextwhen
timers[to] = t
siftupTimer(timers, to)
to++
changedHeap = true
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
if s == timerModifiedEarlier {
cearlier++
}
continue nextTimer
}
case timerDeleted: // 将 timerDeleted 转变成 timerRemoved,然后从 timers 堆中删掉(在当前函数后面可以看出)
if atomic.Cas(&t.status, s, timerRemoving) {
t.pp = 0
cdel++
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
changedHeap = true
continue nextTimer
}
case timerModifying:
osyield()
case timerNoStatus, timerRemoved:
badTimer()
case timerRunning, timerRemoving, timerMoving:
badTimer()
default:
badTimer()
}
}
}
// 在这里对于剩余的空间 设置为 nil 操作(垃圾回收方便)
for i := to; i < len(timers); i++ {
timers[i] = nil
}
atomic.Xadd(&pp.deletedTimers, -cdel)
atomic.Xadd(&pp.numTimers, -cdel)
atomic.Xadd(&pp.adjustTimers, -cearlier)
// 在这里进行一次大清理
timers = timers[:to]
pp.timers = timers
updateTimer0When(pp)
...
}
Мы с оптимизмом смотрим на общую ситуацию с выполнением, тогда давайте посмотрим на часть исходного кода Stop().
deltimer
func (t *Ticker) Stop() {
stopTimer(&t.r)
}
func stopTimer(t *timer) bool {
return deltimer(t)
}
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater: //将 timer 的 status变更为 timerDeleted ,并deletedTimers 加 1
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) { //
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
return true
} else {
releasem(mp)
}
case timerModifiedEarlier: //将 timer 的 status 变更为 timerDeleted,然后 adjustTimers 减 1,deletedTimers 加 1
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
tpp := t.pp.ptr()
atomic.Xadd(&tpp.adjustTimers, -1)
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
return false
case timerRunning, timerMoving:
osyield()
case timerNoStatus:
return false
case timerModifying:
osyield()
default:
badTimer()
}
}
}
При последующем планировании состояние Timer может быть установлено с timerDeleted на timerRemoved и удалено из кучи таймеров (обратите внимание, что здесь используется «can», вы можете увидеть диаграмму состояний выше)
При повторном использовании Timer мы часто используем Reset(), давайте посмотрим, как выглядит исходный код
modtimer
func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
active := stopTimer(&t.r) // 这里我们上面源码解释过了,即将当前的 timer 的 status 设置成 timerDeleted
resetTimer(&t.r, w)
return active
}
func resettimer(t *timer, when int64) {
modtimer(t, when, t.period, t.f, t.arg, t.seq)
}
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) {
if when < 0 {
when = maxWhen
}
status := uint32(timerNoStatus)
wasRemoved := false
var mp *m
loop:
for {
// 主要的目的就是将当前的 timer 的状态设置成 timerModifying
switch status = atomic.Load(&t.status); status {
case timerWaiting, timerModifiedEarlier, timerModifiedLater:
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
break loop
}
releasem(mp)
case timerNoStatus, timerRemoved:
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
wasRemoved = true
break loop
}
releasem(mp)
case timerDeleted:
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
break loop
}
releasem(mp)
case timerRunning, timerRemoving, timerMoving:
osyield()
case timerModifying:
osyield()
default:
badTimer()
}
}
t.period = period
t.f = f
t.arg = arg
t.seq = seq
if wasRemoved { // 如果是已经被移除的,则要重新加回到 timers 中,且状态变更为 timerWaiting
t.when = when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
doaddtimer(pp, t)
unlock(&pp.timersLock)
if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
badTimer()
}
releasem(mp)
wakeNetPoller(when)
} else {
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when < t.when { //判断这次新的时间是老的时间的前还是后
newStatus = timerModifiedEarlier
}
adjust := int32(0)
if status == timerModifiedEarlier {
adjust--
}
if newStatus == timerModifiedEarlier {
adjust++
}
if adjust != 0 {
atomic.Xadd(&t.pp.ptr().adjustTimers, adjust)
}
if !atomic.Cas(&t.status, timerModifying, newStatus) { // 将当前 timer 设置成 timerModifiedEarlier/timerModifiedEarlier
badTimer()
}
releasem(mp)
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
}