Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis API网关或服务请求限流

使用Redis实现API网关或单个服务的请求限流的具体代码

作者:冰糖心书房

在微服务架构中,对 API 网关或单个服务的请求进行速率限制至关重要,以防止恶意攻击、资源滥用并确保系统的稳定性和可用性,本文将详细探讨如何利用 Redis 实现 API 网关或单个服务的请求限流,深入分析各种主流算法,需要的朋友可以参考下

引言

在微服务架构中,对 API 网关或单个服务的请求进行速率限制至关重要,以防止恶意攻击、资源滥用并确保系统的稳定性和可用性。 Redis 凭借其高性能、原子操作和丰富的数据结构,成为实现请求限流的理想选择。

本文将详细探讨如何利用 Redis 实现 API 网关或单个服务的请求限流,深入分析各种主流算法,并提供在 Python 和 Java 环境下的具体代码示例,包括使用 Lua 脚本确保操作的原子性。

为什么选择 Redis 进行限流?

Redis 之所以成为实现限流的热门选择,主要得益于其以下几个关键特性:

核心概念:识别请求来源

在实施限流之前,首先需要确定如何识别和区分不同的请求来源。常见的识别方式包括:

主流限流算法及其 Redis 实现

以下是几种主流的限流算法及其使用 Redis 的实现方式:

1. 固定窗口计数器 (Fixed Window Counter)

这是最简单的限流算法。它在固定的时间窗口内(例如,每分钟)统计请求次数,如果超过预设的阈值,则拒绝后续的请求,直到下一个时间窗口开始。

优点: 实现简单,容易理解。

缺点: 在时间窗口的边界处可能会出现“突刺”流量问题。例如,在窗口结束前的瞬间和新窗口开始的瞬间,可能会有两倍于限制的请求通过。

Redis 实现:

主要利用 INCREXPIRE 命令。

Python 示例:

import redis
import time

r = redis.Redis()

def is_rate_limited_fixed_window(user_id: str, limit: int, window: int) -> bool:
    key = f"rate_limit:{user_id}"
    current_requests = r.get(key)

    if current_requests is None:
        # 使用 pipeline 保证原子性
        pipe = r.pipeline()
        pipe.incr(key)
        pipe.expire(key, window)
        pipe.execute()
        return False

    if int(current_requests) >= limit:
        return True

    r.incr(key)
    return False

# 示例: 每位用户每60秒最多10个请求
for i in range(15):
    if is_rate_limited_fixed_window("user123", 10, 60):
        print("请求被限制")
    else:
        print("请求成功")
    time.sleep(1)

使用 Lua 脚本保证原子性:

为了避免 GETINCR 之间的竞态条件,强烈建议使用 Lua 脚本将多个命令作为一个原子操作执行。

-- rate_limiter.lua
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = tonumber(redis.call("GET", key) or "0")

if current >= limit then
    return 1 -- 1 表示被限制
end

if current == 0 then
    redis.call("INCR", key)
    redis.call("EXPIRE", key, window)
else
    redis.call("INCR", key)
end

return 0 -- 0 表示未被限制

Java (Jedis) 调用 Lua 脚本示例:

import redis.clients.jedis.Jedis;

public class RateLimiter {
    private final Jedis jedis;
    private final String script;

    public RateLimiter(Jedis jedis) {
        this.jedis = jedis;
        // 实际项目中应从文件加载
        this.script = "local key = KEYS[1]..."
    }

    public boolean isRateLimited(String key, int limit, int window) {
        Object result = jedis.eval(this.script, 1, key, String.valueOf(limit), String.valueOf(window));
        return "1".equals(result.toString());
    }
}

2. 滑动窗口日志 (Sliding Window Log)

该算法记录每个请求的时间戳。当一个新请求到达时,会移除时间窗口之外的旧时间戳,然后统计窗口内的时间戳数量。如果数量超过限制,则拒绝请求。

优点: 限流精度高,有效解决了固定窗口的边界问题。

缺点: 需要存储所有请求的时间戳,当请求量很大时会占用较多内存。

Redis 实现:

使用有序集合 (Sorted Set),将成员 (member) 设置为唯一值(如请求 ID 或时间戳),将分数 (score) 设置为请求的时间戳。

Python 示例:

import redis
import time

r = redis.Redis()

def is_rate_limited_sliding_log(user_id: str, limit: int, window: int) -> bool:
    key = f"rate_limit_log:{user_id}"
    now = int(time.time() * 1000)
    window_start = now - window * 1000

    # 使用 pipeline 保证原子性
    pipe = r.pipeline()
    # 移除窗口外的数据
    pipe.zremrangebyscore(key, 0, window_start)
    # 添加当前请求
    pipe.zadd(key, {f"{now}:{int(time.time()*1000000)}": now}) # 保证 member 唯一
    # 获取窗口内的请求数
    pipe.zcard(key)
    # 设置过期时间,防止冷数据占用内存
    pipe.expire(key, window)
    results = pipe.execute()

    current_requests = results[2]
    return current_requests > limit

