java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > 基于DeferredResult的异步服务

Java中基于DeferredResult的异步服务详解

作者:爱喝咖啡的程序员

这篇文章主要介绍了Java中基于DeferredResult的异步服务详解,DeferredResult字面意思是"延迟结果",它允许Spring MVC收到请求后,立即释放(归还)容器线程,以便容器可以接收更多的外部请求,提升吞吐量,需要的朋友可以参考下

一. 简介

Servlet3.0提供了基于servlet的异步处理api,Spring MVC只是将这些api进行了一系列的封装,从而实现了DeferredResult。

DeferredResult字面意思是"延迟结果",它允许Spring MVC收到请求后,立即释放(归还)容器线程,以便容器可以接收更多的外部请求,提升吞吐量,与此同时,DeferredResult将陷入阻塞,直到我们主动将结果set到DeferredResult,最后,DeferredResult会重新申请容器线程,并将本次请求返回给客户端。

二. 使用

1. 监听器 onTimeout()

当deferredResult被创建出来之后,执行setResult()之前,这之间的时间超过设定值时(比如下方案例中设置为5秒超时),则被判定为超时。

DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 设置超时事件
deferredResult.onTimeout(() -> {
    System.out.println("异步线程执行超时, 异步线程的名称: " + Thread.currentThread().getName());
    deferredResult.setResult("异步线程执行超时");
});

2. 监听器 onError()

当onTimeout()或onCompletion()等回调函数中的代码报错时,则会执行监听器onError()的回调函数。

PS: DeferredResult之外的代码报错不会影响到onError()。

DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 设置异常事件
deferredResult.onError((throwable) -> {
    System.out.println("异步请求出现错误,异步线程的名称: " + Thread.currentThread().getName() + "异常: " + throwable);
    deferredResult.setErrorResult("异步线程执行出错");
});

3. 监听器 onCompletion()

代码任意位置调用了同一个DeferredResult的setResult()后,则会被DeferredResult的onCompletion()监听器捕获到。

Spring会任选一条容器线程来执行onCompletion( )中的代码(由于请求线程已被释放(归还),所以此处可能再次由同一条请求线程来处理,也可能由其他线程来处理)。

DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 设置完成事件
deferredResult.onCompletion(() -> {
    System.out.println("异步线程执行完毕,异步线程的名称: " + Thread.currentThread().getName());
});

完整的代码为:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
public class DemoController {
    // 自定义线程池
    public static ExecutorService exec = Executors.newCachedThreadPool();
    @GetMapping("/demo")
    public DeferredResult<String> demoResult() {
        System.out.println("容器线程: " + Thread.currentThread().getName());
        // 创建DeferredResult对象,设置超时时长 20秒
        DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
        // 设置超时事件
        deferredResult.onTimeout(() -> {
            System.out.println("异步线程执行超时, 异步线程的名称: " + Thread.currentThread().getName());
            // throw new RuntimeException("超时事件报错了!");
            deferredResult.setResult("异步线程执行超时");
        });
        // 设置异常事件
        deferredResult.onError((throwable) -> {
            System.out.println("异步请求出现错误,异步线程的名称: " + Thread.currentThread().getName() + "异常: " + throwable);
            deferredResult.setErrorResult("异步线程执行出错");
        });
        // 设置完成事件
        deferredResult.onCompletion(() -> {
            System.out.println("异步线程执行完毕,异步线程的名称: " + Thread.currentThread().getName());
        });
        exec.execute(() -> {
            System.out.println("[线程池] 异步线程的名称: " + Thread.currentThread().getName());
            deferredResult.setResult("异步线程执行完毕");
        });
        System.out.println("Servlet thread release");
        return deferredResult;
    }
}

三. 拓展

有些业务场景下,我们希望新的请求触发(激活)之前陷入阻塞的请求,此外可以通过不同的key来区分不同的请求。

