java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > 令牌桶实现限流器

基于令牌桶的限流器注解的简单实现详解

作者:Source_J

令牌桶算法是一种常用的流量控制算法,用于限制请求或事件的发生速率,这篇文章主要介绍了如何基于令牌桶实现限流器注解,需要的可以参考一下

一、原理

令牌桶算法是一种常用的流量控制算法,用于限制请求或事件的发生速率,以防止系统过载。它的原理是基于令牌桶的数据结构和一定的令牌产生规则。

在令牌桶算法中,可以将令牌桶看作是一个具有固定容量的桶,以一定的速率产生令牌,并按照一定的规则进行存放。每当有请求或事件发生时,首先需要从桶中获取一个令牌,如果桶中没有可用的令牌,则需要等待或丢弃请求。当桶中的令牌数达到最大容量时,产生的令牌也不会继续增加。

具体工作原理如下:

令牌桶中有两个关键参数:令牌产生速率(token generation rate)和令牌容量(token bucket capacity)。

在每个固定时间间隔(例如,每秒),系统会向令牌桶中添加一定数量的令牌(即产生令牌),直到桶的容量达到最大值。

当有请求或事件发生时,需要先从令牌桶中获取一个令牌。

令牌桶算法的优点在于可以对请求的速率进行平滑的控制,且具备较好的适应性,可以应对突发流量。由于令牌的产生速率是固定的,因此可以精确控制系统的请求处理速率,防止系统的过载和资源耗尽。

在分布式系统中,令牌桶算法也常被用于实现分布式限流,保护后端服务免受过多请求的影响,确保系统的稳定性和可靠性。

二、基于redis实现令牌桶算法

这个算法,设计的时候主要是考虑到要支持分布式系统的令牌桶资源共享,因此这样设计,下面就是具体的实战代码

首先是配置:

application.yml

server:
  port: 8081
spring:
  #数据库连接配置
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://ip:3306/study?characterEncoding=utf-8&useSSL=false
    username:
    password:
​
  redis:
    # 地址
    host:
    # 端口,默认为6379
    port: 6379
    # 数据库索引
    database: 8
    # 密码
    password:
    # 连接超时时间
    timeout: 10s
​
#mybatis的相关配置
mybatis:
  #mapper配置文件
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.zhg.demo.mybatis.entity
  #开启驼峰命名
  configuration:
    map-underscore-to-camel-case: true

maven依赖

<!--        引入了AspectJ的运行时库-->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>1.9.7</version>
        </dependency>
​
        <!-- AspectJ编译器,用于编译切面 -->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjtools</artifactId>
            <version>1.9.7</version>
            <scope>provided</scope>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <optional>true</optional>
        </dependency>
​
        <!-- redis 缓存操作 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.5.14</version>
        </dependency>

redis配置

package com.jlstest.springbootdemo.config;
​
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.StringRedisSerializer;
​
/**
 * redis配置
 *
 * @author admin
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport
{
    @Bean
    @SuppressWarnings(value = { "unchecked", "rawtypes" })
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory)
    {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
​
        FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
​
        // 使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(serializer);
​
        // Hash的key也采用StringRedisSerializer的序列化方式
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(serializer);
​
        template.afterPropertiesSet();
        return template;
    }
​
    @Bean
    public DefaultRedisScript<Long> limitScript()
    {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(limitScriptText());
        redisScript.setResultType(Long.class);
        return redisScript;
    }
​
    /**
     * 限流脚本
     */
    private String limitScriptText()
    {
        return "local key = KEYS[1]\n" +
                "local count = tonumber(ARGV[1])\n" +
                "local time = tonumber(ARGV[2])\n" +
                "local current = redis.call('get', key);\n" +
                "if current and tonumber(current) > count then\n" +
                "    return tonumber(current);\n" +
                "end\n" +
                "current = redis.call('incr', key)\n" +
                "if tonumber(current) == 1 then\n" +
                "    redis.call('expire', key, time)\n" +
                "end\n" +
                "return tonumber(current);";
    }
}
package com.jlstest.springbootdemo.config;
​
import java.nio.charset.Charset;
​
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
​
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONReader;
import com.alibaba.fastjson2.JSONWriter;
​
/**
 * Redis使用FastJson序列化
 * 
 * @author admin
 */
public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T>
{
    public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
​
    private Class<T> clazz;
​
    public FastJson2JsonRedisSerializer(Class<T> clazz)
    {
        super();
        this.clazz = clazz;
    }
​
    @Override
    public byte[] serialize(T t) throws SerializationException
    {
        if (t == null)
        {
            return new byte[0];
        }
        return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName).getBytes(DEFAULT_CHARSET);
    }
