Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Golang延迟队列

基于Golang实现延迟队列(DelayQueue)

作者:jiaxwu

延迟队列是一种特殊的队列,元素入队时需要指定到期时间(或延迟时间),从队头出队的元素必须是已经到期的。本文将用Golang实现延迟队列,感兴趣的可以了解下

背景

延迟队列是一种特殊的队列,元素入队时需要指定到期时间(或延迟时间),从队头出队的元素必须是已经到期的,而且最先到期的元素最先出队,也就是队列里面的元素是按照到期时间排序的,添加元素和从队头出队的时间复杂度是O(log(n))。

由于以上性质,延迟队列一般可以用于以下场景(定时任务、延迟任务):

其实在Golang中是自带定时器的,也就是time.After()time.AfterFunc()等函数,它们的性能也是非常好的,随着Golang版本升级还会优化。但是对于某些场景来说确实不够方便,比如缓存场景我们需要能够支持随机删除定时器,随机重置过期时间,更加灵活的删除一小批过期元素。而且像Kafka的时间轮算法(TimeWheel)里面也用到了延迟队列,因此还是有必要了解下如何实现延迟队列。

原理

延迟队列每次出队的是最小到期时间的元素,而堆就是用来获取最值的数据结构。使用堆我们可以实现O(log(n))时间复杂度添加元素和移除最小到期时间元素。

随机删除

有时候延迟队列还需要具有随机删除元素的能力,可以通过以下方式实现:

重置元素到期时间

如果需要重置延迟队列里面元素的到期时间,则必须知道元素在堆中的下标,因为重置到期时间之后必须对堆进行调整,因此只能是元素添加堆中下标字段

Golang实现

这里我们实现一个最简单的延迟队列,也就是不支持随机删除元素和重置元素的到期时间,因为有些场景只需要添加元素和获取到期元素这两个功能,比如Kafka中的时间轮,而且这种简单实现性能会高一点。

代码地址

数据结构

主要的结构可以看到就是一个heap,Entry是每个元素在堆中的表示,Value是具体的元素值,Expired是为了堆中元素根据到期时间排序。

mutex是一个互斥锁,主要是保证操作并发安全。

wakeup是一个缓冲区长度为1的通道,通过它实现添加元素的时候唤醒等待队列不为空或者有更小到期时间元素加入的协程。(重点)

type Entry[T any] struct {
	Value   T
	Expired time.Time // 到期时间
}

// 延迟队列
type DelayQueue[T any] struct {
	h      *heap.Heap[*Entry[T]]
	mutex  sync.Mutex    // 保证并发安全
	wakeup chan struct{} // 唤醒通道
}

// 创建延迟队列
func New[T any]() *DelayQueue[T] {
	return &DelayQueue[T]{
		h: heap.New(nil, func(e1, e2 *Entry[T]) bool {
			return e1.Expired.Before(e2.Expired)
		}),
		wakeup: make(chan struct{}, 1),
	}
}

实现原理

阻塞获取元素的时候如果队列已经没有元素,或者没有元素到期,那么协程就需要挂起等待。而被唤醒的条件是元素到期、队列不为空或者有更小到期时间元素加入。

其中元素到期协程在阻塞获取元素时发现堆顶元素还没到期,因此这个条件可以自己构造并等待。但是条件队列不为空和有更小到期时间元素加入则需要另外一个协程在添加元素时才能满足,因此必须通过一个中间结构来进行协程间通信,一般Golang里面会使用Channel来实现。

添加元素

一开始加了一个互斥锁,避免并发冲突,然后把元素加到堆里。

因为我们Take()操作,既阻塞获取元素操作,在不满足条件时会去等待wakeup通道,但是等待通道前必须释放锁,否则Push()无法写入新元素去满足条件队列不为空和有更小到期时间元素加入。而从释放锁后到开始读取wakeup通道这段时间是没有锁保护的,如果Push()在这期间插入新元素,为了保证通道不阻塞同时又能通知到Take()协程,我们的通道的长度需要是1,同时使用select+default保证在通道里面已经有元素的时候不阻塞Push()协程。

