SpringCloud解决Feign异步回调问题(SpringBoot+Async+Future实现)
作者:弱水提沧
近期,需要对之前的接口进行优化,缩短接口的响应时间,但是springcloud中的feign是不支持传递异步化的回调结果的,因此有了以下的解决方案,记录一下,仅供参考。
一、背景
对于一个页面上的所有实例有一个查询权限的接口,同时有四个操作类型的需要查询且接口仅支持单个实例单个操作类型操作。
简单来说,假设实例查询退订权限需要1秒钟,那么四个操作共需4秒钟,一共20个实例的话,那么实际所需时间为80秒。
这样显然是不符合要求的。经过本文的优化后,可以达到20秒。
二、设计方案
需要指出,由于各种限制原因,无法直接在工程中进行rest接口的调用,否则是直接可以@Async异步调用的。
(一)使用框架
产品工程使用SpringCloud微服务,同时通过Feign调用权限的微服务,服务都是使用SpringBoot实现。
(二)实现方案
由于Feign的特殊性,目前并不支持异步返回Future,直接通过Future是会报错的。
因此可以在权限微服务中异步执行查询操作权限,同时再异步起一个线程去等待所有的异步结果,这一个线程是阻塞的,同时由阻塞得到的最终结果,封装成自己想要的返回体,返回给调用方即可,这样相当于大致只需要等待一个实例的操作时间即可。
三、示例代码
项目代码我就不放上来了,我将写一个Demo作为示例,基本的思想都在里面了。
(一)被调用方
1. 配置异步化及异步线程池 - AsyncConfig类
@Configuration @EnableAsync public class AsyncConfig implements AsyncConfigurer { @Bean("taskPoolExecutor") public Executor taskPoolExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(20); executor.setMaxPoolSize(30); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("taskPoolExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new SimpleAsyncUncaughtExceptionHandler(); } }
2. Controller层
@RestController @Slf4j public class AsyncController { @Autowired private AsyncService asyncService; /** * 异步测试方法 * @param ids * @return * @throws Exception */ @GetMapping("/future") public List<User> testFuture(int[] ids) throws Exception { List<Future<User>> futures = new ArrayList<>(); futures.add(asyncService.testFuture(ids[0])); //异步方法 异步执行四个用户的查询操作 futures.add(asyncService.testFuture(ids[1])); futures.add(asyncService.testFuture(ids[2])); futures.add(asyncService.testFuture(ids[3])); return asyncService.getReturnValueMange(futures); 异步方法 异步获取所有的异步回调值 } /** * 同步测试方法 * @param i * @return * @throws InterruptedException */ @GetMapping("/syncTest") public User testsyncTest( Integer i) throws InterruptedException { User user = asyncService.testSync(i); return user; } }
这里需要解释说明下,使用一个list作为future结果的集合,同时采用getReturnValueMange(future)方法来获取所有的异步执行结果,具体可以看下面Service层的代码逻辑。
3. AsyncService层
@Slf4j @Service public class AsyncService { @Autowired private AsyncUnit asyncUnit; //异步实现util public Future<User> testFuture(int i) throws InterruptedException { log.info("start..."); long currentTime = System.currentTimeMillis(); Future<User> user = asyncUnit.testUser(i); log.info("done...{}", String.valueOf(System.currentTimeMillis()- currentTime)); return user; } //获取异步方法结果 public List<User> getReturnValueMange(List<Future<User>> futures) throws Exception { Future<List<User>> userFuture = asyncUnit.getReturnValueMange(futures); return userFuture.get(); //get()方法为阻塞方法,等待异步返回值 } //同步方法-作为比较方法 public User testSync(int i) throws InterruptedException { log.info("start..."); Thread.sleep(2000); return new User(i); }
4. AsyncUtil 层
@Service @Slf4j public class AsyncUnit { //异步方法,通过sleep 2秒钟模拟 @Async("taskPoolExecutor") public Future<User> testUser(int age){ log.info("异步执行start...{}",Thread.currentThread().getName()); long currentTime = System.currentTimeMillis(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("异步执行done...{}, {}",Thread.currentThread().getName(), String.valueOf(System.currentTimeMillis()- currentTime)); return new AsyncResult<>(new User(age)); } @Async("taskPoolExecutor") public Future<List<User>> getReturnValueMange(List<Future<User>> futures) throws Exception{ log.info("异步执行start..getReturnValueMange.{}",Thread.currentThread().getName()); long currentTime = System.currentTimeMillis(); List<User> users = new ArrayList<>(); for (Future<User> future : futures) { users.add(future.get()); } log.info(""+users.size()); log.info("异步执行done..getReturnValueMange.{}, {}",Thread.currentThread().getName(), String.valueOf(System.currentTimeMillis()- currentTime)); return new AsyncResult<>(users); } }
(二)调用方
1. Config 层
@Configuration public class FeignConfig { @Bean public RequestInterceptor requestInterceptor() { return new FeignRequestInterceptor(); } }
2. Controller层
@RestController @Slf4j public class AsyncController { @Autowired private RemoteClient remoteClient; //异步测试方法,查询五次 模拟所需时间 @GetMapping("/future") public List<Integer> testFuture() throws InterruptedException, ExecutionException { String res = ""; log.info(" asynccontroller...start..."); long currentTime = System.currentTimeMillis(); int[] ids = new int[]{1,2,3,4,5}; List<User> users = remoteClient.testFuture(ids); List<Integer> resList = new ArrayList<>(); resList.add(users.get(1).getAge()); log.info(" asynccontroller...done...{}", String.valueOf(System.currentTimeMillis()- currentTime)); return resList; } //同步测试方法,通过执行5次同步方法模拟查询 @GetMapping("/syncTest") public String testsyncTest() { long currentTime = System.currentTimeMillis(); List<Integer> ages = new ArrayList<>(); for(int i=0; i<5; i++){ User user = remoteClient.testsyncTest(i); ages.add(user.getAge()); } log.info("done...{}", String.valueOf(System.currentTimeMillis()- currentTime)); return ages.get(0)+ages.get(1)+ages.get(2)+ages.get(3)+ages.get(4)+""; } }
3. FeignClient (RemoteClient)层
@FeignClient(name = "ASYNC") public interface RemoteClient { @GetMapping("/future") List<User> testFuture(@RequestParam("ids") int[] ids); @GetMapping("/futureValue") List<String> getReturnValueMange(@RequestBody List<Future<User>> futures); @GetMapping("/syncTest") User testsyncTest(@RequestParam("i") Integer i); }
被调用方的服务名为ASYNC,通过Feign调用,Feign的详细说明就不细说,具体可以自行查询。
(三)公共类 User
@Data @NoArgsConstructor @AllArgsConstructor public class User { private String name; private String id; private int age; private String school; public User (int age){ this.age = age; } }
四、实现效果测试
调用方端口8305 被调用方8036 eureka端口 8761,将服务启动如图所示。
1. 使用Postman进行接口测试,首先是同步方法的执行。
控制台日志
2. 使用Postman进行异步方法调用
控制台日志
可以看到异步执行之后,查询结果由10秒缩短到2秒,这个优化的时间为实例的个数为倍数作为缩短。
五、总结
本文的优化方法基于Feign无法异步返回调用值得情况下采取的折中方法,如果遇到不在意返回值的异步返回可以直接进行异步执行,这样的话可以在毫秒级就执行结束。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。