​
    @Override
    public T deserialize(byte[] bytes) throws SerializationException
    {
        if (bytes == null || bytes.length <= 0)
        {
            return null;
        }
        String str = new String(bytes, DEFAULT_CHARSET);
​
        return JSON.parseObject(str, clazz, JSONReader.Feature.SupportAutoType);
    }
}

aop编程实现

package com.jlstest.springbootdemo.aop;
​
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;
​
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RateLimit {
    // 资源名称
    String resourceName();
    // 令牌桶初始容量
    int initialCapacity();
    // 令牌桶单位时间填充速率
    int refillRate();
    // 令牌桶填充时间单位
    TimeUnit refillTimeUnit();
}
package com.jlstest.springbootdemo.aop;
​
import com.jlstest.springbootdemo.util.RedisCache;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
​
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
​
/**
 * @author JLS
 * @description:
 * @since 2023-06-21 14:14
 */
@Aspect
@Component
public class RateLimitAspect {
​
    @Resource
    private RedisCache redisCache;
​
    @Around("@annotation(rateLimit)")
    public Object aroundRateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
​
        String resourceName = rateLimit.resourceName();
        int initialCapacity = rateLimit.initialCapacity();
        int refillRate = rateLimit.refillRate();
        TimeUnit refillTimeUnit = rateLimit.refillTimeUnit();
​
        DistributedRateLimiterNew distributedRateLimiterNew = new DistributedRateLimiterNew(redisCache, resourceName, initialCapacity, refillRate, 10000L);
​
​
        if (!distributedRateLimiterNew.allowRequest()) {
            throw new RuntimeException("限流了");
        }
​
        return joinPoint.proceed();
    }
}
package com.jlstest.springbootdemo.aop;
​
import com.jlstest.springbootdemo.util.RedisCache;
import lombok.extern.slf4j.Slf4j;
​
import java.util.Objects;
import java.util.concurrent.TimeUnit;
​
/**
 * @author JLS
 * @description:
 * @since 2023-06-25 15:01
 */
@Slf4j
public class DistributedRateLimiterNew {
​
    private final RedisCache redisCache;
    private final String resourceName;
    private final int maxTokens;
    private final int refillRate;
    private final long refillInterval;
​
    /**
     * 资源key值前缀
     */
    public static final String RATE_LIMIT_KEY = "rate_limit:";
​
    /**
     * 最近一次更新token的时间
     */
    public static final String LAST_REFILL_TIME_KEY = "last_refill_time:";
​
    /**
     * @param redisCache
     *            redis工具类
     * @param resourceName
     *            资源名称
     * @param maxTokens
     *            令牌桶容量
     * @param refillRate
     *            令牌桶单位时间填充速率
     * @param refillInterval
     *            令牌桶填充时间间隔,单位ms
     */
    public DistributedRateLimiterNew(RedisCache redisCache, String resourceName, int maxTokens, int refillRate, long refillInterval) {
        this.redisCache = redisCache;
        this.resourceName = resourceName;
        this.maxTokens = maxTokens;
        this.refillRate = refillRate;
        this.refillInterval = refillInterval;
​
        // 初始化令牌桶
        initializeTokenBucket();
    }
​
    /**
     * 是否允许请求
     */
    public boolean allowRequest() {
        String key = RATE_LIMIT_KEY + resourceName;
        long currentTime = System.currentTimeMillis();
​
        // 获取当前令牌数量
        Integer tokenCountCache = redisCache.getCacheObject(key);
        int tokenCount = Objects.isNull(tokenCountCache) ? 0 : tokenCountCache;
​
        // 补充令牌
        long lastRefillTime = redisCache.getCacheObject(LAST_REFILL_TIME_KEY + resourceName);
        long timePassed = currentTime - lastRefillTime;
        int newTokens = (int) (timePassed * refillRate / refillInterval);
        tokenCount = Math.min(tokenCount + newTokens, maxTokens);
​
        log.info("扣除之前tokenCount:{}", tokenCount);
​
        // 判断是否允许请求
        if (tokenCount > 0) {
            tokenCount--;
            log.info("扣除之后tokenCount:{}", tokenCount);
            // 保存令牌桶数量
            redisCache.setCacheObject(key, tokenCount, 60, TimeUnit.MINUTES);
            // 保存最近一次更新token的时间
            redisCache.setCacheObject(LAST_REFILL_TIME_KEY + resourceName, System.currentTimeMillis(), 60, TimeUnit.MINUTES);
            return true;
        } else {
            return false;
        }
    }
​
    /**
     * 初始化令牌桶
     */
    private void initializeTokenBucket() {
        // 当资源为空时,则进行新建
        if (redisCache.getCacheObject(RATE_LIMIT_KEY + resourceName) == null) {
            // 保存最近一次更新token的时间
            redisCache.setCacheObject(LAST_REFILL_TIME_KEY + resourceName, System.currentTimeMillis(), 60, TimeUnit.MINUTES);
            // 保存令牌桶数量,设置默认值,设置默认值
            redisCache.setCacheObject(RATE_LIMIT_KEY + resourceName, maxTokens, 60, TimeUnit.MINUTES);
            // 设置过期时间,当长期不用则进行释放
            redisCache.expire(resourceName, 3600L);
        }
    }
}

