Java多线程并发之线程池任务请求拦截测试实例
作者:Terisadeng
一、需求
前端会传入一个存储编码的list,后台接收到编码通过计算返回每个编码对应的值,每个编码计算出来的值是固定不变的。
二、设计方案
因为前端请求响应有一个时常要求,比如100ms。
而这个计算比较耗时,因此为了请求能够快速响应,在第一个请求过来时判断redis缓存是否存储编码对应的计算值,如果没有就直接返回空,前端根据这个空值使用补偿方案的默认值。
后台通过线程池执行计算方法,然后存入redis,这样下次用户带着相同的编码请求就可以直接从缓存获取,不用重复计算。
这里的问题在于,当并发量高的情况下,比如50个用户带着相同的编码调用计算方法,而实际上计算方法只需要调用一次就可以了。
因此我们需要在将任务提交到线程池之前判断线程池中执行线程的数量来决定是否要将任务提交到线程池。
另外这里千万不能使用直接创建线程的方式,这会导致并发情况下突然创建大量线程,导致系统cpu飙升卡死。
三、测试
1、线程池实现类,提供全局唯一的线程池实例
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 固定大小的线程池 */ public class DisCardThreadPool { private static DisCardThreadPool disCardThreadPool=new DisCardThreadPool(); /* * 将构造方法访问修饰符设为私有,禁止任意实例化。 */ private DisCardThreadPool() { } /** * 核心线程数 */ int corePoolSize = 1; /** * 最大线程数 */ int maximumPoolSize = 1; /** * 空闲线程存活时间 */ long keepAliveTime = 10; /* * 线程池单例创建方法 */ public static DisCardThreadPool newInstance() { return disCardThreadPool; } private final ThreadPoolExecutor mThreadPool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy()); public void execute(Runnable r){ mThreadPool.execute(r); } /** * 队列中等待执行的任务数目 * @return */ public synchronized int getQueue(){ return mThreadPool.getQueue().size(); } /* * 获取线程池中剩余线程数目 * 获取的结果不准确 */ public synchronized int getActiveCount(){ return mThreadPool.getActiveCount(); }
2、测试类
这里通过CountDownLatch类同时启动多个线程来模拟并发请求
import com.teriste.service.threadpool.DisCardThreadPool; import org.junit.Test; import java.io.File; import java.io.IOException; import java.util.concurrent.CountDownLatch; /** * 模拟并发向线程池提交任务。 * 需求:使得线程池满之后其他请求都不执行 */ public class MultiThreadConcurrencyTest { //获取线程池实例 private static DisCardThreadPool threadPool=DisCardThreadPool.newInstance(); @Test public void test(){ //创建大小20的计数器,使得20个线程同时执行,模拟并发 CountDownLatch countDownLatch=new CountDownLatch(20); for (int i=0;i<20;i++){ InvokeThread thread=new InvokeThread(countDownLatch); System.out.println("创建线程:"+thread.getName()); thread.start(); //启动一个线程,计数器就减一,同时在线程的run方法中阻塞线程,等待计数器唤醒 countDownLatch.countDown(); } try { //阻塞主线程,防止子线程还没执行主线程结束导致子线程无法执行 Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } //线程池执行线程 public static void invokeThread(){ //当排队队列有线程等待时不继续添加线程 synchronized (MultiThreadConcurrencyTest.class){ //System.out.println("等待队列大小:"+threadPool.getQueue()); //官方api指出getActiveCount()无法获取准确的存获线程数 //因为这里是根据队列中待执行任务数来判断,因此如果线程池大小为1,实际上会有两个线程被执行, //一个线程是进入线程池,还有一个线程判断此时队列待执行线程数是0会进入待执行队列,因此最终执行线程数是线程池大小+1 System.out.println("排队队列中的线程个数:"+threadPool.getQueue()); if (threadPool.getQueue()<=0){ threadPool.execute(new WorkThread()); } } } } //调用线程池执行任务的类,模拟外部请求实体发起请求 class InvokeThread extends Thread{ private CountDownLatch countDownLatch; public InvokeThread(CountDownLatch countDownLatch){ this.countDownLatch=countDownLatch; } @Override public void run(){ try { //等待计数器唤醒 countDownLatch.await(); //向线程池提交线程 MultiThreadConcurrencyTest.invokeThread(); } catch (InterruptedException e) { e.printStackTrace(); } } } //任务类 class WorkThread implements Runnable{ @Override public void run() { String path="E:\\测试"; File file=new File(path); if(!file.exists()){ file.mkdirs();//创建目录 } String fileName=Thread.currentThread().getName()+System.currentTimeMillis(); File newFile=new File(path,fileName); try { newFile.createNewFile(); } catch (IOException e) { e.printStackTrace(); } //在线程池中有可能是不同的线程使用相同的名称 //因为线程池中上个结束的线程继续使用来执行下个线程 System.out.println("当前执行的线程的名称:"+Thread.currentThread().getName()+fileName); } }
3、如果放开对排队队列的判断可以看到,当线程池满了之后执行的是ArrayBlockingQueue.offer(E e);方法:
这说明我们可以通过继承ArrayBlockingQueue类实现自己的排队队列,当线程池满了之后调用offer方法时,我们直接丢弃任务什么都不做,这样就可以准确实现上面的方案,并且可以去掉对队列中待执行线程的判断,从而不需要加锁,提高执行效率。
下面是自定义队列的实现:
import java.util.Collection; import java.util.concurrent.ArrayBlockingQueue; /** * 线程池使用该类时执行插入方法时不会向队列中插入数据,会直接丢弃或记录日志 */ public class EmptyArrayBlockingQueue<E> extends ArrayBlockingQueue{ public EmptyArrayBlockingQueue(int capacity) { super(capacity); } public EmptyArrayBlockingQueue(int capacity, boolean fair) { super(capacity, fair); } public EmptyArrayBlockingQueue(int capacity, boolean fair, Collection c) { super(capacity, fair, c); } /** * 注意这里重写的父类方法参数是泛型参数 * 由于Java的类型擦除,在编译时会自动变为Object类型 * 因此这里使用Object类型实际上就是重写的父类方法 * @param e * @return */ @Override public boolean offer(Object e) { /**不执行将线程加入队列的操作,这样队列永远为空 超过线程池核心线程数的线程实际上在这里都被丢弃了 可以增加记录日志的操作 */ return true; } }
下面是修改后的线程池类:
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 固定大小的线程池 */ public class DisCardThreadPool { private static DisCardThreadPool disCardThreadPool=new DisCardThreadPool(); /* * 将构造方法访问修饰符设为私有,禁止任意实例化。 */ private DisCardThreadPool() { } /** * 核心线程数 */ int corePoolSize = 1; /** * 最大线程数 */ int maximumPoolSize = 1; /** * 空闲线程存活时间 */ long keepAliveTime = 10; /* * 线程池单例创建方法 */ public static DisCardThreadPool newInstance() { return disCardThreadPool; } private final ThreadPoolExecutor mThreadPool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.MILLISECONDS,new EmptyArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy()); public void execute(Runnable r){ mThreadPool.execute(r); } /** * 队列中等待执行的任务数目 * @return */ public synchronized int getQueue(){ return mThreadPool.getQueue().size(); } /* * 获取线程池中剩余线程数目 * 获取的结果不准确 */ public synchronized int getActiveCount(){ return mThreadPool.getActiveCount(); } }
下面是测试类:
import com.teriste.service.threadpool.DisCardThreadPool; import org.junit.Test; import java.io.File; import java.io.IOException; import java.util.concurrent.CountDownLatch; /** * 模拟并发向线程池提交任务。 * 需求:使得线程池满之后其他请求都不执行 */ public class MultiThreadConcurrencyTest { //获取线程池实例 private static DisCardThreadPool threadPool=DisCardThreadPool.newInstance(); @Test public void test(){ //创建大小20的计数器,使得20个线程同时执行,模拟并发 CountDownLatch countDownLatch=new CountDownLatch(20); for (int i=0;i<20;i++){ InvokeThread thread=new InvokeThread(countDownLatch); System.out.println("创建线程:"+thread.getName()); thread.start(); //启动一个线程,计数器就减一,同时在线程的run方法中阻塞线程,等待计数器唤醒 countDownLatch.countDown(); } try { //阻塞主线程,防止子线程还没执行主线程结束导致子线程无法执行 Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } //线程池执行线程 public static void invokeThread(){ //当排队队列有线程等待时不继续添加线程 synchronized (MultiThreadConcurrencyTest.class){ //System.out.println("等待队列大小:"+threadPool.getQueue()); //官方api指出getActiveCount()无法获取准确的存获线程数 //因为这里是根据队列中待执行任务数来判断,因此如果线程池大小为1,实际上会有两个线程被执行, //一个线程是进入线程池,还有一个线程判断此时队列待执行线程数是0会进入待执行队列,因此最终执行线程数是线程池大小+1 System.out.println("排队队列中的线程个数:"+threadPool.getQueue()); //if (threadPool.getQueue()<=0){ threadPool.execute(new WorkThread()); //} } } } //调用线程池执行任务的类,模拟外部请求实体发起请求 class InvokeThread extends Thread{ private CountDownLatch countDownLatch; public InvokeThread(CountDownLatch countDownLatch){ this.countDownLatch=countDownLatch; } @Override public void run(){ try { //等待计数器唤醒 countDownLatch.await(); //向线程池提交线程 MultiThreadConcurrencyTest.invokeThread(); } catch (InterruptedException e) { e.printStackTrace(); } } } //任务类 class WorkThread implements Runnable{ @Override public void run() { String path="E:\\测试"; File file=new File(path); if(!file.exists()){ file.mkdirs();//创建目录 } String fileName=Thread.currentThread().getName()+System.currentTimeMillis(); File newFile=new File(path,fileName); try { newFile.createNewFile(); } catch (IOException e) { e.printStackTrace(); } //在线程池中有可能是不同的线程使用相同的名称 //因为线程池中上个结束的线程继续使用来执行下个线程 System.out.println("当前执行的线程的名称:"+Thread.currentThread().getName()+fileName); } }
测试结果:
从测试结果可以看到,队列中永远没有线程被加入,即使线程池已满,也不会导致被加入排队队列,实现了只有线程池存在空闲线程的时候才会接受新任务的需求。
到此这篇关于Java多线程并发之线程池任务请求拦截测试实例的文章就介绍到这了,更多相关Java线程池任务请求拦截测试内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!