Spring中的@Async原理分析

 更新时间:2024年01月09日 09:00:12   作者:it_lihongmin  
这篇文章主要介绍了Spring中的@Async原理分析,自定义new ThreadPoolExecutor并调用invokeAll等进行并发编程,后面发现只要在方法上添加@Async注解,并使用@EnableAsync进行开启默认会使用SimpleAsyncTaskExecutor类型,需要的朋友可以参考下

Java技术迷

前言

之前编程都是自定义new ThreadPoolExecutor(。。。),并调用invokeAll等进行并发编程。

后面发现只要在方法上添加@Async注解,并使用@EnableAsync进行开启,并且@since为Spring 3.1版本。

我使用的Spring 5版本的,默认会使用SimpleAsyncTaskExecutor类型。就是一个大坑。

1、@Async

1
2
3
4
5
6
7
8
9
10
11
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
  
    Class<? extends Annotation> annotation() default Annotation.class;
  
    boolean proxyTargetClass() default false;
  
    AdviceMode mode() default AdviceMode.PROXY;
  
    int order() default Ordered.LOWEST_PRECEDENCE;
}

与之前分析@EnableTransactionManagement一样,属性都差不多。使用@Import方式将AsyncConfigurationSelector注册为bean。

实现了ImportSelector接口 

1
2
3
4
5
6
7
8
9
10
public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
        case PROXY:
            return new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}

@EnableAsync上没有配置mode,则默认使用jdk方式实现。返回ProxyAsyncConfiguration将其注入为bean。

2、ProxyAsyncConfiguration

1)、实现ImportAware

则在ProxyAsyncConfiguration初始化为bean时,会进行回调,实现方法如下:

1
2
3
4
5
6
7
8
9
public void setImportMetadata(AnnotationMetadata importMetadata) {
    this.enableAsync = AnnotationAttributes.fromMap(
            importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
    if (this.enableAsync == null) {
        throw new IllegalArgumentException(
                "@EnableAsync is not present on importing class " +
           importMetadata.getClassName());
    }
}

获取@EnableAsync注解上的配置信息,并保存到 enableAsync属性中。

2)、AsyncAnnotationBeanPostProcessor

将 AsyncAnnotationBeanPostProcessor初始化为bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    bpp.configure(this.executor, this.exceptionHandler);
    Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
    if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class,
         "annotation")) {
        bpp.setAsyncAnnotationType(customAsyncAnnotation);
    }
    bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
    return bpp;
}

3、AsyncAnnotationBeanPostProcessor

实现了很多Aware接口,注入了BeanFactory和BeanClassLoader,主要是在setBeanFactory方法中:

1
2
3
4
5
6
7
8
9
10
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
  
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

new 了一个AsyncAnnotationAdvisor,而线程池和异常处理器是从初始化 ProxyAsyncConfiguration时传入的,默认都为null。构造器如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor,
    @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
  
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
        ClassUtils.forName("javax.ejb.Asynchronous",
        AsyncAnnotationAdvisor.class.getClassLoader()));
    } catch (ClassNotFoundException ex) {
    // If EJB 3.1 API not present, simply ignore.
    }
    this.advice = buildAdvice(executor, exceptionHandler);
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}

buildAdvice:构建拦截器

1
2
3
4
5
6
7
protected Advice buildAdvice(@Nullable Supplier<Executor> executor,
    @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
  
    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

初始化了一个AnnotationAsyncExecutionInterceptor 拦截器,后续进行分析。使用有参构造,但是异步任务的线程池为null。

buildPointcut:根据Async构建拦截匹配点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    // asyncAnnotationTypes默认只要Async类型
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            // result肯定是null,先添加Class类型的切点匹配器
            result = new ComposablePointcut(cpc);
        } else {
            result.union(cpc);
        }
        // 再添加Method类型的切点拦截器
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}

默认情况下 asyncAnnotationTypes中只要Async类型,则初始化了配置Async的类和方法的 匹配拦截器(AnnotationMatchingPointcut),并且都添加到ComposablePointcut中。

一切初始化完成后,在每个bean的生命周期都会进行回调 postProcessAfterInitialization方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (this.advisor == null || bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }
  
    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            }
            else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }
  
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }
  
    // No proxy needed.
    return bean;
}
1
2
3
4
5
6
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.copyFrom(this);
    proxyFactory.setTarget(bean);
    return proxyFactory;
}

4、AnnotationAsyncExecutionInterceptor

显然核心实现在 invoke方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
  
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
            "No executor specified and no default executor set on " +
            " AsyncExecutionInterceptor either");
    }
  
    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        } catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        } catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

先获取执行的方法信息,再判断执行的异步线程池,再讲任务提交给线程池。

1)、获取线程池(determineAsyncExecutor)

之前初始化的时候,传入的线程池为null,则:

1
2
3
4
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
    this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
    this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // Search for TaskExecutor bean... not plain Executor since that would
            // match with ScheduledExecutorService as well, which is unusable for
            // our purposes here. TaskExecutor is more clearly designed for it.
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}

beanFactory.getBean(TaskExecutor.class)

最后是获取了BeanFactory中的TaskExecutor的子类的bean(可能不存在)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        } else {
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
            (AsyncListenableTaskExecutor) targetExecutor : new
                 TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}

使用本地缓存ConcurrentHashMap, key为Methed, value为线程池。

1、先获取执行的方法的@Async的value值

1
2
3
4
5
6
7
8
9
protected String getExecutorQualifier(Method method) {
    // Maintainer's note: changes made here should also be made in
    // AnnotationAsyncExecutionAspect#getExecutorQualifier
    Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
    if (async == null) {
        async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
    }
    return (async != null ? async.value() : null);
}

