SpringBoot执行有返回值的异步任务问题
作者:章鱼先森cd
SpringBoot执行有返回值异步任务
Springboot如何使用多线程处理异步任务,并且是代返回值的。
比如,我一个Controller层的接口,调用到Service层,在对应的Service方法中有三个方法,这三个方法都是去调Dao层查询数据库数据,每个查询平均耗时5s,最后将这三个查询的结果进行合并计算。如果不采用异步查询的至少需要15s的时间来处理这个请求。此时我们可以使用异步任务方式来操作,可以减少10s左右的时间。
在Springboot中使用异步需要开启异步支持(@EnableAsync),代码如下
@SpringBootApplication @EnableAsync public class PublisherApplication { public static void main(String[] args) { SpringApplication.run(PublisherApplication.class, args); } }
然后增加一个配置类,不过这个Springboot也会默认配置,一般我们使用线程池是都不使用默认的,而是使用自定义的:
/** * @author Mr. Zhang * @description 异步线程池 * @date 2019-04-19 14:21 * @website https://www.zhangguimin.cn */ @Configuration public class AsyncConfig { private static final int THREADS = Runtime.getRuntime().availableProcessors(); @Bean("taskExecutor") public Executor execute() { System.out.println(THREADS); Executor executor = new ThreadPoolExecutor(THREADS, 2 * THREADS, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024)); return executor; } }
此配置为IO密集型服务器参数,即最大线程数为CPU核数*2(THREADS为CPU核数),其他拒绝策略默认即可。
在之后的异步任务中使用此出定义的线程池taskExecutor。
然后就是异步任务,在对应的任务方法上标注@Async(“taskExecutor”)即可,表示使用taskExecutor线程池中线程执行异步任务。代码参考:
@Service public class TaskServer { @Async("taskExecutor") public Future<Integer> asyncTask(CountDownLatch latch) { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } return new AsyncResult<>(10); } @Async("taskExecutor") public Future<Integer> asyncTask2(CountDownLatch latch) { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } return new AsyncResult<>(20); } @Async("taskExecutor") public Future<Integer> asyncTask3(CountDownLatch latch) { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } return new AsyncResult<>(30); } }
注意的是,需要在类上标注为组件注解,此处为@Service。
在这三个方法中执行sleep,使线程都是sleep 3秒的时间。然后参数为CountDownLatch,在每次执行完就countDown(),然后返回值为Future,如果执行的异步方法没有返回值可以不需要CountDownLatch和返回值就行。
在对应的测试类中执行
@RunWith(SpringRunner.class) @SpringBootTest public class PublisherApplicationTests { @Autowired private TaskServer taskServer; @Test public void test() { CountDownLatch latch = new CountDownLatch(3); LocalTime time = LocalTime.now(); Future<Integer> integerFuture = taskServer.asyncTask(latch); Future<Integer> integerFuture1 = taskServer.asyncTask2(latch); Future<Integer> integerFuture2 = taskServer.asyncTask3(latch); try { latch.await(); LocalTime end =LocalTime.now(); System.out.println(Duration.between(time, end)); Integer integer = integerFuture.get(); Integer integer1 = integerFuture1.get(); Integer integer2 = integerFuture2.get(); int i = integer + integer1 + integer2; System.out.println(i); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
注意CountDownLatch latch = new CountDownLatch(3);有三个异步方法传入的值为3,在调用的异步方法后面需要latch.await();
一定不要在latch.await();前去get()结果值,这样会导致异步方法强制执行。
另外,有的人也使用while循环,在所有结果出来前一致在循环等待,类似自旋锁的原理,对资源消耗较高,还有,如果一个任务出现错误,就可能会造成一只循环。而CountDownLatch可在await()方法中传入等待时间,如果超过这个时间就会结束等待,直接完成下面的操作。
SpringBoot开启有返回值的异步调用:三步搞定
1、线程池配置
package com.listen.demo.config; /** * @author liuxd * @version 1.0 * @date 2019-12-25 15:46 */ import com.listen.demo.service.MyTaskServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration //@ComponentScan({"com.listen.demo"}) @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Bean public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(5); //配置最大线程数 executor.setMaxPoolSize(5); //配置队列大小 executor.setQueueCapacity(99999); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-service-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } @Bean public MyTaskServer myTaskServer(){ return new MyTaskServer(); } }
2、业务操作类
package com.listen.demo.service; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; /** * @author liuxd * @version 1.0 * @date 2019-12-25 15:48 */ @Service public class MyTaskServer { @Async("asyncServiceExecutor") public Future<Integer> asyncTask(CountDownLatch countDownLatch) throws Exception { Thread.sleep(1000L); countDownLatch.countDown(); return new AsyncResult<>(10); } @Async("asyncServiceExecutor") public Future<Integer> asyncTask2(CountDownLatch countDownLatch) throws Exception { Thread.sleep(2000L); countDownLatch.countDown(); return new AsyncResult<>(20); } @Async("asyncServiceExecutor") public Future<Integer> asyncTask3(CountDownLatch countDownLatch) throws Exception { Thread.sleep(3000L); countDownLatch.countDown(); return new AsyncResult<>(30); } }
3、异步+等待+有返回的测试类
package com.listen.demo; import com.listen.demo.config.ExecutorConfig; import com.listen.demo.service.MyTaskServer; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; /** * @author liuxd * @version 1.0 * @date 2019-12-25 16:03 */ public class TestAsyncTask { public static void main(String[] args) throws Exception { System.out.println("主线程:" + Thread.currentThread().getName() + "开始执行调用任务..."); AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ExecutorConfig.class); MyTaskServer myTaskServer = context.getBean(MyTaskServer.class); CountDownLatch countDownLatch = new CountDownLatch(3); Future<Integer> future1 = myTaskServer.asyncTask(countDownLatch); Future<Integer> future2 = myTaskServer.asyncTask2(countDownLatch); Future<Integer> future3 = myTaskServer.asyncTask3(countDownLatch); countDownLatch.await(); Integer num1 = future1.get(); Integer num2 = future2.get(); Integer num3 = future3.get(); int data = num1 + num2 + num3; System.out.println("最终汇总计算结果:" + data); context.close(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("主线程:" + Thread.currentThread().getName() + "程序结束!!"); } }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。