对应讲解

核心主要是通过redis来保存对应的令牌桶实例名以及对应的上次的更新token的时间,每次调用到令牌桶则重新计算令牌数量。当然这个设计比较毛糙,比如在规定时间中未必会有对应数量的令牌数量,主要是由于每次计算令牌数量,当计算成功时是不管是否整除都默认是整除来保存时间,所以会有数量偏少的情况

接口上的放置

package com.jlstest.springbootdemo.controller;
​
import java.util.concurrent.TimeUnit;
​
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
​
import com.jlstest.springbootdemo.aop.RateLimit;
import com.jlstest.springbootdemo.common.response.BaseController;
import com.jlstest.springbootdemo.common.response.JlsTestResponse;
​
/**
 * @author JLS
 * @description:
 * @since 2023-03-22 19:09
 */
@RestController
@RequestMapping("/test")
public class TestController extends BaseController {
​
    @GetMapping("/test")
    @ResponseBody
    @RateLimit(resourceName = "test", initialCapacity = 10, refillRate = 2, refillTimeUnit = TimeUnit.SECONDS)
    public JlsTestResponse<String> test() {
        return sendSuccessData("success");
    }
​
}

如图所示,放在接口上就行。

三、基于redisson实现

redisson本身就已经封装了限流器RRateLimiter,只要稍加封装即可使用,

对应的代码:

package com.jlstest.springbootdemo.aop.newLimit;
​
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
​
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RateLimitNew {
    // 资源名
    String resourceName();
    // 令牌总数设置
    int permits();
    // 恢复速率,一边填写个数单位默认秒。
    int restoreRate();
}
package com.jlstest.springbootdemo.aop.newLimit;
​
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
​
import javax.annotation.Resource;
​
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
​
import com.jlstest.springbootdemo.common.exception.ServiceException;
​
/**
 * @author JLS
 * @description:
 * @since 2023-08-01 14:56
 */
@Aspect
@Component
@Scope
public class RateLimitAspectNew {
​
    private final Map<String, RRateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
​
    private final Map<String, Long> lastAccessTimeMap = new ConcurrentHashMap<>();
​
    // @Value("${redis.address}")
    // private String redisAddress; // Redis连接地址,可以从配置文件中读取
​
    @Resource
    private RedissonClient redissonClient;
​
    @Before("@annotation(rateLimitNew)")
    public void before(JoinPoint joinPoint, RateLimitNew rateLimitNew) {
        String resourceName = rateLimitNew.resourceName();
        int permits = rateLimitNew.permits();
        int restoreRate = rateLimitNew.restoreRate();
​
        // 创建或获取令牌桶
        RRateLimiter rateLimiter = rateLimiterMap.computeIfAbsent(resourceName, key -> {
            // 获取对应资源名的实例,当资源不存在时会新建一个
            RRateLimiter limiter = redissonClient.getRateLimiter(resourceName);
            // 使用 trySetRate 方法设置令牌桶的速率。,只有新建限流器的时候才会设置属性
            limiter.trySetRate(RateType.OVERALL, permits, restoreRate, RateIntervalUnit.SECONDS);
            // 返回对应实例
            return limiter;
        });
​
        // 当时消费令牌
        if (!rateLimiter.tryAcquire()) {
            throw new ServiceException("Rate limit exceeded for resource: " + resourceName);
        }
​
        lastAccessTimeMap.put(resourceName, System.currentTimeMillis());
    }
​
    // 定期清除不活跃的令牌桶
    @Scheduled(fixedDelay = 60000) // 每分钟执行一次清理任务
    public void cleanUpRateLimiters() {
        long inactiveDuration = 5 * 60 * 1000; // 5分钟不活跃则清除
        long currentTime = System.currentTimeMillis();
        rateLimiterMap.entrySet().removeIf(entry -> {
            String resourceName = entry.getKey();
            Long lastAccessTime = lastAccessTimeMap.get(resourceName);
            // 判断是否超过不活跃时间
            if (lastAccessTime != null && currentTime - lastAccessTime > inactiveDuration) {
                // 移除令牌桶实例
                RRateLimiter rateLimiter = entry.getValue();
                rateLimiter.delete();
                // 移除资源名的记录
                lastAccessTimeMap.remove(resourceName);
                return true; // 移除该令牌桶实例
            }
            return false; // 不需要移除该令牌桶实例
        });
    }
}

