Java漏桶算法的简单代码实例
作者:岸河
这篇文章主要介绍了Java漏桶算法的简单代码实例,漏桶算法的意义在于能够平滑请求,不给下游服务造成过大压力,特别适用于突发流量或者定时任务拉取大量数据时,需要处理大量数据或者请求的场景,需要的朋友可以参考下
Java漏桶算法
漏桶算法的意义在于能够平滑请求,不给下游服务造成过大压力,特别适用于突发流量或者定时任务拉取大量数据时,需要处理大量数据或者请求的场景。
使用单线程的for循环太慢,使用线程池仍无法避免一瞬间会发起很多请求,我们需要的是匀速的请求第三方。
拿定时任务补偿数据来说,每隔一分钟拉取100条数据,希望下游服务能在1分钟之内将这些数据处理掉就行,如果使用线程池,可能1秒钟之内就将20条数据发出去了,即使使用的线程数比较少,在一瞬间也会有多个请求发出,我们希望每间隔一定时间,发出一个请求,让下游服务匀速消化,即希望控制匀速的QPS。
@FunctionalInterface public interface Callback<Task> { void process(Task task) throws InterruptedException; }
/** * <p>简单漏桶算法实现</p> */ public class LeakyBucketHandler<T> { // 漏水速率 /s(TPS/QPS) private Integer rate; private long lastTime = System.currentTimeMillis(); private final int capacity; // 最大并发量(桶最大容量) private final ArrayBlockingQueue<T> queue; // init public LeakyBucketHandler(Integer rate, int capacity) { this.rate = rate; this.capacity = capacity; this.queue = new ArrayBlockingQueue<T>(capacity); } public LeakyBucketHandler(int capacity) { this.capacity = capacity; this.queue = new ArrayBlockingQueue<T>(capacity); } public boolean acquire(T b) { if (queue.size() > capacity) { return false; } else { queue.offer(b); return true; } } public synchronized void consume(Callback<T> callBack) throws InterruptedException { if (rate == null || rate < 1) { throw new IllegalArgumentException("rate value is" + rate); } while (queue.size() > 0) { long now = System.currentTimeMillis(); if ((now-lastTime) > 1000/rate) { T t = queue.poll(); System.out.println("interval-" + (now - lastTime + "-ms"));; lastTime = now; callBack.process(t); System.out.println("bucket size is " + queue.size()); } else { Thread.sleep(1); } } } public Integer getQueueSize(){ return queue.size(); } }
public class Demo { public static void main(String[] args) throws InterruptedException { // 获取任务队列 List<String> taskList = getTaskList(); LeakyBucketHandler<String> leakyBucket = new LeakyBucketHandler<>(5, taskList.size()); for (String s : taskList) { leakyBucket.acquire(s); } System.out.println("leakyBucket.getQueueSize()="+leakyBucket.getQueueSize()); leakyBucket.consume(task -> { CompletableFuture.runAsync(()->{ System.out.println("消费桶中对象---"+task+"开始"); try { // 模拟业务耗时 Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } }); }); } // 一般从数据库或其他数据源拉取数据,这里mock一下 private static List<String> getTaskList() { List<String> list = new ArrayList<>(); for (int i = 0; i < 100; i++) { list.add(i + ""); } return list; } }
到此这篇关于Java漏桶算法的简单代码实例的文章就介绍到这了,更多相关Java漏桶算法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!