Java高并发下请求合并处理方式
作者:Soda_lw
Java高并发下请求合并处理
场景描述
在大并发量下每秒有一万个请求向后端查询数据,这样我们就需要向后端请求一万次,甚至查询一万次数据库。
我们要做的请求合并就是每隔一段时间(10ms)将这段时间内的请求合并到一起进行批量查询,减少查询数据库的次数。
思考
1、如何存放一段时间内的请求?这里我们可以用队列。
2、如何每隔一段时间执行任务?用定时任务线程池。
3、每个请求都是单独的线程,如何保证各个请求能得到自己的查询结果?这里我们使用callable返回查询结果,在没有查到结果前阻塞线程。
下面来看看具体实现的demo
package cn.codingxiaxw.combine; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; import java.util.stream.Collectors; @Service public class QueryService { //用来存放请求的队列,我们将请求封装成了一个Request对象 private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>() ; //这个是我们的单个的查询方法,假设每隔请求都根据唯一的code进行查询 public Map<String,Object> query(String code){ //这个request是我们自定义的内部类 Request request = new Request(); request.code = code; CompletableFuture<Map<String,Object>> future = new CompletableFuture<>(); request.future = future; queue.add(request); //阻塞 直到返回结果 try { return future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } //这个是个模拟批量查询的方法 public List<Map<String,Object>> batchQuery(List<String> codes){ return null; } //封装的请求 class Request { String code; CompletableFuture<Map<String,Object>> future; } @PostConstruct public void init(){ //在init方法中初始化一个定时任务线程,去定时执行我们的查询任务.具体的任务实现是我们根据唯一code查询出来的结果集,以code为key转成map,然后我们队列中的每个Request对象都有自己的唯一code,我们根据code一一对应,给相应的future返回对应的查询结果。 ScheduledExecutorService poolExecutor = new ScheduledThreadPoolExecutor(1); poolExecutor.scheduleAtFixedRate(()->{ int size = queue.size(); //如果没有请求直接返回 if(size==0) return ; List<Request> list = new ArrayList<>(); for (int i = 0; i < size;i++){ Request request = queue.poll(); list.add(request); } System.out.println("批量处理:"+size); List<String> codes = list.stream().map(s->s.code).collect(Collectors.toList()); //合并之后的结果集 List<Map<String, Object>> batchResult = batchQuery(codes); Map<String,Map<String,Object>> responseMap = new HashMap<>(); for (Map<String,Object> result : batchResult) { String code = result.get("code").toString(); responseMap.put(code,result); } //返回对应的请求结果 for (Request request : list) { Map<String, Object> response = responseMap.get(request.code); request.future.complete(response); } },0,10,TimeUnit.MILLISECONDS); } }
利用请求合并发挥高并发下批量处理的优势
需求分析
我们经常会写一些查询接口,假设现在我们需要写一个查询用户信息的接口,要求传入用户Id,返回用户名称。
那么最简化的流程就是:用户发送请求->controller层->service层->dao层->数据库。
每次请求就相当于请求一条用户信息。
当这个接口被用户频繁请求时,此接口就不断的在做“请求”到“返回”的操作,服务端同时会开辟许多线程帮我们执行这些操作,这么多的线程会消耗许多系统资源,服务端承受了巨大压力。
//单查询接口 @GetMapping("/getUser") public String getUser(Long key){ long currentMillis = System.currentTimeMillis(); //单查询service,大量线程怼到这个service上去 String userName = userService.getUser(key); System.out.printf("##############################################\n"); System.out.printf("用户名为:" + userName + "---线程名为:" + Thread.currentThread().getName() + "---执行时间为:" + (System.currentTimeMillis() - currentMillis) + "\n"); return userName; }
那么我们有什么方式可以优化这种操作呢?
我目前能想到的就是利用缓存(缓存热点数据)、消息队列(接收请求慢慢消费达到流量削峰)、多个服务实例(分散请求压力提高计算能力)等方式应对高并发场景。
在本文中,我利用另一种思路:把多个请求合并为一个请求,把单查询变为批量查询,这样就能有效减少开辟线程的数量。
具体实现
首先我们定义一个用户请求类Request:
//用户请求类 public class RequestTest { //请求条件 private Long key; //传话人 private CompletableFuture<String> future; public CompletableFuture<String> getFuture() { return future; } public void setFuture(CompletableFuture<String> future) { this.future = future; } public Long getKey() { return key; } public void setKey(Long key) { this.key = key; } }
接着是请求合并的主要代码:
//存放请求的队列 LinkedBlockingDeque<RequestTest> queue = new LinkedBlockingDeque<>(); //初始化方法 @PostConstruct public void init() { //定时执行的线程池,每隔5毫秒执行一次(间隔时间可以由业务决定),把所有堆积的请求 ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); executorService.scheduleAtFixedRate(() -> { //在这里具体执行批量查询逻辑 int size = queue.size(); if (size == 0) { //若没有请求堆积,直接返回,等10毫秒再执行一次 return; } //若有请求堆积把所有请求都拿出来 List<RequestTest> requestTests = new ArrayList<>(); for (int i = 0; i < size; i++) { //把请求拿出来 RequestTest poll = queue.poll(); requestTests.add(poll); } //至此请求已经被合并了 System.out.printf("##############################################\n"); System.out.printf("请求合并了" + requestTests.size() + "条!\n"); //组装批量查询条件 List<Long> keyList = new ArrayList<>(); for (RequestTest requestTest : requestTests) { keyList.add(requestTest.getKey()); } //进行批量查询 List<User> nameList = userService.getUserList(keyList); //把批查结果放入一个map Map<Long,String> map = new HashMap<>(); for(User user:nameList){ map.put(user.getId(),user.getName()); } for (RequestTest requestTest : requestTests){ //把放在map中的结果集放回给对应的线程 //future是对应每个请求的,因为是每个请求线程都传了自己的future是对应的过来 requestTest.getFuture().complete(map.get(requestTest.getKey())); } }, 0, 5, TimeUnit.MILLISECONDS); } //请求合并 @GetMapping("/requestMerge/getUser") public String getUserRequestMerge(Long key) throws InterruptedException, ExecutionException { long currentMillis = System.currentTimeMillis(); //CompletableFuture可以使一个线程执行操作后,主动返回值给另一个线程 CompletableFuture<String> future = new CompletableFuture<>(); RequestTest requestTest = new RequestTest(); //把future(把future可以认为是线程间的"传话人")放到等待队列中去,让定时调度的线程池执行并返回值 requestTest.setFuture(future); requestTest.setKey(key); //把requestTest加入等待队列(LinkedBlockingDeque) queue.add(requestTest); //future(传话人)阻塞直到有值返回 String userName = future.get(); System.out.printf("用户名为:" + userName + "---线程名为:"+Thread.currentThread().getName()+ "---执行时间为:"+(System.currentTimeMillis() - currentMillis)+"\n"); return userName; }
到这里我们就完成了一个请求合并的demo,接着我们测试运行结果。
这里我用了jemeter(jemeter的用法可以网上找一找)进行测试,对单查询接口和请求合并接口分别进行了20000次的请求,以下是结果对比:
(1)单查询运行结果:
可以看到系统开辟了许多线程来处理请求,jemeter的聚合报告如下:
(2)请求合并运行结果:
可以看到多条线程被合并成了一条线程来处理,jemeter的聚合报告如下:
通过以上数据,我们可以看到,请求合并比单查询的吞吐量要大,并且在运行过程中,流量浮动的范围比较小。
至此,我们就完成了利用请求合并发挥高并发下批量处理的优势。
心得
本文我利用了LinkedBlockingDeque阻塞队列、ScheduledExecutorService定时执行线程池和CompletableFuture线程通信来完成了请求合并的demo。
并且通过实验证明高并发下批量处理比单个处理更有优势。
当然,我的demo并不完善。而且请求合并也有一些弊端,比如如果定时线程池的间隔时间比较长,反而会造成请求堆积时间太长,用户不能快速得到响应。
同时在请求数量比较小时,请求合并的场景也是没有必要的。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。