java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java高并发下请求合并处理

Java高并发下请求合并处理方式

作者:Soda_lw

这篇文章主要介绍了Java高并发下请求合并处理方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

Java高并发下请求合并处理

场景描述

在大并发量下每秒有一万个请求向后端查询数据,这样我们就需要向后端请求一万次,甚至查询一万次数据库。

我们要做的请求合并就是每隔一段时间(10ms)将这段时间内的请求合并到一起进行批量查询,减少查询数据库的次数。

思考

1、如何存放一段时间内的请求?这里我们可以用队列。

2、如何每隔一段时间执行任务?用定时任务线程池。

3、每个请求都是单独的线程,如何保证各个请求能得到自己的查询结果?这里我们使用callable返回查询结果,在没有查到结果前阻塞线程。

下面来看看具体实现的demo

package cn.codingxiaxw.combine;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Service
public class QueryService {
//用来存放请求的队列,我们将请求封装成了一个Request对象
 private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>() ;
       //这个是我们的单个的查询方法,假设每隔请求都根据唯一的code进行查询
  public Map<String,Object> query(String code){
        //这个request是我们自定义的内部类
        Request request = new Request();
        request.code = code;
        CompletableFuture<Map<String,Object>> future = new CompletableFuture<>();
        request.future = future;
        queue.add(request);
        //阻塞 直到返回结果
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
 //这个是个模拟批量查询的方法
    public List<Map<String,Object>> batchQuery(List<String> codes){
        return null;
    }
    //封装的请求
    class Request {
        String code;
        CompletableFuture<Map<String,Object>> future;
    }
    @PostConstruct
    public void init(){
    //在init方法中初始化一个定时任务线程,去定时执行我们的查询任务.具体的任务实现是我们根据唯一code查询出来的结果集,以code为key转成map,然后我们队列中的每个Request对象都有自己的唯一code,我们根据code一一对应,给相应的future返回对应的查询结果。
        ScheduledExecutorService poolExecutor = new ScheduledThreadPoolExecutor(1);
        poolExecutor.scheduleAtFixedRate(()->{
            int size = queue.size();
            //如果没有请求直接返回
            if(size==0)
                return ;
            List<Request> list = new ArrayList<>();
            for (int i = 0; i < size;i++){
                Request request = queue.poll();
                list.add(request);
            }
            System.out.println("批量处理:"+size);
            List<String> codes = list.stream().map(s->s.code).collect(Collectors.toList());
            //合并之后的结果集
            List<Map<String, Object>> batchResult = batchQuery(codes);
            Map<String,Map<String,Object>> responseMap = new HashMap<>();
            for (Map<String,Object> result : batchResult) {
                String code = result.get("code").toString();
                responseMap.put(code,result);
            }
            //返回对应的请求结果
            for (Request request : list) {
                Map<String, Object> response = responseMap.get(request.code);
                request.future.complete(response);
            }
        },0,10,TimeUnit.MILLISECONDS);
    }
}

利用请求合并发挥高并发下批量处理的优势

需求分析

我们经常会写一些查询接口,假设现在我们需要写一个查询用户信息的接口,要求传入用户Id,返回用户名称。

那么最简化的流程就是:用户发送请求->controller层->service层->dao层->数据库。

每次请求就相当于请求一条用户信息。

当这个接口被用户频繁请求时,此接口就不断的在做“请求”到“返回”的操作,服务端同时会开辟许多线程帮我们执行这些操作,这么多的线程会消耗许多系统资源,服务端承受了巨大压力。

//单查询接口
@GetMapping("/getUser")
    public String getUser(Long key){
        long currentMillis = System.currentTimeMillis();
        //单查询service,大量线程怼到这个service上去
        String userName = userService.getUser(key);
        System.out.printf("##############################################\n");
        System.out.printf("用户名为:" + userName + "---线程名为:" + Thread.currentThread().getName() +
                "---执行时间为:" + (System.currentTimeMillis() - currentMillis) + "\n");
        return userName;
    }

那么我们有什么方式可以优化这种操作呢?

我目前能想到的就是利用缓存(缓存热点数据)、消息队列(接收请求慢慢消费达到流量削峰)、多个服务实例(分散请求压力提高计算能力)等方式应对高并发场景。

在本文中,我利用另一种思路:把多个请求合并为一个请求,把单查询变为批量查询,这样就能有效减少开辟线程的数量。

具体实现

首先我们定义一个用户请求类Request:

//用户请求类
public class RequestTest {
    //请求条件
    private Long key;
    //传话人
    private CompletableFuture<String> future;
    public CompletableFuture<String> getFuture() {
        return future;
    }
    public void setFuture(CompletableFuture<String> future) {
        this.future = future;
    }
    public Long getKey() {
        return key;
    }
    public void setKey(Long key) {
        this.key = key;
    }
}

接着是请求合并的主要代码:

//存放请求的队列
    LinkedBlockingDeque<RequestTest> queue = new LinkedBlockingDeque<>();
    //初始化方法
    @PostConstruct
    public void init() {
        //定时执行的线程池,每隔5毫秒执行一次(间隔时间可以由业务决定),把所有堆积的请求
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(() -> {
            //在这里具体执行批量查询逻辑
            int size = queue.size();
            if (size == 0) {
                //若没有请求堆积,直接返回,等10毫秒再执行一次
                return;
            }
            //若有请求堆积把所有请求都拿出来
            List<RequestTest> requestTests = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                //把请求拿出来
                RequestTest poll = queue.poll();
                requestTests.add(poll);
            }
            //至此请求已经被合并了
            System.out.printf("##############################################\n");
            System.out.printf("请求合并了" + requestTests.size() + "条!\n");
            //组装批量查询条件
            List<Long> keyList = new ArrayList<>();
            for (RequestTest requestTest : requestTests) {
                keyList.add(requestTest.getKey());
            }
            //进行批量查询
            List<User> nameList = userService.getUserList(keyList);
            //把批查结果放入一个map
            Map<Long,String> map = new HashMap<>();
            for(User user:nameList){
                map.put(user.getId(),user.getName());
            }
            for (RequestTest requestTest : requestTests){
                //把放在map中的结果集放回给对应的线程
                //future是对应每个请求的,因为是每个请求线程都传了自己的future是对应的过来
                requestTest.getFuture().complete(map.get(requestTest.getKey()));
            }
        }, 0, 5, TimeUnit.MILLISECONDS);
    }
    //请求合并
    @GetMapping("/requestMerge/getUser")
    public String getUserRequestMerge(Long key) throws InterruptedException, ExecutionException {
        long currentMillis = System.currentTimeMillis();
        //CompletableFuture可以使一个线程执行操作后,主动返回值给另一个线程
        CompletableFuture<String> future = new CompletableFuture<>();
        RequestTest requestTest = new RequestTest();
        //把future(把future可以认为是线程间的"传话人")放到等待队列中去,让定时调度的线程池执行并返回值
        requestTest.setFuture(future);
        requestTest.setKey(key);
        //把requestTest加入等待队列(LinkedBlockingDeque)
        queue.add(requestTest);
        //future(传话人)阻塞直到有值返回
        String userName = future.get();
        System.out.printf("用户名为:" + userName + "---线程名为:"+Thread.currentThread().getName()+
                "---执行时间为:"+(System.currentTimeMillis() - currentMillis)+"\n");
        return userName;
    }

到这里我们就完成了一个请求合并的demo,接着我们测试运行结果。

这里我用了jemeter(jemeter的用法可以网上找一找)进行测试,对单查询接口和请求合并接口分别进行了20000次的请求,以下是结果对比:

(1)单查询运行结果:

可以看到系统开辟了许多线程来处理请求,jemeter的聚合报告如下:

(2)请求合并运行结果:

可以看到多条线程被合并成了一条线程来处理,jemeter的聚合报告如下:

通过以上数据,我们可以看到,请求合并比单查询的吞吐量要大,并且在运行过程中,流量浮动的范围比较小。

至此,我们就完成了利用请求合并发挥高并发下批量处理的优势。

心得

本文我利用了LinkedBlockingDeque阻塞队列、ScheduledExecutorService定时执行线程池和CompletableFuture线程通信来完成了请求合并的demo。

并且通过实验证明高并发下批量处理比单个处理更有优势。

当然,我的demo并不完善。而且请求合并也有一些弊端,比如如果定时线程池的间隔时间比较长,反而会造成请求堆积时间太长,用户不能快速得到响应。

同时在请求数量比较小时,请求合并的场景也是没有必要的。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文