如果获取到配置的值(如定义方法时为:@Async("order") ),则获取正在的线程池

1
2
3
4
5
6
7
protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
    if (beanFactory == null) {
        throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
                    " to access qualified executor '" + qualifier + "'");
    }
    return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
}

2、如果@Async上没有配置,则获取默认值

 targetExecutor = this.defaultExecutor.get();

就是之前从BeanFactory中获取TaskExecutor.class类型的实现,当前版本为spring5,,获取到的类型为SimpleAsyncTaskExecutor

2)、执行任务(doSubmit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor,
    Class<?> returnType) {
     
    if (completableFuturePresent) {
        Future<Object> result = AsyncExecutionAspectSupport.CompletableFutureDelegate
        .processCompletableFuture(returnType, task, executor);
        if (result != null) {
            return result;
        }
    }
  
    if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor)executor).submitListenable(task);
    } else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    } else {
        executor.submit(task);
        return null;
    }
}

根据我们定义的方法的返回值进行处理,返回值可以是 null、Future、Spring的AsyncResult是ListenableFuture的子类。

5、SimpleAsyncTaskExecutor

如果使用@Async没有配置线程池,并且没有给AnnotationAsyncExecutionInterceptor设置线程池,则调用时就是一个坑,每次创建一个线程。

submit()方法:

1
2
3
4
5
6
@Override
public <T> Future<T> submit(Callable<T> task) {
    FutureTask<T> future = new FutureTask<>(task);
    execute(future, TIMEOUT_INDEFINITE);
    return future;
}

execute()执行方法:

1
2
3
4
5
6
7
8
9
10
11
@Override
public void execute(Runnable task, long startTimeout) {
    Assert.notNull(task, "Runnable must not be null");
    Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        this.concurrencyThrottle.beforeAccess();
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    } else {
        doExecute(taskToUse);
    }
}

doExecute()方法:

1
2
3
4
5
protected void doExecute(Runnable task) {
    Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task)
        : createThread(task));
    thread.start();
}
1
2
3
4
5
6
public Thread createThread(Runnable runnable) {
    Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
    thread.setPriority(getThreadPriority());
    thread.setDaemon(isDaemon());
    return thread;
}

是否初始化了线程工厂,有则用工厂进行new,否则还是new。也就是说只要使用默认SimpleAsyncTaskExecutor线程池,每次执行任务就new一个新的线程。

到此这篇关于Spring中的@Async原理分析的文章就介绍到这了,更多相关@Async原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

蓄力AI

微信公众号搜索 “ 脚本之家 ” ,选择关注

程序猿的那些事、送书等活动等着你

原文链接:https://blog.csdn.net/it_lihongmin/article/details/102998125

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 reterry123@163.com 进行投诉反馈,一经查实,立即处理!

相关文章

  • springboot中restful风格请求的使用方法示例

    springboot中restful风格请求的使用方法示例

    RESTful是一种web软件风格,它不是标准也不是协议,它不一定要采用,只是一种风格,它倡导的是一个资源定位(url)及资源操作的风格,下面这篇文章主要给大家介绍了关于springboot中restful风格请求的使用方法,需要的朋友可以参考下
    2023-02-02
  • SpringBoot+RabbitMQ 实现死信队列的示例

    SpringBoot+RabbitMQ 实现死信队列的示例

    本文主要介绍了SpringBoot+RabbitMQ 实现死信队列的示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • springboot多模块项目mvn打包遇到存在依赖但却无法发现符号问题

    springboot多模块项目mvn打包遇到存在依赖但却无法发现符号问题

    在SpringBoot多模块项目中,如果遇到依赖存在但无法发现符号的问题,常见原因可能是pom.xml配置问题,例如,如果某个模块仅作为依赖而不是启动工程,不应在其pom中配置spring-boot-maven-plugin插件,因为这将影响jar包的生成方式
    2024-09-09
  • Java实现五子棋游戏

    Java实现五子棋游戏

    这篇文章主要为大家详细介绍了Java实现五子棋游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-04-04
  • 浅谈ArrayList和LinkedList到底谁更快

    浅谈ArrayList和LinkedList到底谁更快

    今天给大家带来的是关于Java的相关知识,文章围绕着ArrayList和LinkedList到底谁更快展开,文中有非常详细的介绍,需要的朋友可以参考下
    2021-06-06
  • 如何使用Java爬虫批量爬取图片

    如何使用Java爬虫批量爬取图片

    这篇文章主要介绍了如何使用Java爬虫批量爬取图片,对于爬虫的入门来说,图片相对来说是比较容易获取的,因为大部分图片都不是敏感数据,所以不会遇到什么反爬措施,对于入门爬虫来说是比较合适的,需要的朋友可以参考下
    2023-04-04
  • java中下拉框select和单选按钮的回显操作

    java中下拉框select和单选按钮的回显操作

    这篇文章主要介绍了java中下拉框select和单选按钮的回显操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-10-10
  • SpringBoot 指标监控actuator的专题

    SpringBoot 指标监控actuator的专题

    未来每一个微服务在云上部署以后,我们都需要对其进行监控、追踪、审计、控制等。SpringBoot就抽取了Actuator场景,使得我们每个微服务快速引用即可获得生产级别的应用监控、审计等功能,通读本篇对大家的学习或工作具有一定的价值,需要的朋友可以参考下
    2021-11-11
  • java 常规轮询长轮询Long polling实现示例详解

    java 常规轮询长轮询Long polling实现示例详解

    这篇文章主要为大家介绍了java 常规轮询长轮询Long polling实现示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12
  • 使用Mybatis如何实现删除多个数据

    使用Mybatis如何实现删除多个数据

    这篇文章主要介绍了使用Mybatis如何实现删除多个数据,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03

最新评论