使用Golang如何实现简易的令牌桶算法
作者:CG国斌
简介
在网络中传输数据的时候时,为了防止网络拥塞,需限制流出网络的流量,使流量以比较均匀的速度向外发送。
令牌桶算法就实现了这个功能,可控制发送到网络上数据的数目,并允许突发数据的发送。
令牌桶算法是网络流量整形和速率限制中最常使用的一种算法。
大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。
如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。
后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。
传送到令牌桶的数据包需要消耗令牌。不同大小的数据包,消耗的令牌数量不一样。
令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送流量。令牌桶中的每一个令牌都代表一个字节。
如果令牌桶中存在令牌,则允许发送流量;而如果令牌桶中不存在令牌,则不允许发送流量。
因此,如果突发门限被合理地配置并且令牌桶中有足够的令牌,那么流量就可以以峰值速率发送。
与“令牌桶算法”类似的算法还有“漏桶算法”,这两种算法的主要区别在于“漏桶算法”能够强行限制数据的传输速率,而“令牌桶算法”在能够限制数据的平均传输速率外,还允许某种程度的突发传输。
在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允许突发地传输数据直到达到用户配置的门限,因此它适合于具有突发特性的流量。
在本文中,我们使用 Golong 语言实现一个简单的“令牌桶算法”,或者说是“漏桶算法”更为合适。
实现
首先,我们假设令牌桶的放入令牌的速率是恒定的,不考虑流量速率突变的情况。
package awesomeProject import ( "sync" "time" ) // 定义令牌桶结构 type tokenBucket struct { limitRate int // 限制频率,即每分钟加入多少个令牌 tokenChan chan struct{} // 令牌通道,可以理解为桶 cap int // 令牌桶的容量 muLock *sync.Mutex // 令牌桶锁,保证线程安全 stop bool // 停止标记,结束令牌桶 } // NewTokenBucket 创建令牌桶 func NewTokenBucket(limitRate, cap int) *tokenBucket { if cap < 1 { panic("token bucket cap must be large 1") } return &tokenBucket{ tokenChan: make(chan struct{}, cap), limitRate: limitRate, muLock: new(sync.Mutex), cap: cap, } } // Start 开启令牌桶 func (b *tokenBucket) Start() { go b.produce() } // 生产令牌 func (b *tokenBucket) produce() { for { b.muLock.Lock() if b.stop { close(b.tokenChan) b.muLock.Unlock() return } b.tokenChan <- struct{}{} d := time.Minute / time.Duration(b.limitRate) b.muLock.Unlock() time.Sleep(d) } } // Consume 消费令牌 func (b *tokenBucket) Consume() { <-b.tokenChan } // Stop 停止令牌桶 func (b *tokenBucket) Stop() { b.muLock.Lock() defer b.muLock.Unlock() b.stop = true }
其中,
tokenBucket
为令牌桶的结构,包括限制频率、令牌桶容量和通道等;NewTokenBucket
为对外提供的创建令牌桶的方法;Start
为开启令牌桶的方法;produce
为以恒定速率生成令牌的方法,以协程的方式启动;Consume
为消费令牌的方法;Stop
为停止令牌桶的方法。
如上述所示,即为令牌桶的简易实现。
轮子
实际上,在 Go 语言中已经提供了对令牌桶的支持了,因此不需要我们重复造轮子。
令牌桶,go语言创建和使用令牌桶
什么是令牌桶
百度百科
令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。
典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。
更详细的自行搜索理解,这里只提供一下代码思路
基本使用
代码
package tokenBucket import ( "log" "sync" "time" ) type TokensBucket struct { limiter float64 //速率 burst int //桶大小 mu sync.Mutex //锁 tokens float64 //桶里面的令牌数量 last time.Time //最后一次消耗令牌的时间 } // NewTokensBucket 创建令牌桶 func NewTokensBucket(limiter float64, burst int) *TokensBucket { return &TokensBucket{limiter: limiter, burst: burst} } // Allow 使用,每次消耗一个令牌 func (t *TokensBucket) Allow() bool { return t.AllowN(time.Now(), 1) } // AllowN 当前时间,一次消耗的令牌 func (t *TokensBucket) AllowN(now time.Time, i int) bool { t.mu.Lock() defer t.mu.Unlock() //当前时间-最后一次添加令牌的时间 * 桶速率 = 应该补充的令牌 delta := now.Sub(t.last).Seconds() * t.limiter t.tokens += delta //桶内令牌 > 桶总大小 = 只补充最大令牌数 if t.tokens > float64(t.burst) { t.tokens = float64(t.burst) } //桶内令牌 < 需要的令牌 = 返回false if t.tokens < float64(i) { return false } //否则返回true,并用桶的剩余令牌 - 消耗令牌 t.tokens -= float64(i) //桶最后一次补充时间重置为当前时间 t.last = now //返回true return true }
测试
func main() { bucket := NewTokensBucket(3, 5) for true { n := 4 for i := 0; i < n; i++ { go func(i int) { if bucket.Allow() { log.Printf("allow [%d]", i) } else { log.Printf("forbid [%d]", i) } }(i) } time.Sleep(time.Second) log.Println("========================================") } }
在开发中使用
最基本的使用,实际开发肯定不是这样的要考虑到更多的情况,这里只是一个小的演示而已
func main() { app := gin.Default() bucket := tokenBucket.NewTokensBucket(1, 2) app.Use(func(context *gin.Context) { //拿到令牌就给放行 if bucket.Allow() { context.Next() //拿不到就不给过 } else { context.JSON(500, gin.H{ "msg": "false", }) context.Abort() } }) app.GET("/", func(context *gin.Context) { context.JSON(200, gin.H{ "msg": "success", }) }) app.Run(":80") }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。