java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot QPS监控

SpringBoot实现QPS监控的原理与高性能实战

作者:(farerboy)

本文深入探讨了SpringBoot中基于滑动窗口算法的高效QPS监控实现,摒弃简单AtomicInteger简单计数器,采用LongAdderer和RingBuffer数据结构,提供低延迟、高并发安全的的方案

摘要:在微服务架构中,QPS(Queries Per Second)是衡量系统吞吐量和健康度的核心指标。本文将深入剖析 QPS 监控的核心算法,基于 Spring Boot 和 Micrometer 框架,设计并实现一套低开销、高并发安全、支持动态维度的 QPS 监控方案。我们将摒弃简单的计数器,采用 滑动窗口算法(Sliding Window) 结合 RingBuffer 数据结构,深入探讨 LongAdder 在高并发下的性能优势,并提供完整的源码实现。

一、 为什么我们需要 QPS 监控?

在日常开发中,我们通常使用 Prometheus、Grafana 等 APM 工具来获取流量数据。但在某些场景下,我们需要自研轻量级的 QPS 监控:

  1. 定制化指标:需要统计特定业务逻辑(如某个非 HTTP 接口、特定参数组合)的 QPS,通用探针无法覆盖。
  2. 本地快速诊断:在排查线上问题时,需要直接在应用日志或内存中查看瞬时流量,而不依赖外部监控系统。
  3. 限流前置判断:QPS 数据往往是限流(Rate Limiting)算法的基础。

QPS 监控的常见误区

二、 QPS 监控核心原理

QPS 是指系统每秒处理的请求数量。要实现精准的 QPS 监控,核心在于时间窗口的划分。

固定窗口 vs 滑动窗口

固定窗口 (Fixed Window)

将时间划分为固定的区间(如 1 秒),在区间内累加计数。

缺点:存在严重的临界点问题。假设 00:00:59 涌入 1000 个请求,00:01:01 又涌入 1000 个请求,虽然系统承受了 2000 QPS 的压力,但两个窗口的统计数据都显示只有 1000 QPS,容易掩盖瞬时峰值。

滑动窗口 (Sliding Window) - 我们的选择

将一个大窗口(如 10 秒)划分为多个小时间片(如 1 秒一个,共 10 个格子)。

优势

三、 SpringBoot 架构设计

1. 拦截点选择:Filter vs Interceptor vs AOP

为了获取最真实的 QPS,我们应该尽早捕获请求。OncePerRequestFilter 是最佳选择:

2. 指标框架集成:Micrometer

Spring Boot 2.x/3.x 默认集成了 Micrometer。它是一个“门面”库,类似于 SLF4J。

3. 高并发数据结构

四、 核心源码实现

1. 定义滑动窗口结构 (WindowCounter)

我们需要一个结构来维护时间片。

package com.example.qps.monitor;

import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 基于 RingBuffer 和 LongAdder 实现的高性能滑动窗口计数器
 */
public class SlidingWindowCounter {

    // 窗口大小(秒)
    private final int windowSize;
    // 槽位数量,默认 1 秒一个槽位
    private final int slotCount;
    // 环形数组,存储每个时间片的计数
    private final LongAdder[] slots;
    // 读写锁,用于周期性清理过期数据时的并发控制
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // 上次清理时间(纳秒)
    private volatile long lastClearTime;

    public SlidingWindowCounter(int windowSize) {
        this.windowSize = windowSize;
        this.slotCount = windowSize; // 假设粒度为 1s
        this.slots = new LongAdder[slotCount];
        for (int i = 0; i < slotCount; i++) {
            slots[i] = new LongAdder();
        }
        this.lastClearTime = System.currentTimeMillis();
    }

    /**
     * 记录一次请求
     */
    public void record() {
        // 1. 检查是否需要清理过期数据
        checkAndClearExpiredSlots();

        // 2. 获取当前槽位并累加
        int index = getCurrentSlotIndex();
        slots[index].increment();
    }

    /**
     * 获取当前窗口的总 QPS
     */
    public long getQps() {
        long total = 0;
        try {
            // 读锁:允许并发读取,但禁止在清理时读取
            lock.readLock().lock();
            checkAndClearExpiredSlots();
            for (LongAdder slot : slots) {
                total += slot.longValue();
            }
        } finally {
            lock.readLock().unlock();
        }
        // 注意:这里返回的是窗口内的总请求数。
        // 如果要算平均 QPS,应除以有效时间片数量。
        // 为了简化,此处通常暴露的是“最近 N 秒的总请求数”,由 Prometheus rate() 计算 QPS。
        // 或者我们可以直接算平均 QPS = total / validSlotCount。
        return total;
    }

    /**
     * 获取当前槽位索引
     */
    private int getCurrentSlotIndex() {
        long now = System.currentTimeMillis();
        long second = now / 1000;
        return (int) (second % slotCount);
    }