比如apollo在实现时就利用了DeferredResult。客户端向服务器端发送轮询请求,服务端收到请求后,会立刻释放容器线程,并阻塞本次请求,若apollo托管的配置文件没有发生任何改变,则轮询请求会超时(返回304)。当有新的配置发布时,服务端会调用DeferredResult setResult()方法,进入onCompletion(),并使尚未超时的轮寻请求正常返回(200)。

大概如下: 

@SpringBootApplication
public class DemoApplication implements WebMvcConfigurer {
 
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
 
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Collection;
@RestController
public class ApolloController {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    //guava中的Multimap,多值map,对map的增强,一个key可以保持多个value
    private Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create());
    //模拟长轮询
    @RequestMapping(value = "/watch/{namespace}", method = RequestMethod.GET, produces = "text/html")
    public DeferredResult<String> watch(@PathVariable("namespace") String namespace) {
        logger.info("Request received");
        DeferredResult<String> deferredResult = new DeferredResult<>();
        //当deferredResult完成时(不论是超时还是异常还是正常完成),移除watchRequests中相应的watch key
        deferredResult.onCompletion(new Runnable() {
            @Override
            public void run() {
                System.out.println("remove key:" + namespace);
                watchRequests.remove(namespace, deferredResult);
            }
        });
        watchRequests.put(namespace, deferredResult);
        logger.info("Servlet thread released");
        return deferredResult;
    }
    //模拟发布namespace配置
    @RequestMapping(value = "/publish/{namespace}", method = RequestMethod.GET, produces = "text/html")
    public Object publishConfig(@PathVariable("namespace") String namespace) {
        if (watchRequests.containsKey(namespace)) {
            Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace);
            Long time = System.currentTimeMillis();
            //通知所有watch这个namespace变更的长轮训配置变更结果
            for (DeferredResult<String> deferredResult : deferredResults) {
                deferredResult.setResult(namespace + " changed:" + time);
            }
        }
        return "success";
    }
}

当请求超时的时候会产生AsyncRequestTimeoutException,我们定义一个全局异常捕获类:

 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ControllerAdvice
class GlobalControllerExceptionHandler {
    protected static final Logger logger = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class);
    @ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304状态码
    @ResponseBody
    @ExceptionHandler(AsyncRequestTimeoutException.class) //捕获特定异常
    public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e, HttpServletRequest request) {
        System.out.println("handleAsyncRequestTimeoutException");
    }
}

然后我们通过postman工具发送请求//localhost:8080/watch/mynamespace,请求会挂起,60秒后,DeferredResult超时,客户端正常收到了304状态码,表明在这个期间配置没有变更过。

然后我们在模拟配置变更的情况,再次发起请求//localhost:8080/watch/mynamespace,等待个10秒钟(不要超过60秒),然后调用//localhost:8080/publish/mynamespace,发布配置变更。这时postman会立刻收到response响应结果:  

mynamespace changed:1538880050147

表明在轮训期间有配置变更过。

这里我们用了一个MultiMap来存放所有轮训的请求,Key对应的是namespace,value对应的是所有watch这个namespace变更的异步请求DeferredResult,需要注意的是:在DeferredResult完成的时候记得移除MultiMap中相应的key,避免内存溢出请求。

采用这种长轮询的好处是,相比一直循环请求服务器,实例一多的话会对服务器产生很大的压力,http长轮询的方式会在服务器变更的时候主动推送给客户端,其他时间客户端是挂起请求的,这样同时满足了性能和实时性。

四. DeferredResult与Callable的区别

DeferredResult和Callable都可以在Controller层的方法中直接返回,请求收到后,释放容器线程,在另一个线程中通过异步的方式执行任务,最后将请求返回给客户端。

不同之处在于,使用Callable时,当其它线程中的任务执行完毕后,请求会立刻返回给客户端,而DeferredResult则需要用户在代码中手动set值到DeferredResult,否则即便异步线程中的任务执行完毕,DeferredResult仍然不会向客户端返回任何结果。

到此这篇关于Java中基于DeferredResult的异步服务详解的文章就介绍到这了,更多相关基于DeferredResult的异步服务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文