Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Golang无限缓存channel

Golang无限缓存channel的设计与实现解析

作者:Y先森0.0

这篇文章主要为大家介绍了Golang无限缓存channel的设计与实现解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

一.引言

Go语言的Channel有两种类型,一种是无缓存的channle,一个种是有缓存的channel,但是对于有缓存的channle来说,其缓存长度在创建时就已经固定了,中间也不能扩缩容,这导致对某些特定的业务场景来说不太方便

业务场景如下 :

爬虫场景,想爬取某个URL页面上可达的所有URL

一个channle中存在待处理的URL

一堆worker groutine从channle中读取URL,下载解析网页并且提取URL,再把URL放入channle

这种场景下,使用消息队列或sync包可以解决这个问题,但是比较复杂,如果有一个可以无限缓存的Channle或许是比较好的解决方案

二.设计

基于以上特定的业务场景,我们的无限缓存Channle应该满足以下要求:

缓存无限,最核心的基本要求。

不能阻塞写,普通channle的写操作之所以阻塞,是因为缓存满了,无限缓存的channle不应该存在这个问题。

无数据时阻塞读,此特性保持和普通channle一样。

读写都应通过channle操作 :通过channle的 <- 和 ->,第一个是方便,仍遵循普通channle的语法,第二是不能暴露内部缓存

channle被关闭后,未读取的数据应该仍然可读,此特性和普通channle保持一致

可基于数据量自动扩缩容,在数据量很大的时候要求可以自适应的扩容,在数据量变小后,为了避免内存浪费,要求可以自适应的缩容

针对以上要求,设计思想如下:

内部含有两个普通channle,分别用于读写,我们将其称作In和Out,往In中写入数据,然后从Out中读取数据

内部有一个可以自适应扩缩容的buf,当写channle满了写不了之后,写入到此buf中

内部含有一个工作goroutine,总是In中数据放入到Out或者buf中

内部的自适应扩缩容buf可以采用双向环形链表

和采用数组实现相比,优点如下:

数组大小是有限制的,语言层面就做不到真正的无限缓存

数组扩容代价大,而采用双向环形链表则只用增加节点即可,缩容同样

type T interface{}
type UnlimitSizeChan struct {
bufCount int64    // 统计元素数量,原子操作
In       chan<- T // 写入channle
Out      <-chan T    // 读取channle
buffer   *RingBuffer // 自适应扩缩容Buf
}

双向环形链表 如何写入和读取数据,并且做到自适应扩缩容?

双向环形链表buf其结构类似于一个手串,手串上的珠子就可以当做是一个节点,每个节点可以是一个固定大小的数组

双向环形链表buf上分别有两个读写指针readCell和writeCell,指向将要进行读写操作的cell,负责进行数据读写

readCell永远追赶writeCell,当追上时,代表写满了,进行扩容操作

扩容操作即在写指针的后面插入一个新建的空闲cell

当buf中没有数据时,代表此时的流量高峰应该已经过去了,应该进行缩容操作

缩容操作修改链表指向即可,让buf恢复原样,仅保持两个cell即可,其他cell由于不再被引用,会被GC自动回收

cell上也有两个读写指针r和w,分别负责进行cell上的读写,也是r读指针永远追赶w写指针

type cell struct {
	Data     []T   // 数据部分
	fullFlag bool  // cell满的标志
	next     *cell // 指向后一个cellBuffer
	pre      *cell // 指向前一个cellBuffer
	r int // 下一个要读的指针
	w int // 下一个要下的指针
}
type RingBuffer struct {
	cellCount int // cell 数量统计
	readCell  *cell // 下一个要读的cell
	writeCell *cell // 下一个要写的cell
}

数据FIFO原则是如何保证的?

无限缓存Channle内部的Goroutine,我们称其为Worker

当Out channle还没有满时并且Buf中没有数据时,Worker将读取In中数据,将其放入Out,直到Out满

当Buf中有数据时,无论Out是否满,都将将In中读到的数据,直接写入到Buf中,目的就是为了保证数据的FIFO原则

当cell标记为满时,就算此cell中已经被读取了一部分数据了,此cell在读取完所有数据之前也不能用于写,目的也是为了保证数据的FIFO原则

三.实现

1.双向环形链表实现

