Go使用TimerController解决timer过多的问题
作者:AnthonyDong
多路复用,实际上Go底层也是一种多路复用的思想去实现的timer,但是它是底层的timer,我们需要解决的问题就过多的timer问题!本文给大家介绍了Go使用TimerController解决timer过多的问题,需要的朋友可以参考下
背景
- 在Go里面我们实现超时需要起一个goroutine才能实现,但是当我有大量的任务需要做超时控制就需要起大量的goroutine,实际上是一种开销和负担!
- 有些时候需要注册一些Timer也是有需要起大量的 goroutine才能实现,比如我要异步定期刷新一个配置,异步的监听啥东西,此时简单做法就是使用大量的 goroutine + timer/sleep实现!
解决思路
多路复用,实际上Go底层也是一种多路复用的思想去实现的timer,但是它是底层的timer,我们需要解决的问题就过多的timer问题!
我们的思路是实现一个 TimerController 可以帮助我们管理很多个timer,并且可以开销做到最低!因此使用一个 小顶堆 + Timer调度器即可实现!
实现
小顶堆(最小堆)
使用Go自带的 container/heap
实现 小顶堆
import ( "container/heap" ) type HeapItem[T any] interface { Less(HeapItem[T]) bool GetValue() T } // 参考 IntHeap type heapQueue[T any] []HeapItem[T] func (h heapQueue[T]) Len() int { return len(h) } func (h heapQueue[T]) Less(i, j int) bool { return h[i].Less(h[j]) } func (h heapQueue[T]) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *heapQueue[T]) Push(x any) { // Push and Pop use pointer receivers because they modify the slice's length, // not just its contents. *h = append(*h, x.(HeapItem[T])) } func (h *heapQueue[T]) Pop() any { old := *h n := len(old) x := old[n-1] *h = old[0 : n-1] return x } type HeapQueue[T any] struct { queue heapQueue[T] } func (h *HeapQueue[T]) ptr() *heapQueue[T] { return &h.queue } // NewHeapQueue 非并发安全 func NewHeapQueue[T any](items ...HeapItem[T]) *HeapQueue[T] { queue := make(heapQueue[T], len(items)) for index, item := range items { queue[index] = item } heap.Init(&queue) return &HeapQueue[T]{queue: queue} } func (h *HeapQueue[T]) Push(item HeapItem[T]) { heap.Push(h.ptr(), item) } func (h *HeapQueue[T]) Pop() (T, bool ) { if h.ptr().Len() == 0 { var Nil T return Nil, false } return heap.Pop(h.ptr()).(HeapItem[T]).GetValue(), true } // Peek 方法用于返回堆顶元素而不移除它 func (h *HeapQueue[T]) Peek() (T, bool ) { if h.ptr().Len() > 0 { return h.queue[0].GetValue(), true } var Nil T return Nil, false } func (h *HeapQueue[T]) Len() int { return h.ptr().Len() }
调度器
type Timer struct { Timeout time.Time Name string NotifyFunc func() } func (t *Timer) GetCurTimeout() time.Duration { return t.Timeout.Sub(time.Now()) } // Notify todo support async notify func (t *Timer) Notify() { if t.NotifyFunc != nil { t.NotifyFunc() } } func (t *Timer) IsExpired() bool { return t.Timeout.Before(time.Now()) } func (t *Timer) Less(v HeapItem[*Timer]) bool { return t.Timeout.Before(v.GetValue().Timeout) } func (t *Timer) GetValue() *Timer { return t } type TimerController struct { timers chan *Timer minHeap *HeapQueue[*Timer] closeOnce sync.Once close chan struct{} } func (t *TimerController) AddTimer(timer *Timer) bool { if timer == nil { return false } select { case <-t.close: return false default: t.timers <- timer return true } } func (t *TimerController) Close() { t.closeOnce.Do(func() { close(t.close) }) } func NewTimerController(bufferSize int) *TimerController { return &TimerController{ timers: make(chan *Timer, bufferSize), minHeap: NewHeapQueue[*Timer](), close: make(chan struct{}), } } func (t *TimerController) Start() { go t._start() } func (t *TimerController) _start() { const defaultTimeout = time.Hour * 24 var ( curMinTimer *Timer timeout = time.NewTimer(defaultTimeout) ) for { select { case <-t.close: close(t.timers) timeout.Stop() return case timer := <-t.timers: t.minHeap.Push(timer) curMinTimer, _ = t.minHeap.Peek() timeout.Reset(curMinTimer.GetCurTimeout()) //fmt.Printf("timeout.Reset-1 name: %s, timeout: %s\n", curMinTimer.Name, curMinTimer.GetCurTimeout()) case <-timeout.C: if curMinTimer != nil { curMinTimer.Notify() curMinTimer = nil t.minHeap.Pop() } curMinTimer, _ = t.minHeap.Peek() if curMinTimer == nil { timeout.Reset(defaultTimeout) continue } timeout.Reset(curMinTimer.GetCurTimeout()) //fmt.Printf("timeout.Reset-2 name: %s, timeout: %s\n", curMinTimer.Name, curMinTimer.GetCurTimeout()) } } }
测试
func TestTimerController(t *testing.T) { controller := NewTimerController(1024) controller.Start() defer controller.Close() now := time.Now() arrs := make([]string, 0) NewTimer := func(num int) *Timer { return &Timer{Timeout: now.Add(time.Duration(num) * time.Millisecond), Name: strconv.Itoa(num), NotifyFunc: func() { arrs = append(arrs, strconv.Itoa(num)) }} } // 这里乱序的注册了8个timer controller.AddTimer(NewTimer(5)) controller.AddTimer(NewTimer(6)) controller.AddTimer(NewTimer(3)) controller.AddTimer(NewTimer(4)) controller.AddTimer(NewTimer(7)) controller.AddTimer(NewTimer(8)) controller.AddTimer(NewTimer(1)) controller.AddTimer(NewTimer(2)) time.Sleep(time.Second * 1) t.Logf("%#v\n", arrs) // 最终我们可以获取到 顺序执行的! assert.Equal(t, arrs, []string{"1", "2", "3", "4", "5", "6", "7", "8"}) } func TestTimerController_Stable(t *testing.T) { controller := NewTimerController(1024) controller.Start() defer controller.Close() now := time.Now() arrs := make(map[string]bool, 0) NewTimer := func(num int, name string) *Timer { return &Timer{Timeout: now.Add(time.Duration(num) * time.Millisecond), Name: name, NotifyFunc: func() { arrs[name] = true }} } // 我们重复注册了相同实现执行的 timer,那么预期是每次执行的结果和注册顺序一致 controller.AddTimer(NewTimer(2, "1")) controller.AddTimer(NewTimer(2, "2")) controller.AddTimer(NewTimer(2, "3")) controller.AddTimer(NewTimer(2, "4")) controller.AddTimer(NewTimer(2, "5")) time.Sleep(time.Second * 1) t.Logf("%#v\n", arrs) assert.Equal(t, arrs, map[string]bool{"1": true, "2": true, "3": true, "4": true, "5": true}) }
以上就是Go使用TimerController解决timer过多的问题的详细内容,更多关于Go TimerController解决timer过多的资料请关注脚本之家其它相关文章!