Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go优先级队列

Go高级特性探究之优先级队列详解

作者:Goland猫

Heap 是一种数据结构,这种数据结构常用于实现优先队列,这篇文章主要就是来和大家深入探讨一下GO语言中的优先级队列,感兴趣的可以了解一下

什么是heap

Heap 是一种数据结构,其中包含一个特殊的根节点,且每个节点的值都不小于(或不大于)其所有子节点的值。这种数据结构常用于实现优先队列。

Heap的数据结构

Heap 可以通过一个数组来实现,这个数组满足以下条件:

堆具有以下属性。

完全二叉树和满二叉树的区别如下所示。

根节点最大的堆称为最大堆或大根堆,根节点最小的堆称为最小堆或小根堆。

由于堆是完全二叉树,因此它们可以表示为顺序数组,如下所示。

如何实现优先级队列

优先队列是一种数据结构,其中每个元素都有一个优先级,优先级高的元素在前面,优先级相同时按照插入顺序排列。可以使用堆来实现优先队列。实现优先队列的关键是将一个元素添加到队列中,并保持队列中的元素有序。如果使用数组来存储元素,需要频繁对数组进行调整,时间复杂度是O(n),不够高效。如果使用堆来存储元素,则可以在插入时进行堆化,时间复杂度是O(nlogn)。

在堆中,节点的位置与它们在数组中的位置有一定的关系。例如,根节点位于数组的第一个元素,其他节点依次排列。左子节点位于(2i),右子节点位于(2i+1),父节点位于(i/2)。这个关系可以方便地实现在数组上进行堆化的操作。

image.png

为什么需要使用优先级队列

优先级队列是一种非常有用的数据结构,在很多应用中都会被广泛使用。比如作业调度、事件管理等领域,都需要使用优先级队列来帮助处理任务以及事件等的优先级顺序。

优点和缺点

优点

缺点

heap PriorityQueue实现

代码来自github.com/hashicorp/vault/blob/main/sdk/queue/priority_queue.go

