redis redisson 限流器的实例(RRateLimiter)
作者:o_瓜田李下_o
redis redisson 限流器实例
作用:限制一段时间内对数据的访问数量
相关接口
RRateLimiter
public interface RRateLimiter extends RRateLimiterAsync, RObject { boolean trySetRate(RateType var1, long var2, long var4, RateIntervalUnit var6); //设置访问速率,var2为访问数,var4为单位时间,var6为时间单位 void acquire(); //访问数据 void acquire(long var1); //占var1的速度计算值 boolean tryAcquire(); //尝试访问数据 boolean tryAcquire(long var1); //尝试访问数据,占var1的速度计算值 boolean tryAcquire(long var1, TimeUnit var3); //尝试访问数据,设置等待时间var3 boolean tryAcquire(long var1, long var3, TimeUnit var5); //尝试访问数据,占数据计算值var1,设置等待时间var3 RateLimiterConfig getConfig(); }
RateType
:速度类型
public enum RateType { OVERALL, //所有客户端加总限流 PER_CLIENT; //每个客户端单独计算流量 private RateType() { } }
RateInternalUnit
:速度单位
public enum RateIntervalUnit { MILLISECONDS { public long toMillis(long value) { return value; } }, SECONDS { public long toMillis(long value) { return TimeUnit.SECONDS.toMillis(value); } }, MINUTES { public long toMillis(long value) { return TimeUnit.MINUTES.toMillis(value); } }, HOURS { public long toMillis(long value) { return TimeUnit.HOURS.toMillis(value); } }, DAYS { public long toMillis(long value) { return TimeUnit.DAYS.toMillis(value); } }; private RateIntervalUnit() { } public abstract long toMillis(long var1); }
示例
public class MyTest8 { public static void main(String[] args){ Config config=new Config(); config.useSingleServer().setAddress("redis://192.168.57.120:6379").setPassword("123456"); RedissonClient client= Redisson.create(config); RRateLimiter rateLimiter=client.getRateLimiter("rate_limiter"); rateLimiter.trySetRate(RateType.PER_CLIENT,5,2, RateIntervalUnit.MINUTES); ExecutorService executorService= Executors.newFixedThreadPool(10); for (int i=0;i<10;i++){ executorService.submit(()->{ try{ rateLimiter.acquire(); System.out.println("线程"+Thread.currentThread().getId()+"进入数据区:"+System.currentTimeMillis()); }catch (Exception e){ e.printStackTrace(); } }); } } }
控制台输出
线程49进入数据区:1574672546522
线程55进入数据区:1574672546522
线程56进入数据区:1574672546526
线程50进入数据区:1574672546523
线程48进入数据区:1574672546523
线程51进入数据区:1574672666627
线程53进入数据区:1574672666627
线程54进入数据区:1574672666627
线程57进入数据区:1574672666628
线程52进入数据区:1574672666628
说明:两分钟之内最多只有5个线程在执行
分布式限流redission RRateLimiter使用及原理
前提:
最近公司在做有需求在做分布式限流,调研的限流框架大概有
- 1、spring cloud gateway集成redis限流,但属于网关层限流
- 2、阿里Sentinel,功能强大、带监控平台
- 3、srping cloud hystrix,属于接口层限流,提供线程池与信号量两种方式
- 4、其他:redission、手撸代码
实际需求情况属于业务端限流,redission更加方便,使用更加灵活,下面介绍下redission分布式限流如何使用及原理:
一、使用
使用很简单、如下
// 1、 声明一个限流器 RRateLimiter rateLimiter = redissonClient.getRateLimiter(key); // 2、 设置速率,5秒中产生3个令牌 rateLimiter.trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS); // 3、试图获取一个令牌,获取到返回true rateLimiter.tryAcquire(1)
二、原理
1、getRateLimiter
// 声明一个限流器 名称 叫key redissonClient.getRateLimiter(key)
2、trySetRate
trySetRate方法跟进去底层实现如下:
@Override public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);" + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);" + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);", Collections.<Object>singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal()); }
举个例子,更容易理解:
比如下面这段代码,5秒中产生3个令牌,并且所有实例共享(RateType.OVERALL所有实例共享、RateType.CLIENT单实例端共享)
trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);
那么redis中就会设置3个参数:
hsetnx,key,rate,3
hsetnx,key,interval,5
hsetnx,key,type,0
接着看tryAcquire(1)方法:底层源码如下
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1], 'rate');" //1 + "local interval = redis.call('hget', KEYS[1], 'interval');" //2 + "local type = redis.call('hget', KEYS[1], 'type');" //3 + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')" //4 + "local valueName = KEYS[2];" //5 + "if type == 1 then " + "valueName = KEYS[3];" //6 + "end;" + "local currentValue = redis.call('get', valueName); " //7 + "if currentValue ~= false then " + "if tonumber(currentValue) < tonumber(ARGV[1]) then " //8 + "return redis.call('pttl', valueName); " + "else " + "redis.call('decrby', valueName, ARGV[1]); " //9 + "return nil; " + "end; " + "else " //10 + "redis.call('set', valueName, rate, 'px', interval); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end;", Arrays.<Object>asList(getName(), getValueName(), getClientValueName()), value, commandExecutor.getConnectionManager().getId().toString()); }
第1、2、3备注行是获取上一步set的3个值:rate、interval、type,如果这3个值没有设置,直接返回rateLimiter没有被初始化。
第5备注行声明一个变量叫valueName 值为KEYS[2],KEYS[2]对应的值是getValueName()方法,getValueName()返回的就是上面第一步getRateLimiter我们设置的key;如果type=1,表示全局共享,那么valueName 的值改为取KEYS[3],KEYS[3]对应的值为getClientValueName(),查看getClientValueName()源码:
String getClientValueName() { return suffixName(getValueName(), commandExecutor.getConnectionManager().getId().toString()); }
ConnectionManager().getId()如下:
public interface ConnectionManager { UUID getId(); 省略... }
这个getId()是每个客户端初始化的时候生成的UUID,即每个客户端的getId是唯一的,这也就验证了trySetRate方法中RateType.ALL与RateType.PER_CLIENT的作用。
- 接着看第7标准行,获取valueName对应的值currentValue;首次获取肯定为空,那么看第10标准行else的逻辑
- set valueName 3 px 5,设置key=valueName value=3 过期时间为5秒
- decrby valueName 1,将上面valueName的值减1
- 那么如果第二次访问,第7标注行返回的值存在,将会走第8标注行,紧接着走如下判断
- 如果当前valueName的值也就是3,小于要获得的令牌数量(tryAcquire方法中的入参),那么说明当前时间内(key的有效期5秒内),令牌的数量已经被用完,返回pttl(key的剩余过期时间);反之说明桶中有足够的令牌,获取之后将会把桶中的令牌数量减1,至此结束。
总结:
redission分布式限流采用令牌桶思想和固定时间窗口,trySetRate方法设置桶的大小,利用redis key过期机制达到时间窗口目的,控制固定时间窗口内允许通过的请求量。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。