В прошлом году я разговаривал с Донг Да о таймерах Go.В то время я, вероятно, видел, что код представляет собой четырехзубую кучу, но детали реализации были проигнорированы.
Несколько дней назад я снова поболтал с Лю Дином и Донг Да о таймере, поэтому я просто воспользовался этой возможностью, чтобы полностью разобраться с реализацией таймера в Go. Нет сетевого опроса, потому что я не читал эту часть кода.
Упоминалась статья Донг Да о том, как совместить сеть и таймер, если интересно, можете почитать.здесь.
Не удивляйтесь, увидев одну и ту же картинку с обеих сторон, так что эту статью можно считать увязкой, ха-ха.
Таймер — очень важная функция серверной программы, особенно для систем, работающих с сетью или файловой системой, мы добавим разумный тайм-аут для каждой операции ввода-вывода, чтобы избежать утечки ресурсов в различных конкретных ситуациях. Поэтому очень важно понимать реализацию таймера.
PS: Похоже, поддержка Ghost для графов, состоящих из строк Unicode, не особенно хороша.Если вам неловко, вы можете проверить исходный текст этой статьи.здесь. Если вы вдохновлены, пожалуйста, дайте мне звезду~
таймер
Простое использование
Какие Date, Parse, ParseInLocation, не говоря уже. В основном сосредоточьтесь на следующих функциях:
связанный с тикером
func Tick(d Duration) <-chan Time
func NewTicker(d Duration) *Ticker
func (t *Ticker) Stop()
package main
import (
"fmt"
"time"
)
func main() {
for t := range time.Tick(time.Second * 2) {
fmt.Println(t, "hello world")
}
}
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(time.Second * 2)
for {
select {
case t := <-ticker.C:
fmt.Println(t, "hello world")
}
}
}
Следует отметить, что когда тикер не используется, его следует останавливать вручную, иначе может произойти утечка таймера, например следующий код:
package main
import (
"fmt"
"time"
)
func main() {
for {
select {
case t := <-time.Tick(time.Second * 2):
fmt.Println(t, "hello world")
}
}
}
После утечки в куче времени будет накапливаться все больше и больше объектов-таймеров, вызывая все больше неприятных проблем.
связанные с таймером
func After(d Duration) <-chan Time
func NewTimer(d Duration) *Timer
func (t *Timer) Reset(d Duration) bool
func (t *Timer) Stop() bool
time.After обычно используется для управления определенным поведением, которое занимает много времени, и больше не ждет после тайм-аута, так что поведение программы можно ожидать. Если вы не отмените освобождение ресурсов по тайм-ауту, локальные ресурсы могут накапливаться из-за медленного ответа проверяющей стороны, например, fd, количество подключений, использование памяти и т. д. что приводит к простою службы.
Здесь функция, которая блокирует чтение канала навсегда, используется для имитации долговременного поведения, и он выходит из выбора после тайм-аута.
package main
import "time"
func main() {
var ch chan int
select {
case <-time.After(time.Second):
println("time out, and end")
case <-ch:
}
}
time.After отличается от time.Tick, он срабатывает один раз, а сам таймер будет удален из кучи времени после его срабатывания. Так что обычно используйте напрямую<-time.After
Проблем нет, но будьте осторожны при использовании цикла for:
package main
import "time"
func main() {
var ch = make(chan int)
go func() {
for {
ch <- 1
}
}()
for {
select {
case <-time.After(time.Second):
println("time out, and end")
case <-ch:
}
}
}
В приведенном выше коде время выполнения time.AfterБудет назначен новый таймер. Таким образом, за короткий промежуток времени будет создано большое количество бесполезных таймеров, и хотя бесполезные таймеры исчезнут после срабатывания, такой способ записи приведет к бессмысленной трате ресурсов процессора. Правильный способ написания должен повторно использовать таймер следующим образом:
package main
import "time"
func main() {
var ch = make(chan int)
go func() {
for {
ch <- 1
}
}()
timer := time.NewTimer(time.Second)
for {
timer.Reset(time.Second)
select {
case <-timer.C:
println("time out, and end")
case <-ch:
}
}
}
Как и в случае с Тикером, если предыдущий таймер бесполезен, его можно остановить вручную, чтобы удалить таймер из кучи времени.
Анализ исходного кода
структура данных
+--------+
| timers |
+----+---++----+----+----+----+----+-----------------------+----+
| | | | | | | | | |
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | ... | 63 |
+----+----+----+----+----+----+----+-----------------------+----+
| |
| |
| |
| |
| |
| | | | | |
| | +--------------------+ | | | +--------------------+ |
| |--------> | cacheline size * N | <--------+ | |--------> | cacheline size * N | <--------+
| | +--------------------+ | | | +--------------------+ |
| +-------------+-----------------+----------+ | +-------------+-----------------+----------+
+>|timersBucket | | | +--->|timersBucket | | |
+-------------+-----------------+ pad | +-------------+-----------------+ pad |
| lock mutex | | | lock mutex | |
+-------------------------------+ | +-------------------------------+ |
| gp *g | | | gp *g | |
+-------------------------------+ | +-------------------------------+ |
| created bool | | | created bool | |
+-------------------------------+ | ........ +-------------------------------+ |
| sleeping bool | | | sleeping bool | |
+-------------------------------+ | +-------------------------------+ |
| rescheduling bool | | | rescheduling bool | |
+-------------------------------+ | +-------------------------------+ |
| sleepUntil int64 | | | sleepUntil int64 | |
+-------------------------------+ | +-------------------------------+ |
| waitnote note | | | waitnote note | |
+-------------------------------+ | +-------------------------------+ |
| t []*timer | | | t []*timer | |
+-------------------------------+----------+ +-------------------------------+----------+
|
|
|
v
+---+
| 0 |
+---+
|
|
|
|
v
+---+---+---+---+
| 1 | 2 | 3 | 4 |
+-+-+-+-+-+-+-+-+
+---------------------------------+ | | +------------------------------------------+
| +------+ +--------+ |
v | | v
+---+---+---+---+ v v +---+---+---+---+
| | | | | +---+---+---+---+ +---+---+---+---+ | | | | |
+-+-+-+-+---+---+ | | | | | | | | | | +---+---+-+-+-+-+
| | +---+---+---+---+ +---+---+---+---+ | |
+---------+ +---------+ +---------+ +-----+
| | | |
| | | |
v v v v
+---+---+---+---+ +---+---+---+---+ +---+---+---+---+ +---+---+---+---+
| | | | | | | | | | ................. | | | | | | | | | |
+---+---+---+---+ +---+---+---+---+ +---+---+---+---+ +---+---+---+---+
В сочетании с приведенным выше рисунком можно увидеть следующие выводы.
существуетruntime/time.go
Массив таймеров определен в:
var timers [timersLen]struct {
timersBucket
pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
}
В ранней реализации Go есть глобальный таймер, но работа кучи глобального таймера должна быть заблокирована, поэтому многоядерность обнажит проблему низкой производительности из-за конфликта блокировок. Начиная с определенной версии (ну, я не знаю, какой именно, наверное, 1.10) таймеры Go были изменены для реализации этой множественной кучи времени, которая в настоящее время жестко запрограммирована на 64 во время выполнения:
const timersLen = 64
Ну официал сказал, что если он равен GOMAXPROCS, то эти временные кучи будут перераспределяться и модифицироваться во время procresize. Мертвая запись в 64 — это компромисс между использованием памяти и производительностью. Если GOMAXPROCS больше 64, то несколько P могут совместно использовать одну и ту же кучу времени. Конечно, в реальных сценариях я не видел ЦП с более чем 64 ядрами.
Элемент массива timers представляет собой анонимную структуру, содержащую два члена: timersBucket и pad, который используется для заполнения структуры целым числом, кратным кэш-линии, чтобы избежать ложного разделения между разными Ps. Это чаще встречается в сценариях многоядерного программирования. Структура timerBucket:
//go:notinheap
type timersBucket struct {
lock mutex
gp *g
created bool
sleeping bool
rescheduling bool
sleepUntil int64
waitnote note
t []*timer
}
Среди них t — это наша временная куча, но она немного отличается от нашей традиционной структуры кучи, она разделена на четыре форка, такой дизайн я вижу впервые. Здесь также есть специальное примечание для timersBucket.go:notinheap
, официальное описание:
go:notinheap applies to type declarations. It indicates that a type must never be allocated from the GC'd heap. Specifically, pointers to this type must always fail the runtime.inheap check. The type may be used for global variables, for stack variables, or for objects in unmanaged memory (e.g., allocated with sysAlloc, persistentalloc, fixalloc, or from a manually-managed span). Specifically:
- new(T), make([]T), append([]T, ...) and implicit heap allocation of T are disallowed. (Though implicit allocations are disallowed in the runtime anyway.)
- A pointer to a regular type (other than unsafe.Pointer) cannot be converted to a pointer to a go:notinheap type, even if they have the same underlying type.
- Any type that contains a go:notinheap type is itself go:notinheap. Structs and arrays are go:notinheap if their elements are. Maps and channels of go:notinheap types are disallowed. To keep things explicit, any type declaration where the type is implicitly go:notinheap must be explicitly marked go:notinheap as well.
- Write barriers on pointers to go:notinheap types can be omitted.
The last point is the real benefit of go:notinheap. The runtime uses it for low-level internal structures to avoid memory barriers in the scheduler and the memory allocator where they are illegal or simply inefficient. This mechanism is reasonably safe and does not compromise the readability of the runtime.
Ну, просто познакомьтесь с ним, он в основном не используется в пользовательском коде.
Свойства малой верхней кучи с четырьмя ответвлениями
Высота четырехзубцовой сваи меньше, чем у двузубой. Все (до 4) дочерние узлы узла больше, чем этот узел. Родительский узел узла (только один) должен быть меньше текущего узла. Вот типичный четырехъядерный стек с заполненными значениями:
+-----+
| |
| 0 |
+-----+
|
|
|
v
+-----+-----+-----+-----+
| | | | |
| 3 | 2 | 2 | 10 |
+-----+-----+-----+-----+
| | | |
| | | |
+----------+ | | | |
+----------------+ 4*i+1 +-----------------------+ | | +-----------------------------+
| +----------+ +-------------------+ +---+ |
| | | |
| | | |
v | | v
+-----+-----+-----+-----+ | | +-----+-----+-----+-----+
| | | | | v v | | | | |
| 20 | 4 | 5 | 13 | +-----+-----+-----+-----+ +-----+-----+-----+-----+ | 99 | 13 | 11 | 12 |
+-----+-----+-----+-----+ | | | | | | | | | | +-----+-----+-----+-----+
| 12 | 14 | 15 | 16 | | 3 | 10 | 3 | 3 |
+-----+-----+-----+-----+ +-----+-----+-----+-----+
Как и в случае с двоичной кучей, единственным требованием к узлу является отношение размера с его родительским узлом и дочерними узлами. Между соседними узлами нет связи.
вставка временной кучи
// 分配 timerBucket
// 加锁,添加 timer 进时间堆
func addtimer(t *timer) {
tb := t.assignBucket()
lock(&tb.lock)
tb.addtimerLocked(t)
unlock(&tb.lock)
}
// 太简单了,就是用 g.m.p 的 id 模 64
// 然后分配对应的 timerBucket
func (t *timer) assignBucket() *timersBucket {
id := uint8(getg().m.p.ptr().id) % timersLen
t.tb = &timers[id].timersBucket
return t.tb
}
// 向时间堆中添加一个 timer,如果时间堆第一次被初始化或者当前的 timer 比之前所有的 timers 都要早,那么就启动(首次初始化)或唤醒(最早的 timer) timerproc
// 函数内假设外部已经对 timers 数组加锁了
func (tb *timersBucket) addtimerLocked(t *timer) {
// when 必须大于 0,否则会在计算 delta 的时候溢出并导致其它的 runtime timer 永远没法过期
if t.when < 0 {
t.when = 1<<63 - 1
}
t.i = len(tb.t)
tb.t = append(tb.t, t)
siftupTimer(tb.t, t.i)
if t.i == 0 {
// 新插入的 timer 比之前所有的都要早
// 向上调整堆
if tb.sleeping {
// 修改 timerBucket 的 sleep 状态
tb.sleeping = false
// 唤醒 timerproc
// 使 timerproc 中的 for 循环不再阻塞在 notesleepg 上
notewakeup(&tb.waitnote)
}
// 同一个 P 上的所有 timers 如果
// 都在 timerproc 中被弹出了
// 该 rescheduling 会被标记为 true
// 并且启动 timerproc 的 goroutine 会被 goparkunlock
if tb.rescheduling {
// 该标记会在这里和 timejumpLocked 中被设置为 false
tb.rescheduling = false
goready(tb.gp, 0)
}
}
// 如果 timerBucket 是第一次创建,需要启动一个 goroutine
// 来循环弹出时间堆,内部会根据需要最早触发的 timer
// 进行相应时间的 sleep
if !tb.created {
tb.created = true
go timerproc(tb)
}
}
При вставке таймера в кучу логика заключается в том, чтобы сначала добавить в конец массива (добавить), а затем отрегулировать (просеять) кучу вверх, чтобы восстановить свойство четверной малой верхней кучи.
куча времени удалить
// 从堆中删除 timer t
// 如果 timerproc 提前被唤醒也没所谓
func deltimer(t *timer) bool {
if t.tb == nil {
// t.tb can be nil if the user created a timer
// directly, without invoking startTimer e.g
// time.Ticker{C: c}
// In this case, return early without any deletion.
// See Issue 21874.
return false
}
tb := t.tb
lock(&tb.lock)
// t may not be registered anymore and may have
// a bogus i (typically 0, if generated by Go).
// Verify it before proceeding.
i := t.i
last := len(tb.t) - 1
if i < 0 || i > last || tb.t[i] != t {
unlock(&tb.lock)
return false
}
// 把 timer[i] 替换为 timer[last]
if i != last {
tb.t[i] = tb.t[last]
tb.t[i].i = i
}
// 删除 timer[last],并缩小 slice
tb.t[last] = nil
tb.t = tb.t[:last]
// 判断是不是删的最后一个
// 如果不是的话,需要重新调整堆
if i != last {
// 最后一个节点当前来的分叉可能并不是它那个分叉
// 所以向上走或者向下走都是有可能的
// 即使是二叉堆,也是有这种可能的
siftupTimer(tb.t, i)
siftdownTimer(tb.t, i)
}
unlock(&tb.lock)
return true
}
триггеры таймера
// timerproc 负责处理时间驱动的事件
// 在堆中的下一个事件需要触发之前,会一直保持 sleep 状态
// 如果 addtimer 插入了一个更早的事件,会提前唤醒 timerproc
func timerproc(tb *timersBucket) {
tb.gp = getg()
for {
// timerBucket 的局部大锁
lock(&tb.lock)
// 被唤醒,所以修改 sleeping 状态为 false
tb.sleeping = false
// 计时
now := nanotime()
delta := int64(-1)
// 在处理完到期的 timer 之前,一直循环
for {
// 如果 timer 已经都弹出了
// 那么不用循环了,跳出后接着睡觉
if len(tb.t) == 0 {
delta = -1
break
}
// 取小顶堆顶部元素
// 即最近会被触发的 timer
t := tb.t[0]
delta = t.when - now // 还差多长时间才需要触发最近的 timer
if delta > 0 {
// 大于 0 说明这个 timer 还没到需要触发的时间
// 跳出循环去睡觉
break
}
if t.period > 0 {
// 这个 timer 还会留在堆里
// 不过要调整它的下次触发时间
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(tb.t, 0)
} else {
// 从堆中移除这个 timer
// 用最后一个 timer 覆盖第 0 个 timer
// 然后向下调整堆
last := len(tb.t) - 1
if last > 0 {
tb.t[0] = tb.t[last]
tb.t[0].i = 0
}
tb.t[last] = nil
tb.t = tb.t[:last]
if last > 0 {
siftdownTimer(tb.t, 0)
}
t.i = -1 // 标记 timer 在堆中的位置已经没有了
}
// timer 触发时需要调用的函数
f := t.f
arg := t.arg
seq := t.seq
unlock(&tb.lock)
// 调用需触发的函数
f(arg, seq)
// 把锁加回来,如果下次 break 了内层的 for 循环
// 能保证 timeBucket 是被锁住的
// 然后在下面的 goparkunlock 中被解锁
lock(&tb.lock)
}
if delta < 0 || faketime > 0 {
// 说明时间堆里已经没有 timer 了
// 让 goroutine 挂起,去睡觉
tb.rescheduling = true
goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
continue
}
// 说明堆里至少还有一个以上的 timer
// 睡到最近的 timer 时间
tb.sleeping = true
tb.sleepUntil = now + delta
noteclear(&tb.waitnote)
unlock(&tb.lock)
// 内部是 futex sleep
// 时间睡到了会自动醒
// 或者 addtimer 的时候,发现新的 timer 更早,会提前唤醒
notetsleepg(&tb.waitnote, delta)
}
}
Вы можете обратить внимание на период здесь Таймер с периодом начнется с того, когда и будет срабатывать снова каждый период периода.
when+period
when+period*3
|
| |
| |
| when+period*2|
when | |
| | |
| | | | .....
| | | |
+-----------+ | | | |
| timeline +------------------------+-------+-------+-------+----------------->
+-----------+ | | | |
v v v v
trigger trigger
trigger trigger
настройка кучи времени
В предыдущем коде также было видно, что корректировка временной кучи имеет два метода корректировки: корректировка вверх и корректировка вниз.
приспособиться
func siftupTimer(t []*timer, i int) {
// 先暂存当前刚插入到数组尾部的节点
when := t[i].when
tmp := t[i]
// 从当前插入节点的父节点开始
// 如果最新插入的那个节点的触发时间要比父节点的触发时间更早
// 那么就把这个父节点下移
for i > 0 {
p := (i - 1) / 4 // parent
if when >= t[p].when {
break
}
t[i] = t[p]
t[i].i = i
i = p
}
// 如果发生过移动,用最新插入的节点
// 覆盖掉最后一个下移的父节点
if tmp != t[i] {
t[i] = tmp
t[i].i = i
}
}
приспособиться
func siftdownTimer(t []*timer, i int) {
n := len(t)
when := t[i].when
tmp := t[i]
for {
c := i*4 + 1 // 最左孩子节点
c3 := c + 2 // 第三个孩子节点
if c >= n {
break
}
w := t[c].when
if c+1 < n && t[c+1].when < w {
w = t[c+1].when
c++
}
if c3 < n {
w3 := t[c3].when
if c3+1 < n && t[c3+1].when < w3 {
w3 = t[c3+1].when
c3++
}
if w3 < w {
w = w3
c = c3
}
}
if w >= when {
break
}
t[i] = t[c]
t[i].i = i
i = c
}
if tmp != t[i] {
t[i] = tmp
t[i].i = i
}
}
Этот код действительно не элегантен, на самом деле он заключается в том, чтобы сначала найти наименьший среди всех дочерних узлов Если наименьший из них больше, чем текущий узел, который нужно переместить вниз, то сломать. Наоборот, переместите наименьший узел вверх, а затем оцените, больше ли четыре дочерних узла этого наименьшего узла, чем узел, который нужно переместить вниз. И так далее. Используйте схему, чтобы смоделировать этот процесс:
| +---+
| |*5 |
| +---+
| |
| +-----+
| v
| +---+---+---+---+
| | 7 | 3 |*2 | 6 |
| +---+---+---+---+
+-------------------+ | |
| siftdownTimer | | +----------+
+-------------------+ | v
.---------. | +---+---+---+---+
( before ) | | 4 | 5 | 9 |*3 |
`---------' | +---+---+---+---+
| |
| +-------------+
| v
| +---+---+---+---+
| | 6 | 6 | 6 |*4 |
| +---+---+---+---+
|
v
| +---+
| |*2 |
| +---+
| |
| +-----+
| v
| +---+---+---+---+
| | 7 | 3 |*3 | 6 |
| +---+---+---+---+
+-------------------+ | |
| siftdownTimer | | +----------+
+-------------------+ | v
.---------. | +---+---+---+---+
( after ) | | 4 | 5 | 9 |*4 |
`---------' | +---+---+---+---+
| |
| +-------------+
| v
| +---+---+---+---+
| | 6 | 6 | 6 |*5 |
| +---+---+---+---+
|
v
обработать
таймер.После процесса
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
func startTimer(*runtimeTimer)
Реализация этого startTimer находится вruntime/time.go
внутри:
// startTimer adds t to the timer heap.
// 把 t 添加到 timer 堆
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
addtimer(t)
}
Процесс, лежащий в основе addtimer, уже был замечен ранее.
timer.Tick процесс
func Tick(d Duration) <-chan Time {
if d <= 0 {
return nil
}
return NewTicker(d).C
}
// NewTicker 会返回一个 Ticker 对象,其 channel 每隔 period 时间
// 会收到一个时间值
// 如果 receiver 接收慢了,Ticker 会把不需要的 tick drop 掉
// d 必须比 0 大,否则 panic
// Stop ticker 才能释放相关的资源
func NewTicker(d Duration) *Ticker {
if d <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),
period: int64(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
Видно, что элементы r Тикера и Таймера находятся только в поле периода, Тот, который отправляет данные в канал через каждый второй период, — это Тикер, а тот, который срабатывает и исчезает, — это Таймер.
Остановить процесс
func (t *Ticker) Stop() {
stopTimer(&t.r)
}
func (t *Timer) Stop() bool {
if t.r.f == nil {
panic("time: Stop called on uninitialized Timer")
}
return stopTimer(&t.r)
}
И Timer, и Ticker вызывают stopTimer.
func stopTimer(t *timer) bool {
return deltimer(t)
}
deltimer также был замечен выше.
Процесс сброса
func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
active := stopTimer(&t.r)
t.r.when = w
startTimer(&t.r)
return active
}
Это все функции, которые я видел, ничего особенного.
В конце концов
В этой статье в основном рассказывается о реализации таймера в Go.В отрасли существует не одна реализация таймера. Если вы хотите узнать, как таймеры реализованы в других системах, таких как nginx, вы можете обратиться кВот этот.