package go_pool_priority
import (
	"container/heap"
	"errors"
	"sync"
	"github.com/mitchellh/copystructure"
)
// ErrEmpty is returned for queues with no items
var ErrEmpty = errors.New("queue is empty")
// ErrDuplicateItem is returned when the queue attmepts to push an item to a key that
// already exists. The queue does not attempt to update, instead returns this
// error. If an Item needs to be updated or replaced, pop the item first.
var ErrDuplicateItem = errors.New("duplicate item")
// New initializes the internal data structures and returns a new
// PriorityQueue
func NewPriorityQueue() *PriorityQueue {
	pq := PriorityQueue{
		data:    make(queue, 0),
		dataMap: make(map[string]*Item),
	}
	heap.Init(&pq.data)
	return &pq
}
// PriorityQueue facilitates queue of Items, providing Push, Pop, and
// PopByKey convenience methods. The ordering (priority) is an int64 value
// with the smallest value is the highest priority. PriorityQueue maintains both
// an internal slice for the queue as well as a map of the same items with their
// keys as the index. This enables users to find specific items by key. The map
// must be kept in sync with the data slice.
// See https://golang.org/pkg/container/heap/#example__priorityQueue
type PriorityQueue struct {
	// data is the internal structure that holds the queue, and is operated on by
	// heap functions
	data queue
	// dataMap represents all the items in the queue, with unique indexes, used
	// for finding specific items. dataMap is kept in sync with the data slice
	dataMap map[string]*Item
	// lock is a read/write mutex, and used to facilitate read/write locks on the
	// data and dataMap fields
	lock sync.RWMutex
}
// queue is the internal data structure used to satisfy heap.Interface. This
// prevents users from calling Pop and Push heap methods directly
type queue []*Item
// Item is something managed in the priority queue
type Item struct {
	// Key is a unique string used to identify items in the internal data map
	Key string
	// Value is an unspecified type that implementations can use to store
	// information
	Value interface{}
	// Priority determines ordering in the queue, with the lowest value being the
	// highest priority
	Priority int64
	// index is an internal value used by the heap package, and should not be
	// modified by any consumer of the priority queue
	index int
}
// Len returns the count of items in the Priority Queue
func (pq *PriorityQueue) Len() int {
	pq.lock.RLock()
	defer pq.lock.RUnlock()
	return pq.data.Len()
}
// Pop pops the highest priority item from the queue. This is a
// wrapper/convenience method that calls heap.Pop, so consumers do not need to
// invoke heap functions directly
func (pq *PriorityQueue) Pop() (*Item, error) {
	pq.lock.Lock()
	defer pq.lock.Unlock()
	if pq.data.Len() == 0 {
		return nil, ErrEmpty
	}
	item := heap.Pop(&pq.data).(*Item)
	delete(pq.dataMap, item.Key)
	return item, nil
}
// Push pushes an item on to the queue. This is a wrapper/convenience
// method that calls heap.Push, so consumers do not need to invoke heap
// functions directly. Items must have unique Keys, and Items in the queue
// cannot be updated. To modify an Item, users must first remove it and re-push
// it after modifications
func (pq *PriorityQueue) Push(i *Item) error {
	if i == nil || i.Key == "" {
		return errors.New("error adding item: Item Key is required")
	}
	pq.lock.Lock()
	defer pq.lock.Unlock()
	if _, ok := pq.dataMap[i.Key]; ok {
		return ErrDuplicateItem
	}
	// Copy the item value(s) so that modifications to the source item does not
	// affect the item on the queue
	clone, err := copystructure.Copy(i)
	if err != nil {
		return err
	}
	pq.dataMap[i.Key] = clone.(*Item)
	heap.Push(&pq.data, clone)
	return nil
}
// PopByKey searches the queue for an item with the given key and removes it
// from the queue if found. Returns nil if not found. This method must fix the
// queue after removing any key.
func (pq *PriorityQueue) PopByKey(key string) (*Item, error) {
	pq.lock.Lock()
	defer pq.lock.Unlock()
	item, ok := pq.dataMap[key]
	if !ok {
		return nil, nil
	}
	// Remove the item the heap and delete it from the dataMap
	itemRaw := heap.Remove(&pq.data, item.index)
	delete(pq.dataMap, key)
	if itemRaw != nil {
		if i, ok := itemRaw.(*Item); ok {
			return i, nil
		}
	}
	return nil, nil
}
// Len returns the number of items in the queue data structure. Do not use this
// method directly on the queue, use PriorityQueue.Len() instead.
func (q queue) Len() int { return len(q) }
// Less returns whether the Item with index i should sort before the Item with
// index j in the queue. This method is used by the queue to determine priority
// internally; the Item with the lower value wins. (priority zero is higher
// priority than 1). The priority of Items with equal values is undetermined.
func (q queue) Less(i, j int) bool {
	return q[i].Priority < q[j].Priority
}
// Swap swaps things in-place; part of sort.Interface
func (q queue) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
	q[i].index = i
	q[j].index = j
}
// Push is used by heap.Interface to push items onto the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Push(x interface{}) {
	n := len(*q)
	item := x.(*Item)
	item.index = n
	*q = append(*q, item)
}
// Pop is used by heap.Interface to pop items off of the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Pop() interface{} {
	old := *q
	n := len(old)
	item := old[n-1]
	old[n-1] = nil  // avoid memory leak
	item.index = -1 // for safety
	*q = old[0 : n-1]
	return item
}

- 内部使用container/heap中的Interface接口实现堆结构;

- 提供了Push、Pop和PopByKey等一系列方法;

- 使用一个内部slice和一个以Key为索引的映射map来维护队列元素;

- 根据元素的Priority值进行优先级排序,Priority值越小表示优先级越高;

- 在Push时需要保证Key值唯一;

- PopByKey方法可以根据Key查找并移除对应的元素。

实现思路

既然,我们了解了heap的一些特性,那么我们接下来就要考虑如何用现有的数据结构,实现优先队列。

我们都知道,无论是哪一种队列,必然是存在生产者和消费者两个部分,对于优先级队列来说,更是如此。因此,咱们的实现思路,也将从这两个部分来谈。

生产者

对于生产者来说,他只需要推送一个任务及其优先级过来,咱们就得根据优先级处理他的任务。

由于,我们不大好判断,到底会有多少种不同的优先级传过来,也无法确定,每种优先级下有多少个任务要处理,所以,我们可以直接使用heap存储task

消费者

对于消费者来说,他需要获取优先级最高的任务进行消费。使用heap pop 取出优先级最高的任务即可

数据结构

(1)优先级队列对象

type PriorityQueueTask struct {
	mLock    sync.Mutex // 互斥锁,queues和priorities并发操作时使用,当然针对当前读多写少的场景,也可以使用读写锁
	pushChan chan *task // 推送任务管道
	pq       *PriorityQueue
}

(2)任务对象

type task struct {
	priority int64 // 任务的优先级
	value    interface{}
	key      string
}