使用 Lua 脚本实现滑动窗口:

-- sliding_window.lua
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = redis.call("TIME")
local now_ms = (now[1] * 1000) + math.floor(now[2] / 1000)
local window_start = now_ms - window * 1000

-- 移除过期的请求记录
redis.call("ZREMRANGEBYSCORE", key, 0, window_start)

-- 获取当前窗口内的请求数
local count = redis.call("ZCARD", key)

if count >= limit then
    return 1 -- 被限制
end

-- 添加当前请求
redis.call("ZADD", key, now_ms, now_ms .. "-" .. count) -- member 保证唯一性
redis.call("EXPIRE", key, window)

return 0 -- 未被限制

3. 令牌桶算法 (Token Bucket)

该算法的核心是一个固定容量的“令牌桶”,系统会以恒定的速率向桶里放入令牌。每个请求需要从桶里获取一个令牌才能被处理。如果桶里没有令牌,请求将被拒绝或排队等待。

优点: 能够应对突发流量,只要桶内有足够的令牌,就可以一次性处理多个请求。

缺点: 实现相对复杂,需要一个独立的进程或定时任务来持续生成令牌。

Redis 实现:

可以使用 Redis 的列表 (List) 作为令牌桶。一个后台任务(或在每次请求时计算)负责向列表中添加令牌。

Java (Spring Boot with Bucket4j) 示例:

Bucket4j 是一个流行的 Java 限流库,可以与 Redis 结合使用实现分布式令牌桶。

// 依赖: bucket4j-core, jcache-api, redisson
// 配置 RedissonClient...

// 创建一个基于 Redis 的代理管理器
ProxyManager<String> proxyManager = new JCacheProxyManager<>(jcache);

// 定义令牌桶配置:每分钟补充10个令牌,桶容量为10
BucketConfiguration configuration = BucketConfiguration.builder()
        .addLimit(Bandwidth.simple(10, Duration.ofMinutes(1)))
        .build();

// 获取或创建令牌桶
Bucket bucket = proxyManager.getProxy("rate-limit-bucket:user123", configuration);

// 尝试消费一个令牌
if (bucket.tryConsume(1)) {
    // 请求成功
} else {
    // 请求被限制
}

Spring Cloud Gateway 也内置了基于 Redis 的令牌桶算法限流器。

4. 漏桶算法 (Leaky Bucket)

漏桶算法将请求看作是流入“漏桶”的水,而漏桶以固定的速率漏出水(处理请求)。如果流入的速率过快,导致桶溢出,则多余的请求将被丢弃。

优点: 能够平滑请求流量,保证服务以恒定的速率处理请求。

缺点: 无法有效利用系统空闲资源来处理突发流量。

Redis 实现:

Redis-Cell 模块提供了基于 GCRA (Generic Cell Rate Algorithm) 的漏桶算法实现。

API 网关限流 vs. 单个服务限流

限流逻辑可以部署在不同的层面,主要分为 API 网关层和单个微服务层。

API 网关限流

在 API 网关层面进行统一限流是一种常见的做法。

优点:

实现方式:

Spring Cloud Gateway 示例 (application.yml):

spring:
  cloud:
    gateway:
      routes:
      - id: my_route
        uri: lb://my-service
        predicates:
        - Path=/my-api/**
        filters:
        - name: RequestRateLimiter
          args:
            redis-rate-limiter.replenishRate: 10      # 令牌桶每秒填充速率
            redis-rate-limiter.burstCapacity: 20     # 令牌桶容量
            redis-rate-limiter.requestedTokens: 1  # 每次请求消耗的令牌数
            key-resolver: "#{@ipKeyResolver}"     # 使用 IP 地址作为限流的 key

单个服务限流

将限流逻辑直接实现在单个微服务内部。

优点:

实现方式:

Python (FastAPI 中间件) 示例:

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import redis

app = FastAPI()
r = redis.Redis()

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # 简单示例:使用固定窗口计数器
    ip = request.client.host
    key = f"rate_limit:{ip}"
    limit = 5
    window = 60

    # 此处应使用 Lua 脚本保证原子性
    current = r.incr(key)
    if current == 1:
        r.expire(key, window)

    if current > limit:
        return JSONResponse(status_code=429, content={"message": "Too Many Requests"})

    response = await call_next(request)
    return response

总结

使用 Redis 实现请求限流是一种高效且可扩展的方案。开发者应根据具体的业务场景和需求选择合适的限流算法。

在实施时,务必使用 Lua 脚本来保证操作的原子性,避免在并发环境下出现数据不一致的问题。将限流逻辑部署在 API 网关层可以对整个系统起到保护作用,而部署在单个服务层则提供了更高的灵活性。在实际应用中,两者也常常结合使用,构建多层级的防护体系。

以上就是使用Redis实现API网关或单个服务的请求限流的具体代码的详细内容,更多关于Redis API网关或服务请求限流的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文