Go使用TimerController解决timer过多的问题
作者:AnthonyDong
多路复用,实际上Go底层也是一种多路复用的思想去实现的timer,但是它是底层的timer,我们需要解决的问题就过多的timer问题!本文给大家介绍了Go使用TimerController解决timer过多的问题,需要的朋友可以参考下
背景
- 在Go里面我们实现超时需要起一个goroutine才能实现,但是当我有大量的任务需要做超时控制就需要起大量的goroutine,实际上是一种开销和负担!
- 有些时候需要注册一些Timer也是有需要起大量的 goroutine才能实现,比如我要异步定期刷新一个配置,异步的监听啥东西,此时简单做法就是使用大量的 goroutine + timer/sleep实现!
解决思路
多路复用,实际上Go底层也是一种多路复用的思想去实现的timer,但是它是底层的timer,我们需要解决的问题就过多的timer问题!
我们的思路是实现一个 TimerController 可以帮助我们管理很多个timer,并且可以开销做到最低!因此使用一个 小顶堆 + Timer调度器即可实现!
实现
小顶堆(最小堆)
使用Go自带的 container/heap 实现 小顶堆
import (
"container/heap"
)
type HeapItem[T any] interface {
Less(HeapItem[T]) bool
GetValue() T
}
// 参考 IntHeap
type heapQueue[T any] []HeapItem[T]
func (h heapQueue[T]) Len() int { return len(h) }
func (h heapQueue[T]) Less(i, j int) bool { return h[i].Less(h[j]) }
func (h heapQueue[T]) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *heapQueue[T]) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(HeapItem[T]))
}
func (h *heapQueue[T]) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
type HeapQueue[T any] struct {
queue heapQueue[T]
}
func (h *HeapQueue[T]) ptr() *heapQueue[T] {
return &h.queue
}
// NewHeapQueue 非并发安全
func NewHeapQueue[T any](items ...HeapItem[T]) *HeapQueue[T] {
queue := make(heapQueue[T], len(items))
for index, item := range items {
queue[index] = item
}
heap.Init(&queue)
return &HeapQueue[T]{queue: queue}
}
func (h *HeapQueue[T]) Push(item HeapItem[T]) {
heap.Push(h.ptr(), item)
}
func (h *HeapQueue[T]) Pop() (T, bool ) {
if h.ptr().Len() == 0 {
var Nil T
return Nil, false
}
return heap.Pop(h.ptr()).(HeapItem[T]).GetValue(), true
}
// Peek 方法用于返回堆顶元素而不移除它
func (h *HeapQueue[T]) Peek() (T, bool ) {
if h.ptr().Len() > 0 {
return h.queue[0].GetValue(), true
}
var Nil T
return Nil, false
}
func (h *HeapQueue[T]) Len() int {
return h.ptr().Len()
}
调度器
type Timer struct {
Timeout time.Time
Name string
NotifyFunc func()
}
func (t *Timer) GetCurTimeout() time.Duration {
return t.Timeout.Sub(time.Now())
}
// Notify todo support async notify
func (t *Timer) Notify() {
if t.NotifyFunc != nil {
t.NotifyFunc()
}
}
func (t *Timer) IsExpired() bool {
return t.Timeout.Before(time.Now())
}
func (t *Timer) Less(v HeapItem[*Timer]) bool {
return t.Timeout.Before(v.GetValue().Timeout)
}
func (t *Timer) GetValue() *Timer {
return t
}
type TimerController struct {
timers chan *Timer
minHeap *HeapQueue[*Timer]
closeOnce sync.Once
close chan struct{}
}
func (t *TimerController) AddTimer(timer *Timer) bool {
if timer == nil {
return false
}
select {
case <-t.close:
return false
default:
t.timers <- timer
return true
}
}
func (t *TimerController) Close() {
t.closeOnce.Do(func() { close(t.close) })
}
func NewTimerController(bufferSize int) *TimerController {
return &TimerController{
timers: make(chan *Timer, bufferSize),
minHeap: NewHeapQueue[*Timer](),
close: make(chan struct{}),
}
}
func (t *TimerController) Start() {
go t._start()
}
func (t *TimerController) _start() {
const defaultTimeout = time.Hour * 24
var (
curMinTimer *Timer
timeout = time.NewTimer(defaultTimeout)
)
for {
select {
case <-t.close:
close(t.timers)
timeout.Stop()
return
case timer := <-t.timers:
t.minHeap.Push(timer)
curMinTimer, _ = t.minHeap.Peek()
timeout.Reset(curMinTimer.GetCurTimeout())
//fmt.Printf("timeout.Reset-1 name: %s, timeout: %s\n", curMinTimer.Name, curMinTimer.GetCurTimeout())
case <-timeout.C:
if curMinTimer != nil {
curMinTimer.Notify()
curMinTimer = nil
t.minHeap.Pop()
}
curMinTimer, _ = t.minHeap.Peek()
if curMinTimer == nil {
timeout.Reset(defaultTimeout)
continue
}
timeout.Reset(curMinTimer.GetCurTimeout())
//fmt.Printf("timeout.Reset-2 name: %s, timeout: %s\n", curMinTimer.Name, curMinTimer.GetCurTimeout())
}
}
}
测试
func TestTimerController(t *testing.T) {
controller := NewTimerController(1024)
controller.Start()
defer controller.Close()
now := time.Now()
arrs := make([]string, 0)
NewTimer := func(num int) *Timer {
return &Timer{Timeout: now.Add(time.Duration(num) * time.Millisecond), Name: strconv.Itoa(num), NotifyFunc: func() {
arrs = append(arrs, strconv.Itoa(num))
}}
}
// 这里乱序的注册了8个timer
controller.AddTimer(NewTimer(5))
controller.AddTimer(NewTimer(6))
controller.AddTimer(NewTimer(3))
controller.AddTimer(NewTimer(4))
controller.AddTimer(NewTimer(7))
controller.AddTimer(NewTimer(8))
controller.AddTimer(NewTimer(1))
controller.AddTimer(NewTimer(2))
time.Sleep(time.Second * 1)
t.Logf("%#v\n", arrs)
// 最终我们可以获取到 顺序执行的!
assert.Equal(t, arrs, []string{"1", "2", "3", "4", "5", "6", "7", "8"})
}
func TestTimerController_Stable(t *testing.T) {
controller := NewTimerController(1024)
controller.Start()
defer controller.Close()
now := time.Now()
arrs := make(map[string]bool, 0)
NewTimer := func(num int, name string) *Timer {
return &Timer{Timeout: now.Add(time.Duration(num) * time.Millisecond), Name: name, NotifyFunc: func() {
arrs[name] = true
}}
}
// 我们重复注册了相同实现执行的 timer,那么预期是每次执行的结果和注册顺序一致
controller.AddTimer(NewTimer(2, "1"))
controller.AddTimer(NewTimer(2, "2"))
controller.AddTimer(NewTimer(2, "3"))
controller.AddTimer(NewTimer(2, "4"))
controller.AddTimer(NewTimer(2, "5"))
time.Sleep(time.Second * 1)
t.Logf("%#v\n", arrs)
assert.Equal(t, arrs, map[string]bool{"1": true, "2": true, "3": true, "4": true, "5": true})
}
以上就是Go使用TimerController解决timer过多的问题的详细内容,更多关于Go TimerController解决timer过多的资料请关注脚本之家其它相关文章!