初始化优先级队列对象

在初始化对象时,需要先通过 NewPriorityQueue() 函数创建一个空的 PriorityQueue,然后再创建一个 PriorityQueueTask 对象,并将刚刚创建的 PriorityQueue 赋值给该对象的 pq 属性。同时,还要创建一个用于接收推送任务的管道,用于在生产者推送任务时,将新任务添加到队列中。

func NewPriorityQueueTask() *PriorityQueueTask {
	pq := &PriorityQueueTask{
		pushChan: make(chan *task, 100),
		pq:       NewPriorityQueue(),
	}
	// 监听pushChan
	go pq.listenPushChan()
	return pq
}
func (pq *PriorityQueueTask) listenPushChan() {
	for {
		select {
		case taskEle := <-pq.pushChan:
			pq.mLock.Lock()
			pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value})
			pq.mLock.Unlock()
		}
	}
}

生产者推送任务

生产者向推送任务管道中推送新任务时,实际上是将一个 task 结构体实例发送到了管道中。在 task 结构体中,priority 属性表示这个任务的优先级,value 属性表示这个任务的值,key 属性表示这个任务的键。

// 插入work
func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) {
    pq.pushChan <- &task{
        value:    value,
        priority: priority,
        key:      key,
    }
}

消费者消费队列

消费者从队列中取出一个任务,然后进行相应的操作。在这段代码中,消费者轮询获取最高优先级的任务。如果没有获取到任务,则继续轮询;如果获取到了任务,则执行对应的操作。在这里,执行操作的具体形式是打印任务的编号、优先级等信息。

// Consume 消费者轮询获取最高优先级的任务
func (pq *PriorityQueueTask) Consume() {
    for {
        task := pq.Pop()
        if task == nil {
            // 未获取到任务,则继续轮询
            time.Sleep(time.Millisecond)
            continue
        }
        // 获取到了任务,就执行任务
        fmt.Println("推送任务的编号为:", task.Value)
        fmt.Println("推送的任务优先级为:", task.Priority)
        fmt.Println("============")
    }
}

完整代码

package go_pool_priority
import (
	"fmt"
	"sync"
	"time"
)
type PriorityQueueTask struct {
	mLock    sync.Mutex // 互斥锁,queues和priorities并发操作时使用,当然针对当前读多写少的场景,也可以使用读写锁
	pushChan chan *task // 推送任务管道
	pq       *PriorityQueue
}
type task struct {
	priority int64 // 任务的优先级
	value    interface{}
	key      string
}
func NewPriorityQueueTask() *PriorityQueueTask {
	pq := &PriorityQueueTask{
		pushChan: make(chan *task, 100),
		pq:       NewPriorityQueue(),
	}
	// 监听pushChan
	go pq.listenPushChan()
	return pq
}
func (pq *PriorityQueueTask) listenPushChan() {
	for {
		select {
		case taskEle := <-pq.pushChan:
			pq.mLock.Lock()
			pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value})
			pq.mLock.Unlock()
		}
	}
}
// 插入work
func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) {
	pq.pushChan <- &task{
		value:    value,
		priority: priority,
		key:      key,
	}
}
// Pop 取出最高优先级队列中的一个任务
func (pq *PriorityQueueTask) Pop() *Item {
	pq.mLock.Lock()
	defer pq.mLock.Unlock()
	item, err := pq.pq.Pop()
	if err != nil {
		return nil
	}
	// 如果所有队列都没有任务,则返回null
	return item
}
// Consume 消费者轮询获取最高优先级的任务
func (pq *PriorityQueueTask) Consume() {
	for {
		task := pq.Pop()
		if task == nil {
			// 未获取到任务,则继续轮询
			time.Sleep(time.Millisecond)
			continue
		}
		// 获取到了任务,就执行任务
		fmt.Println("推送任务的编号为:", task.Value)
		fmt.Println("推送的任务优先级为:", task.Priority)
		fmt.Println("============")
	}
}

测试用例

func TestQueue(t *testing.T) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println(err)
		}
	}()
	pq := NewPriorityQueueTask()
	// 我们在这里,随机生成一些优先级任务
	for i := 0; i < 100; i++ {
		a := rand.Intn(1000)
		go func(a int64) {
			pq.Push(a, a, strconv.Itoa(int(a)))
		}(int64(a))
	}
	// 这里会阻塞,消费者会轮询查询任务队列
	pq.Consume()
}

以上就是Go高级特性探究之优先级队列详解的详细内容,更多关于Go优先级队列的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文