Golang无限缓存channel的设计与实现解析
作者:Y先森0.0
一.引言
Go语言的Channel有两种类型,一种是无缓存的channle,一个种是有缓存的channel,但是对于有缓存的channle来说,其缓存长度在创建时就已经固定了,中间也不能扩缩容,这导致对某些特定的业务场景来说不太方便
业务场景如下 :
爬虫场景,想爬取某个URL页面上可达的所有URL
一个channle中存在待处理的URL
一堆worker groutine从channle中读取URL,下载解析网页并且提取URL,再把URL放入channle
这种场景下,使用消息队列或sync包可以解决这个问题,但是比较复杂,如果有一个可以无限缓存的Channle或许是比较好的解决方案
二.设计
基于以上特定的业务场景,我们的无限缓存Channle应该满足以下要求:
缓存无限,最核心的基本要求。
不能阻塞写,普通channle的写操作之所以阻塞,是因为缓存满了,无限缓存的channle不应该存在这个问题。
无数据时阻塞读,此特性保持和普通channle一样。
读写都应通过channle操作 :通过channle的 <- 和 ->,第一个是方便,仍遵循普通channle的语法,第二是不能暴露内部缓存
channle被关闭后,未读取的数据应该仍然可读,此特性和普通channle保持一致
可基于数据量自动扩缩容,在数据量很大的时候要求可以自适应的扩容,在数据量变小后,为了避免内存浪费,要求可以自适应的缩容
针对以上要求,设计思想如下:
内部含有两个普通channle,分别用于读写,我们将其称作In和Out,往In中写入数据,然后从Out中读取数据
内部有一个可以自适应扩缩容的buf,当写channle满了写不了之后,写入到此buf中
内部含有一个工作goroutine,总是In中数据放入到Out或者buf中
内部的自适应扩缩容buf可以采用双向环形链表
和采用数组实现相比,优点如下:
数组大小是有限制的,语言层面就做不到真正的无限缓存
数组扩容代价大,而采用双向环形链表则只用增加节点即可,缩容同样
type T interface{} type UnlimitSizeChan struct { bufCount int64 // 统计元素数量,原子操作 In chan<- T // 写入channle Out <-chan T // 读取channle buffer *RingBuffer // 自适应扩缩容Buf }
双向环形链表 如何写入和读取数据,并且做到自适应扩缩容?
双向环形链表buf其结构类似于一个手串,手串上的珠子就可以当做是一个节点,每个节点可以是一个固定大小的数组
双向环形链表buf上分别有两个读写指针readCell和writeCell,指向将要进行读写操作的cell,负责进行数据读写
readCell永远追赶writeCell,当追上时,代表写满了,进行扩容操作
扩容操作即在写指针的后面插入一个新建的空闲cell
当buf中没有数据时,代表此时的流量高峰应该已经过去了,应该进行缩容操作
缩容操作修改链表指向即可,让buf恢复原样,仅保持两个cell即可,其他cell由于不再被引用,会被GC自动回收
cell上也有两个读写指针r和w,分别负责进行cell上的读写,也是r读指针永远追赶w写指针
type cell struct { Data []T // 数据部分 fullFlag bool // cell满的标志 next *cell // 指向后一个cellBuffer pre *cell // 指向前一个cellBuffer r int // 下一个要读的指针 w int // 下一个要下的指针 } type RingBuffer struct { cellCount int // cell 数量统计 readCell *cell // 下一个要读的cell writeCell *cell // 下一个要写的cell }
数据FIFO原则是如何保证的?
无限缓存Channle内部的Goroutine,我们称其为Worker
当Out channle还没有满时并且Buf中没有数据时,Worker将读取In中数据,将其放入Out,直到Out满
当Buf中有数据时,无论Out是否满,都将将In中读到的数据,直接写入到Buf中,目的就是为了保证数据的FIFO原则
当cell标记为满时,就算此cell中已经被读取了一部分数据了,此cell在读取完所有数据之前也不能用于写,目的也是为了保证数据的FIFO原则
三.实现
1.双向环形链表实现
package unlimitSizeChan import ( "errors" "fmt" ) var ErrRingIsEmpty = errors.New("ringbuffer is empty") // CellInitialSize cell的初始容量 var CellInitialSize = 1024 // CellInitialCount 初始化cell数量 var CellInitialCount = 2 type cell struct { Data []T // 数据部分 fullFlag bool // cell满的标志 next *cell // 指向后一个cellBuffer pre *cell // 指向前一个cellBuffer r int // 下一个要读的指针 w int // 下一个要下的指针 } type RingBuffer struct { cellCount int // cell 数量统计 readCell *cell // 下一个要读的cell writeCell *cell // 下一个要写的cell } // NewRingBuffer 新建一个ringbuffe,包含两个cell func NewRingBuffer() *RingBuffer { rootCell := &cell{ Data: make([]T, CellInitialSize), } lastCell := &cell{ Data: make([]T, CellInitialSize), } rootCell.pre = lastCell lastCell.pre = rootCell rootCell.next = lastCell lastCell.next = rootCell return &RingBuffer{ cellCount: CellInitialCount, readCell: rootCell, writeCell: rootCell, } } // Read 读取数据 func (r *RingBuffer) Read() (T, error) { // 无数据 if r.IsEmpty() { return nil, ErrRingIsEmpty } // 读取数据,并将读指针向右移动一位 value := r.readCell.Data[r.readCell.r] r.readCell.r++ // 此cell已经读完 if r.readCell.r == CellInitialSize { // 读指针归零,并将该cell状态置为非满 r.readCell.r = 0 r.readCell.fullFlag = false // 将readCell指向下一个cell r.readCell = r.readCell.next } return value, nil } // Pop 读一个元素,读完后移动指针 func (r *RingBuffer) Pop() T { value, err := r.Read() if err != nil { panic(err.Error()) } return value } // Peek 窥视 读一个元素,仅读但不移动指针 func (r *RingBuffer) Peek() T { if r.IsEmpty() { panic(ErrRingIsEmpty.Error()) } // 仅读 value := r.readCell.Data[r.readCell.r] return value } // Write 写入数据 func (r *RingBuffer) Write(value T) { // 在 r.writeCell.w 位置写入数据,指针向右移动一位 r.writeCell.Data[r.writeCell.w] = value r.writeCell.w++ // 当前cell写满了 if r.writeCell.w == CellInitialSize { // 指针置0,将该cell标记为已满,并指向下一个cell r.writeCell.w = 0 r.writeCell.fullFlag = true r.writeCell = r.writeCell.next } // 下一个cell也已满,扩容 if r.writeCell.fullFlag == true { r.grow() } } // grow 扩容 func (r *RingBuffer) grow() { // 新建一个cell newCell := &cell{ Data: make([]T, CellInitialSize), } // 总共三个cell,writeCell,preCell,newCell // 本来关系: preCell <===> writeCell // 现在将newcell插入:preCell <===> newCell <===> writeCell pre := r.writeCell.pre pre.next = newCell newCell.pre = pre newCell.next = r.writeCell r.writeCell.pre = newCell // 将writeCell指向新建的cell r.writeCell = r.writeCell.pre // cell 数量加一 r.cellCount++ } // IsEmpty 判断ringbuffer是否为空 func (r *RingBuffer) IsEmpty() bool { // readCell和writeCell指向同一个cell,并且该cell的读写指针也指向同一个位置,并且cell状态为非满 if r.readCell == r.writeCell && r.readCell.r == r.readCell.w && r.readCell.fullFlag == false { return true } return false } // Capacity ringBuffer容量 func (r *RingBuffer) Capacity() int { return r.cellCount * CellInitialSize } // Reset 重置为仅指向两个cell的ring func (r *RingBuffer) Reset() { lastCell := r.readCell.next lastCell.w = 0 lastCell.r = 0 r.readCell.r = 0 r.readCell.w = 0 r.cellCount = CellInitialCount lastCell.next = r.readCell }
2.无限缓存Channle实现
package unlimitSizeChan import "sync/atomic" type T interface{} // UnlimitSizeChan 无限缓存的Channle type UnlimitSizeChan struct { bufCount int64 // 统计元素数量,原子操作 In chan<- T // 写入channle Out <-chan T // 读取channle buffer *RingBuffer // 自适应扩缩容Buf } // Len uc中总共的元素数量 func (uc UnlimitSizeChan) Len() int { return len(uc.In) + uc.BufLen() + len(uc.Out) } // BufLen uc的buf中的元素数量 func (uc UnlimitSizeChan) BufLen() int { return int(atomic.LoadInt64(&uc.bufCount)) } // NewUnlimitSizeChan 新建一个无限缓存的Channle,并指定In和Out大小(In和Out设置得一样大) func NewUnlimitSizeChan(initCapacity int) *UnlimitSizeChan { return NewUnlitSizeChanSize(initCapacity, initCapacity) } // NewUnlitSizeChanSize 新建一个无限缓存的Channle,并指定In和Out大小(In和Out设置得不一样大) func NewUnlitSizeChanSize(initInCapacity, initOutCapacity int) *UnlimitSizeChan { in := make(chan T, initInCapacity) out := make(chan T, initOutCapacity) ch := UnlimitSizeChan{In: in, Out: out, buffer: NewRingBuffer()} go process(in, out, &ch) return &ch } // 内部Worker Groutine实现 func process(in, out chan T, ch *UnlimitSizeChan) { defer close(out) // in 关闭,数据读取后也把out关闭 // 不断从in中读取数据放入到out或者ringbuf中 loop: for { // 第一步:从in中读取数据 value, ok := <-in if !ok { // in 关闭了,退出loop break loop } // 第二步:将数据存储到out或者buf中 if atomic.LoadInt64(&ch.bufCount) > 0 { // 当buf中有数据时,新数据优先存放到buf中,确保数据FIFO原则 ch.buffer.Write(value) atomic.AddInt64(&ch.bufCount, 1) } else { // out 没有满,数据放入out中 select { case out <- value: continue default: } // out 满了,数据放入buf中 ch.buffer.Write(value) atomic.AddInt64(&ch.bufCount, 1) } // 第三步:处理buf,一直尝试把buf中的数据放入到out中,直到buf中没有数据 for !ch.buffer.IsEmpty() { select { // 为了避免阻塞in,还要尝试从in中读取数据 case val, ok := <-in: if !ok { // in 关闭了,退出loop break loop } // 因为这个时候out是满的,新数据直接放入buf中 ch.buffer.Write(val) atomic.AddInt64(&ch.bufCount, 1) // 将buf中数据放入out case out <- ch.buffer.Peek(): ch.buffer.Pop() atomic.AddInt64(&ch.bufCount, -1) if ch.buffer.IsEmpty() { // 避免内存泄露 ch.buffer.Reset() atomic.StoreInt64(&ch.bufCount, 0) } } } } // in被关闭退出loop后,buf中还有可能有未处理的数据,将他们塞入out中,并重置buf for !ch.buffer.IsEmpty() { out <- ch.buffer.Pop() atomic.AddInt64(&ch.bufCount, -1) } ch.buffer.Reset() atomic.StoreInt64(&ch.bufCount, 0) }
四.使用
ch := NewUnlimitSizeChan(1000) // or ch := NewUnlitSizeChanSize(100,200) go func() { for ...... { ... ch.In <- ... // send values ... } close(ch.In) // close In channel }() for v := range ch.Out { // read values fmt.Println(v) }
以上就是Golang无限缓存channel的设计与实现解析的详细内容,更多关于Golang无限缓存channel的资料请关注脚本之家其它相关文章!