// 添加延迟元素到队列
func (q *DelayQueue[T]) Push(value T, delay time.Duration) {
	q.mutex.Lock()
	defer q.mutex.Unlock()
	entry := &Entry[T]{
		Value:   value,
		Expired: time.Now().Add(delay),
	}
	q.h.Push(entry)
	// 唤醒等待的协程
	// 这里表示新添加的元素到期时间是最早的,或者原来队列为空
	// 因此必须唤醒等待的协程,因为可以拿到更早到期的元素
	if q.h.Peek() == entry {
		select {
		case q.wakeup <- struct{}{}:
		default:
		}
	}
}

阻塞获取元素

这里先判断堆是否有元素,如果有获取堆顶元素,然后判断是否已经到期,如果到期则直接出堆并返回。否则等待直到超时或者元素到期或者有新的元素到达。

这里在解锁之前会清空wakeup通道,这样可以保证下面读取的wakeup通道里的元素肯定是新加入的。

// 等待直到有元素到期
// 或者ctx被关闭
func (q *DelayQueue[T]) Take(ctx context.Context) (T, bool) {
	for {
		var expired *time.Timer
		q.mutex.Lock()
		// 有元素
		if !q.h.Empty() {
			// 获取元素
			entry := q.h.Peek()
			if time.Now().After(entry.Expired) {
				q.h.Pop()
				q.mutex.Unlock()
				return entry.Value, true
			}
			// 到期时间,使用time.NewTimer()才能够调用Stop(),从而释放定时器
			expired = time.NewTimer(time.Until(entry.Expired))
		}
		// 避免被之前的元素假唤醒
		select {
		case <-q.wakeup:
		default:
		}
		q.mutex.Unlock()

		// 不为空,需要同时等待元素到期
                // 并且除非expired到期,否则都需要关闭expired避免泄露
		if expired != nil {
			select {
			case <-q.wakeup: // 新的更快到期元素
				expired.Stop()
			case <-expired.C: // 首元素到期
			case <-ctx.Done(): // 被关闭
				expired.Stop()
				var t T
				return t, false
			}
		} else {
			select {
			case <-q.wakeup: // 新的更快到期元素
			case <-ctx.Done(): // 被关闭
				var t T
				return t, false
			}
		}
	}
}

Channel方式阻塞读取

Golang里面可以使用Channel进行流式消费,因此简单包装一个Channel形式的阻塞读取接口,给通道一点缓冲区大小可以带来更好的性能。

// 返回一个通道,输出到期元素
// size是通道缓存大小
func (q *DelayQueue[T]) Channel(ctx context.Context, size int) <-chan T {
	out := make(chan T, size)
	go func() {
		for {
			entry, ok := q.Take(ctx)
			if !ok {
				return
			}
			out <- entry
		}
	}()
	return out
}

使用方式

for entry := range q.Channel(context.Background(), 10) {
    // do something
}

性能测试

这里进行一个简单的性能测试,也就是先添加元素,然后等待到期后全部拿出来。

func BenchmarkPushAndTake(b *testing.B) {
	q := New[int]()
	b.ResetTimer()
        
        // 添加元素
	for i := 0; i < b.N; i++ {
		q.Push(i, time.Duration(i))
	}
        
        // 等待全部元素到期
	b.StopTimer()
	time.Sleep(time.Duration(b.N))
	b.StartTimer()

        // 获取元素
	for i := 0; i < b.N; i++ {
		_, ok := q.Take(context.Background())
		if !ok {
			b.Errorf("want %v, but %v", true, ok)
		}
	}
}

测试结果:

Benchmark-8      2331534               476.8 ns/op            76 B/op          1 allocs/op

总结

堆实现的延迟队列是一种实现起来比较简单的定时器(当然阻塞读取Take()是比较复杂的),由于时间复杂度是O(log(n)),因此可以满足定时任务数量不是特别多的场景。堆实现的延迟队列也是可以随机删除元素的,可以根据具体任务选择是否实现。如果对定时器性能要求比较敏感的话可以选择使用时间轮实现定时器,它可以在O(1)的时间复杂度添加和删除一个定时器,不过实现起来比较复杂(挖个坑,下篇文章实现)。

到此这篇关于基于Golang实现延迟队列(DelayQueue)的文章就介绍到这了,更多相关Golang延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文