    /**
     * 清理过期数据(防止 RingBuffer 数据重叠)
     * 简单判断:如果当前时间与上次清理时间跨过了一个窗口周期,则重置数组
     */
    private void checkAndClearExpiredSlots() {
        long now = System.currentTimeMillis();
        // 如果已经过了一个完整的窗口周期
        if (now - lastClearTime >= windowSize * 1000L) {
            lock.writeLock().lock();
            try {
                // 双重检查
                if (now - lastClearTime >= windowSize * 1000L) {
                    // 重置所有槽位
                    // 注意:在高并发下直接 new LongAdder[] 或者遍历 reset()
                    // 这里为了极致性能,采用遍历 reset
                    for (LongAdder slot : slots) {
                        slot.reset();
                    }
                    lastClearTime = now;
                }
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}

2. 核心过滤器 (QpsMonitorFilter)

实现请求拦截,并根据 URI 路由到不同的计数器。

package com.example.qps.monitor;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import org.springframework.web.util.UrlPathHelper;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class QpsMonitorFilter extends OncePerRequestFilter {

    private static final Logger log = LoggerFactory.getLogger(QpsMonitorFilter.class);

    // 存储每个 URI 的计数器
    private final Map<String, SlidingWindowCounter> counterMap = new ConcurrentHashMap<>();
    
    @Autowired
    private MeterRegistry meterRegistry;

    // 窗口大小 60s
    private static final int WINDOW_SIZE = 60;

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
            throws ServletException, IOException {
        
        String uri = request.getRequestURI();
        
        // 1. 获取或创建该 URI 的计数器
        // computeIfAbsent 保证线程安全
        SlidingWindowCounter counter = counterMap.computeIfAbsent(uri, key -> {
            SlidingWindowCounter newCounter = new SlidingWindowCounter(WINDOW_SIZE);
            // 2. 注册到 Micrometer
            Gauge.builder("app.request.qps.total", newCounter, SlidingWindowCounter::getQps)
                    .tags("uri", key) // 维度标签
                    .description("Total requests in sliding window")
                    .register(meterRegistry);
            return newCounter;
        });

        // 3. 记录请求
        counter.record();

        // 4. 放行
        filterChain.doFilter(request, response);
    }
}

五、 深度解析与性能优化

1. LongAdder vs AtomicLong

在上述实现中,我们使用了 LongAdder 而不是 AtomicLong

2. RingBuffer 的内存优化

为什么不使用 LinkedListArrayList 来存储时间片?

3. 内存泄漏防御:动态 URL 问题

如果我们的接口是 RESTful 风格的,例如 /api/users/1, /api/users/2,直接以 uri 作为 Key 会导致 ConcurrentHashMap 无限膨胀,最终 OOM。

解决方案

// 简单 LRU 改造示例
public class LruCounterMap<K, V> extends LinkedHashMap<K, V> {
    private static final int MAX_CAPACITY = 500;
    
    public LruCounterMap() {
        super(MAX_CAPACITY, 0.75f, true);
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > MAX_CAPACITY;
    }
}

六、 分布式场景下的 QPS 监控

以上方案适用于单机监控。但在微服务集群中,我们往往关心的是 全局 QPS

方案对比

方案原理优缺点适用场景
Prometheus 聚合每个实例暴露本地 Gauge,Prometheus 拉取后使用 sum(rate(...)) 计算。优点:无侵入,零代码改动,实时性好。
缺点:依赖外部组件,瞬时值可能存在几秒延迟。
推荐:大多数微服务场景。
Redis + Lua请求到来时,通过 Lua 脚本在 Redis 中进行原子累加和窗口计算。优点:数据绝对精确,支持分布式限流。
缺点:增加网络 RTT,影响业务性能(QPS 监控不应拖慢业务)。
强一致性限流场景。

最佳实践:在 SpringBoot 内部使用本文的本地滑动窗口方案,保证监控逻辑不影响业务 RTT。然后通过 Micrometer 暴露数据,由 Prometheus 完成最终的分布式聚合计算。

七、 总结

本文实现了一套生产级的 SpringBoot QPS 监控方案。核心要点如下:

  1. 算法选择:滑动窗口算法解决了固定窗口的临界点问题。
  2. 性能设计:利用 RingBuffer 减少内存分配,利用 LongAdder 解决高并发 CAS 竞争。
  3. 工程实践:通过 OncePerRequestFilter 拦截全量流量,结合 Micrometer 无缝对接主流监控生态。

通过这套方案,我们可以在极低性能损耗(单次请求纳秒级开销)的前提下,精准掌握系统的流量脉搏,为后续的限流、熔断和容量规划提供坚实的数据支撑。

以上就是SpringBoot实现QPS监控的原理与高性能实战的详细内容,更多关于SpringBoot QPS监控的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文