Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > golang 无锁队列

golang实现无锁队列的三种方式

作者:游学四方

本文主要介绍了golang实现无锁队列的三种方式,包括基于CAS操作的简单有界队列、Michael-Scott算法的无锁链表队列及Go简化指针操作方法,感兴趣的可以了解一下

1. 基于 CAS 的简单有界队列

使用固定大小的环形缓冲区,通过原子索引实现无锁。

package main
import (
	"fmt"
	"sync"
	"sync/atomic"
)
// LockFreeQueue 基于 CAS 的有界无锁队列
type LockFreeQueue struct {
	buffer []interface{}
	head   uint64 // 读取位置
	tail   uint64 // 写入位置
	cap    uint64
}
func NewLockFreeQueue(capacity int) *LockFreeQueue {
	return &LockFreeQueue{
		buffer: make([]interface{}, capacity),
		cap:    uint64(capacity),
	}
}
// Enqueue 入队
func (q *LockFreeQueue) Enqueue(val interface{}) bool {
	for {
		tail := atomic.LoadUint64(&q.tail)
		head := atomic.LoadUint64(&q.head)
		// 队列已满
		if tail-head >= q.cap {
			return false
		}
		// 尝试 CAS 更新 tail
		if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
			idx := tail % q.cap
			q.buffer[idx] = val
			return true
		}
		// CAS 失败,重试
	}
}
// Dequeue 出队
func (q *LockFreeQueue) Dequeue() (interface{}, bool) {
	for {
		head := atomic.LoadUint64(&q.head)
		tail := atomic.LoadUint64(&q.tail)
		// 队列为空
		if head == tail {
			return nil, false
		}
		// 尝试 CAS 更新 head
		if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
			idx := head % q.cap
			val := q.buffer[idx]
			q.buffer[idx] = nil // 帮助 GC
			return val, true
		}
		// CAS 失败,重试
	}
}
func main() {
	q := NewLockFreeQueue(100)
	var wg sync.WaitGroup
	// 生产者
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				q.Enqueue(n*100 + j)
			}
		}(i)
	}
	// 消费者
	var count int64
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				if val, ok := q.Dequeue(); ok {
					atomic.AddInt64(&count, 1)
					_ = val
				} else {
					// 空队列时短暂休眠避免忙等
					// 实际生产可用 runtime.Gosched()
				}
			}
		}()
	}
	wg.Wait()
	fmt.Printf("Processed: %d\n", atomic.LoadInt64(&count))
}

2. 无界链表队列(Michael-Scott 算法)

经典的无锁队列算法,使用链表实现,支持动态扩容。

package main
import (
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)
// node 链表节点
type node struct {
	value interface{}
	next  unsafe.Pointer // *node
}
// LockFreeListQueue 基于 Michael-Scott 算法的无锁队列
type LockFreeListQueue struct {
	head unsafe.Pointer // *node
	tail unsafe.Pointer // *node
}
func NewLockFreeListQueue() *LockFreeListQueue {
	n := unsafe.Pointer(&node{})
	return &LockFreeListQueue{
		head: n,
		tail: n,
	}
}
// Enqueue 入队
func (q *LockFreeListQueue) Enqueue(val interface{}) {
	newNode := &node{value: val}
	newNodePtr := unsafe.Pointer(newNode)
	for {
		tail := (*node)(atomic.LoadPointer(&q.tail))
		next := (*node)(atomic.LoadPointer(&tail.next))
		// 再次检查 tail 是否变化
		if tail != (*node)(atomic.LoadPointer(&q.tail)) {
			continue
		}
		if next == nil {
			// 尝试将新节点链接到尾部
			if atomic.CompareAndSwapPointer(&tail.next, unsafe.Pointer(nil), newNodePtr) {
				// 尝试更新 tail 指针
				atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), newNodePtr)
				return
			}
		} else {
			// 帮助推进 tail 指针
			atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
		}
	}
}
// Dequeue 出队
func (q *LockFreeListQueue) Dequeue() (interface{}, bool) {
	for {
		head := (*node)(atomic.LoadPointer(&q.head))
		tail := (*node)(atomic.LoadPointer(&q.tail))
		next := (*node)(atomic.LoadPointer(&head.next))
		if head != (*node)(atomic.LoadPointer(&q.head)) {
			continue
		}
		if head == tail {
			if next == nil {
				return nil, false // 空队列
			}
			// 帮助推进 tail
			atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
		} else {
			val := next.value
			if atomic.CompareAndSwapPointer(&q.head, unsafe.Pointer(head), unsafe.Pointer(next)) {
				return val, true
			}
		}
	}
}
func main() {
	q := NewLockFreeListQueue()
	var wg sync.WaitGroup
	var count int64
	// 生产者
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				q.Enqueue(n*1000 + j)
			}
		}(i)
	}
	// 消费者
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				if val, ok := q.Dequeue(); ok {
					atomic.AddInt64(&count, 1)
					_ = val
				}
			}
		}()
	}
	wg.Wait()
	fmt.Printf("Total dequeued: %d\n", atomic.LoadInt64(&count))
}