package unlimitSizeChan
import (
	"errors"
	"fmt"
)
var ErrRingIsEmpty = errors.New("ringbuffer is empty")
// CellInitialSize cell的初始容量
var CellInitialSize = 1024
// CellInitialCount 初始化cell数量
var CellInitialCount = 2
type cell struct {
	Data     []T   // 数据部分
	fullFlag bool  // cell满的标志
	next     *cell // 指向后一个cellBuffer
	pre      *cell // 指向前一个cellBuffer
	r int // 下一个要读的指针
	w int // 下一个要下的指针
}
type RingBuffer struct {
	cellCount int // cell 数量统计
	readCell  *cell // 下一个要读的cell
	writeCell *cell // 下一个要写的cell
}
// NewRingBuffer 新建一个ringbuffe,包含两个cell
func NewRingBuffer() *RingBuffer {
	rootCell := &cell{
		Data: make([]T, CellInitialSize),
	}
	lastCell := &cell{
		Data: make([]T, CellInitialSize),
	}
	rootCell.pre = lastCell
	lastCell.pre = rootCell
	rootCell.next = lastCell
	lastCell.next = rootCell
	return &RingBuffer{
		cellCount: CellInitialCount,
		readCell:  rootCell,
		writeCell: rootCell,
	}
}
// Read 读取数据
func (r *RingBuffer) Read() (T, error) {
	// 无数据
	if r.IsEmpty() {
		return nil, ErrRingIsEmpty
	}
	// 读取数据,并将读指针向右移动一位
	value := r.readCell.Data[r.readCell.r]
	r.readCell.r++
	// 此cell已经读完
	if r.readCell.r == CellInitialSize {
		// 读指针归零,并将该cell状态置为非满
		r.readCell.r = 0
		r.readCell.fullFlag = false
		// 将readCell指向下一个cell
		r.readCell = r.readCell.next
	}
	return value, nil
}
// Pop 读一个元素,读完后移动指针
func (r *RingBuffer) Pop() T {
	value, err := r.Read()
	if err != nil {
		panic(err.Error())
	}
	return value
}
// Peek 窥视 读一个元素,仅读但不移动指针
func (r *RingBuffer) Peek() T {
	if r.IsEmpty() {
		panic(ErrRingIsEmpty.Error())
	}
	// 仅读
	value := r.readCell.Data[r.readCell.r]
	return value
}
// Write 写入数据
func (r *RingBuffer) Write(value T) {
	// 在 r.writeCell.w 位置写入数据,指针向右移动一位
	r.writeCell.Data[r.writeCell.w] = value
	r.writeCell.w++
	// 当前cell写满了
	if r.writeCell.w == CellInitialSize {
		// 指针置0,将该cell标记为已满,并指向下一个cell
		r.writeCell.w = 0
		r.writeCell.fullFlag = true
		r.writeCell = r.writeCell.next
	}
	// 下一个cell也已满,扩容
	if r.writeCell.fullFlag == true {
		r.grow()
	}
}
// grow 扩容
func (r *RingBuffer) grow() {
	// 新建一个cell
	newCell := &cell{
		Data: make([]T, CellInitialSize),
	}
	// 总共三个cell,writeCell,preCell,newCell
	// 本来关系: preCell <===> writeCell
	// 现在将newcell插入:preCell <===> newCell <===> writeCell
	pre := r.writeCell.pre
	pre.next = newCell
	newCell.pre = pre
	newCell.next = r.writeCell
	r.writeCell.pre = newCell
	// 将writeCell指向新建的cell
	r.writeCell = r.writeCell.pre
	// cell 数量加一
	r.cellCount++
}
// IsEmpty 判断ringbuffer是否为空
func (r *RingBuffer) IsEmpty() bool {
	// readCell和writeCell指向同一个cell,并且该cell的读写指针也指向同一个位置,并且cell状态为非满
	if r.readCell == r.writeCell && r.readCell.r == r.readCell.w && r.readCell.fullFlag == false {
		return true
	}
	return false
}
// Capacity ringBuffer容量
func (r *RingBuffer) Capacity() int {
	return r.cellCount * CellInitialSize
}
// Reset 重置为仅指向两个cell的ring
func (r *RingBuffer) Reset() {
	lastCell := r.readCell.next
	lastCell.w = 0
	lastCell.r = 0
	r.readCell.r = 0
	r.readCell.w = 0
	r.cellCount = CellInitialCount
	lastCell.next = r.readCell
}