测试接口

package com.jlstest.springbootdemo.controller;
​
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
​
import com.jlstest.springbootdemo.aop.newLimit.RateLimitNew;
import com.jlstest.springbootdemo.common.response.BaseController;
import com.jlstest.springbootdemo.common.response.JlsTestResponse;
​
/**
 * @author JLS
 * @description:
 * @since 2023-03-22 19:09
 */
@RestController
@RequestMapping("/test")
public class TestController extends BaseController {
​
    @GetMapping("/test")
    @ResponseBody
    // @RateLimit(resourceName = "test", initialCapacity = 10, refillRate = 2, refillTimeUnit =
    // TimeUnit.SECONDS)
    @RateLimitNew(resourceName = "test1", permits = 1, restoreRate = 10)
    public JlsTestResponse<String> test() {
        return sendSuccessData("success");
    }
​
}

其他的一些配置代码,同redis的实现,就不再重复写出,

每个服务实例可以独立管理自己的限流器,但令牌桶的状态和数据是存储在 Redis 中的,这意味着所有实例共享相同的令牌桶信息。当一个实例获取或更新令牌桶的状态时,其他实例也可以立即感知到这些变化,从而实现在分布式系统中的限流效果。

结合 Redisson 的 RRateLimiter 和 Redis 缓存,可以实现分布式系统的限流,确保系统稳定性和资源的合理利用。

四、现成的工具-sentinel实现

Sentinel是阿里巴巴开源的一款分布式系统的流量控制组件,用于保护后端服务免受过多请求的影响,确保系统的稳定性和可靠性。Sentinel提供了多种限流策略和流量控制规则,能够灵活地适应不同场景的需求。

以下是Sentinel组件限流的基本使用步骤:

使用Sentinel进行限流的示例代码(基于Spring Boot):

引入Sentinel依赖:

Maven:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.1</version>
</dependency>

在配置文件中配置限流规则(可以根据实际需求配置):

yamlCopy codespring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8080 # Sentinel Dashboard 地址

在启动类中初始化Sentinel组件:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class Application {
​
    public static void main(String[] args) {
        // 初始化 Sentinel
        initSentinel();
        SpringApplication.run(Application.class, args);
    }
​
    private static void initSentinel() {
        // 这里的参数可以根据实际情况进行调整
        System.setProperty("csp.sentinel.api.port", "8720");
        System.setProperty("csp.sentinel.dashboard.server", "localhost:8080");
        System.setProperty("project.name", "your-project-name");
    }
}

在业务代码中添加限流注解:

import com.alibaba.csp.sentinel.annotation.SentinelResource;
​
@Service
public class YourService {
​
    @SentinelResource(value = "yourResourceName", blockHandler = "blockHandlerMethod")
    public void yourMethod() {
        // 业务逻辑
    }
​
    // 定义限流策略的处理方法
    public void blockHandlerMethod(BlockException ex) {
        // 限流处理逻辑
    }
}

上述示例代码中,我们使用了Sentinel的注解@SentinelResource来标注需要进行限流保护的方法。当达到限流阈值时,会调用blockHandler指定的方法进行限流处理。在blockHandlerMethod中,可以自定义限流策略的处理逻辑。

需要注意的是,Sentinel的流控规则可以在Dashboard中进行配置和管理,也可以通过代码进行动态配置,使得限流策略可以根据实际情况进行灵活调整。同时,Dashboard提供了实时的监控和统计功能,方便查看应用程序的流量控制情况和系统状态。

sentinel虽然好 ,但是这个组件所包含的东西过大,有些时候只需要用到限流功能,则会显得有点大材小用,没有必要。

到此这篇关于基于令牌桶的Java限流器注解的简单实现详解的文章就介绍到这了,更多相关Java限流器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文