3、使用 sync/atomic 的简化版本(Go 1.19+)

Go 1.19 引入了 atomic.Pointer,可以简化指针操作:

locklessqueue.go

//locklessqueue.go
package lockless
import (
	"sync/atomic"
)
type LockFreeQueue struct {
	buf  []interface{}
	len  int32
	head int32
	tail int32
}
func NewQueue(n int32) *LockFreeQueue {
	q := &LockFreeQueue{buf: make([]interface{}, n+1, n+1), len: n + 1}
	return q
}
func (s *LockFreeQueue) PushBack(v interface{}) {
	for {
		tail := atomic.LoadInt32(&s.tail)
		n := (tail + 1) % s.len
		if atomic.CompareAndSwapInt32(&s.head, n, n) {
			continue // 队列满了
		}
		if !atomic.CompareAndSwapInt32(&s.tail, tail, n) {
			continue // 获取失败
		}
		s.buf[tail] = v
		break
	}
}
func (s *LockFreeQueue) PopFront() interface{} {
	for {
		tail := atomic.LoadInt32(&s.tail)
		head := atomic.LoadInt32(&s.head)
		if tail == head {
			continue
		}
		n := (head + 1) % s.len
		if !atomic.CompareAndSwapInt32(&s.head, head, n) {
			continue
		}
		return s.buf[head]
	}
}

测试代码
locklessqueue_test.go

//locklessqueue_test.go
package lockless
import (
	"sync"
	"testing"
)
func TestName(t *testing.T) {
	lq := NewQueue(10)
	w := sync.WaitGroup{}
	for i := 0; i < 100; i++ {
		w.Add(1)
		go func(gi int) {
			lq.PushBack(gi)
			w.Done()
		}(i)
	}
	go func() {
		for {
			lq.PopFront()
			//	time.Sleep(1 * time.Second)
		}
	}()
	w.Wait()
}
var ch = make(chan interface{}, 50000)
func BenchmarkGo_Chan(b *testing.B) {
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ch <- 123
		go func() {
			<-ch
		}()
	}
}
func BenchmarkGo_LockFree(b *testing.B) {
	lq := NewQueue(1000000000)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		lq.PushBack(123)
		go func() {
			lq.PopFront()
		}()
	}
}

执行命令

PS D:\golang\src\Test\Test> go test -v   locklessqueue_test.go locklessqueue.go     
=== RUN   TestName
--- PASS: TestName (0.00s)
PASS
ok      command-line-arguments  0.173s

性能测试

D:\golang\src\Test\Test>  go test -v -bench="." locklessqueue_test.go  locklessqueue.go
=== RUN   TestName
--- PASS: TestName (0.00s)
goos: windows
goarch: amd64
cpu: Intel(R) Core(TM) i5-9500F CPU @ 3.00GHz
BenchmarkGo_Chan
BenchmarkGo_Chan-6               2957491               414.7 ns/op
BenchmarkGo_LockFree
BenchmarkGo_LockFree-6           4356723               272.4 ns/op
PASS
ok      command-line-arguments  85.410s

到此这篇关于golang实现无锁队列的文章就介绍到这了,更多相关golang 无锁队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文