2.无限缓存Channle实现

package unlimitSizeChan
import "sync/atomic"
type T interface{}
// UnlimitSizeChan 无限缓存的Channle
type UnlimitSizeChan struct {
	bufCount int64       // 统计元素数量,原子操作
	In       chan<- T    // 写入channle
	Out      <-chan T    // 读取channle
	buffer   *RingBuffer // 自适应扩缩容Buf
}
// Len uc中总共的元素数量
func (uc UnlimitSizeChan) Len() int {
	return len(uc.In) + uc.BufLen() + len(uc.Out)
}
// BufLen uc的buf中的元素数量
func (uc UnlimitSizeChan) BufLen() int {
	return int(atomic.LoadInt64(&uc.bufCount))
}
// NewUnlimitSizeChan 新建一个无限缓存的Channle,并指定In和Out大小(In和Out设置得一样大)
func NewUnlimitSizeChan(initCapacity int) *UnlimitSizeChan {
	return NewUnlitSizeChanSize(initCapacity, initCapacity)
}
// NewUnlitSizeChanSize 新建一个无限缓存的Channle,并指定In和Out大小(In和Out设置得不一样大)
func NewUnlitSizeChanSize(initInCapacity, initOutCapacity int) *UnlimitSizeChan {
	in := make(chan T, initInCapacity)
	out := make(chan T, initOutCapacity)
	ch := UnlimitSizeChan{In: in, Out: out, buffer: NewRingBuffer()}
	go process(in, out, &ch)
	return &ch
}
// 内部Worker Groutine实现
func process(in, out chan T, ch *UnlimitSizeChan) {
	defer close(out) // in 关闭,数据读取后也把out关闭
	// 不断从in中读取数据放入到out或者ringbuf中
loop:
	for {
		// 第一步:从in中读取数据
		value, ok := <-in
		if !ok {
			// in 关闭了,退出loop
			break loop
		}
		// 第二步:将数据存储到out或者buf中
		if atomic.LoadInt64(&ch.bufCount) > 0 {
			// 当buf中有数据时,新数据优先存放到buf中,确保数据FIFO原则
			ch.buffer.Write(value)
			atomic.AddInt64(&ch.bufCount, 1)
		} else {
			// out 没有满,数据放入out中
			select {
			case out <- value:
				continue
			default:
			}
			// out 满了,数据放入buf中
			ch.buffer.Write(value)
			atomic.AddInt64(&ch.bufCount, 1)
		}
		// 第三步:处理buf,一直尝试把buf中的数据放入到out中,直到buf中没有数据
		for !ch.buffer.IsEmpty() {
			select {
			// 为了避免阻塞in,还要尝试从in中读取数据
			case val, ok := <-in:
				if !ok {
					// in 关闭了,退出loop
					break loop
				}
				// 因为这个时候out是满的,新数据直接放入buf中
				ch.buffer.Write(val)
				atomic.AddInt64(&ch.bufCount, 1)
			// 将buf中数据放入out
			case out <- ch.buffer.Peek():
				ch.buffer.Pop()
				atomic.AddInt64(&ch.bufCount, -1)
				if ch.buffer.IsEmpty() { // 避免内存泄露
					ch.buffer.Reset()
					atomic.StoreInt64(&ch.bufCount, 0)
				}
			}
		}
	}
	// in被关闭退出loop后,buf中还有可能有未处理的数据,将他们塞入out中,并重置buf
	for !ch.buffer.IsEmpty() {
		out <- ch.buffer.Pop()
		atomic.AddInt64(&ch.bufCount, -1)
	}
	ch.buffer.Reset()
	atomic.StoreInt64(&ch.bufCount, 0)
}

四.使用

ch := NewUnlimitSizeChan(1000)
// or ch := NewUnlitSizeChanSize(100,200)
go func() {
    for ...... {
        ...
        ch.In <- ... // send values
        ...
    }
    close(ch.In) // close In channel
}()
for v := range ch.Out { // read values
    fmt.Println(v)
}

以上就是Golang无限缓存channel的设计与实现解析的详细内容,更多关于Golang无限缓存channel的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文