Spring中的@Async原理分析
前言
之前编程都是自定义new ThreadPoolExecutor(。。。),并调用invokeAll等进行并发编程。
后面发现只要在方法上添加@Async注解,并使用@EnableAsync进行开启,并且@since为Spring 3.1版本。
我使用的Spring 5版本的,默认会使用SimpleAsyncTaskExecutor类型。就是一个大坑。
1、@Async
1 2 3 4 5 6 7 8 9 10 11 | @Import (AsyncConfigurationSelector. class ) public @interface EnableAsync { Class<? extends Annotation> annotation() default Annotation. class ; boolean proxyTargetClass() default false ; AdviceMode mode() default AdviceMode.PROXY; int order() default Ordered.LOWEST_PRECEDENCE; } |
与之前分析@EnableTransactionManagement一样,属性都差不多。使用@Import方式将AsyncConfigurationSelector注册为bean。
实现了ImportSelector接口
1 2 3 4 5 6 7 8 9 10 | public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration. class .getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default : return null ; } } |
@EnableAsync上没有配置mode,则默认使用jdk方式实现。返回ProxyAsyncConfiguration将其注入为bean。
2、ProxyAsyncConfiguration
1)、实现ImportAware
则在ProxyAsyncConfiguration初始化为bean时,会进行回调,实现方法如下:
1 2 3 4 5 6 7 8 9 | public void setImportMetadata(AnnotationMetadata importMetadata) { this .enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync. class .getName(), false )); if ( this .enableAsync == null ) { throw new IllegalArgumentException( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } } |
获取@EnableAsync注解上的配置信息,并保存到 enableAsync属性中。
2)、AsyncAnnotationBeanPostProcessor
将 AsyncAnnotationBeanPostProcessor初始化为bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | @Bean (name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role (BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull( this .enableAsync, "@EnableAsync annotation metadata was not injected" ); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure( this .executor, this .exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this .enableAsync.getClass( "annotation" ); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync. class , "annotation" )) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass( this .enableAsync.getBoolean( "proxyTargetClass" )); bpp.setOrder( this .enableAsync.<Integer>getNumber( "order" )); return bpp; } |
3、AsyncAnnotationBeanPostProcessor
实现了很多Aware接口,注入了BeanFactory和BeanClassLoader,主要是在setBeanFactory方法中:
1 2 3 4 5 6 7 8 9 10 | public void setBeanFactory(BeanFactory beanFactory) { super .setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor( this .executor, this .exceptionHandler); if ( this .asyncAnnotationType != null ) { advisor.setAsyncAnnotationType( this .asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this .advisor = advisor; } |
new 了一个AsyncAnnotationAdvisor,而线程池和异常处理器是从初始化 ProxyAsyncConfiguration时传入的,默认都为null。构造器如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>( 2 ); asyncAnnotationTypes.add(Async. class ); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName( "javax.ejb.Asynchronous" , AsyncAnnotationAdvisor. class .getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } this .advice = buildAdvice(executor, exceptionHandler); this .pointcut = buildPointcut(asyncAnnotationTypes); } |
buildAdvice:构建拦截器
1 2 3 4 5 6 7 | protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor( null ); interceptor.configure(executor, exceptionHandler); return interceptor; } |
初始化了一个AnnotationAsyncExecutionInterceptor 拦截器,后续进行分析。使用有参构造,但是异步任务的线程池为null。
buildPointcut:根据Async构建拦截匹配点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null ; // asyncAnnotationTypes默认只要Async类型 for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true ); Pointcut mpc = new AnnotationMatchingPointcut( null , asyncAnnotationType, true ); if (result == null ) { // result肯定是null,先添加Class类型的切点匹配器 result = new ComposablePointcut(cpc); } else { result.union(cpc); } // 再添加Method类型的切点拦截器 result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); } |
默认情况下 asyncAnnotationTypes中只要Async类型,则初始化了配置Async的类和方法的 匹配拦截器(AnnotationMatchingPointcut),并且都添加到ComposablePointcut中。
一切初始化完成后,在每个bean的生命周期都会进行回调 postProcessAfterInitialization方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | public Object postProcessAfterInitialization(Object bean, String beanName) { if ( this .advisor == null || bean instanceof AopInfrastructureBean) { // Ignore AOP infrastructure such as scoped proxies. return bean; } if (bean instanceof Advised) { Advised advised = (Advised) bean; if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { // Add our local Advisor to the existing proxy's Advisor chain... if ( this .beforeExistingAdvisors) { advised.addAdvisor( 0 , this .advisor); } else { advised.addAdvisor( this .advisor); } return bean; } } if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor( this .advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } // No proxy needed. return bean; } |
1 2 3 4 5 6 | protected ProxyFactory prepareProxyFactory(Object bean, String beanName) { ProxyFactory proxyFactory = new ProxyFactory(); proxyFactory.copyFrom( this ); proxyFactory.setTarget(bean); return proxyFactory; } |
4、AnnotationAsyncExecutionInterceptor
显然核心实现在 invoke方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public Object invoke( final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null ); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null ) { throw new IllegalStateException( "No executor specified and no default executor set on " + " AsyncExecutionInterceptor either" ); } Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null ; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); } |
先获取执行的方法信息,再判断执行的异步线程池,再讲任务提交给线程池。
1)、获取线程池(determineAsyncExecutor)
之前初始化的时候,传入的线程池为null,则:
1 2 3 4 | public AsyncExecutionAspectSupport( @Nullable Executor defaultExecutor) { this .defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor( this .beanFactory)); this .exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler:: new ); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | protected Executor getDefaultExecutor( @Nullable BeanFactory beanFactory) { if (beanFactory != null ) { try { // Search for TaskExecutor bean... not plain Executor since that would // match with ScheduledExecutorService as well, which is unusable for // our purposes here. TaskExecutor is more clearly designed for it. return beanFactory.getBean(TaskExecutor. class ); } catch (NoUniqueBeanDefinitionException ex) { logger.debug( "Could not find unique TaskExecutor bean" , ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor. class ); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info( "More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.debug( "Could not find default TaskExecutor bean" , ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor. class ); } catch (NoSuchBeanDefinitionException ex2) { logger.info( "No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either" ); } // Giving up -> either using local default executor or none at all... } } return null ; } |
beanFactory.getBean(TaskExecutor.class)
最后是获取了BeanFactory中的TaskExecutor的子类的bean(可能不存在)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this .executors.get(method); if (executor == null ) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor( this .beanFactory, qualifier); } else { targetExecutor = this .defaultExecutor.get(); } if (targetExecutor == null ) { return null ; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this .executors.put(method, executor); } return executor; } |
使用本地缓存ConcurrentHashMap, key为Methed, value为线程池。
1、先获取执行的方法的@Async的value值
1 2 3 4 5 6 7 8 9 | protected String getExecutorQualifier(Method method) { // Maintainer's note: changes made here should also be made in // AnnotationAsyncExecutionAspect#getExecutorQualifier Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async. class ); if (async == null ) { async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async. class ); } return (async != null ? async.value() : null ); } |
如果获取到配置的值(如定义方法时为:@Async("order") ),则获取正在的线程池
1 2 3 4 5 6 7 | protected Executor findQualifiedExecutor( @Nullable BeanFactory beanFactory, String qualifier) { if (beanFactory == null ) { throw new IllegalStateException( "BeanFactory must be set on " + getClass().getSimpleName() + " to access qualified executor '" + qualifier + "'" ); } return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor. class , qualifier); } |
2、如果@Async上没有配置,则获取默认值
targetExecutor = this.defaultExecutor.get();
就是之前从BeanFactory中获取TaskExecutor.class类型的实现,当前版本为spring5,,获取到的类型为SimpleAsyncTaskExecutor
2)、执行任务(doSubmit)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { if (completableFuturePresent) { Future<Object> result = AsyncExecutionAspectSupport.CompletableFutureDelegate .processCompletableFuture(returnType, task, executor); if (result != null ) { return result; } } if (ListenableFuture. class .isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor)executor).submitListenable(task); } else if (Future. class .isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null ; } } |
根据我们定义的方法的返回值进行处理,返回值可以是 null、Future、Spring的AsyncResult是ListenableFuture的子类。
5、SimpleAsyncTaskExecutor
如果使用@Async没有配置线程池,并且没有给AnnotationAsyncExecutionInterceptor设置线程池,则调用时就是一个坑,每次创建一个线程。
submit()方法:
1 2 3 4 5 6 | @Override public <T> Future<T> submit(Callable<T> task) { FutureTask<T> future = new FutureTask<>(task); execute(future, TIMEOUT_INDEFINITE); return future; } |
execute()执行方法:
1 2 3 4 5 6 7 8 9 10 11 | @Override public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null" ); Runnable taskToUse = ( this .taskDecorator != null ? this .taskDecorator.decorate(task) : task); if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) { this .concurrencyThrottle.beforeAccess(); doExecute( new ConcurrencyThrottlingRunnable(taskToUse)); } else { doExecute(taskToUse); } } |
doExecute()方法:
1 2 3 4 5 | protected void doExecute(Runnable task) { Thread thread = ( this .threadFactory != null ? this .threadFactory.newThread(task) : createThread(task)); thread.start(); } |
1 2 3 4 5 6 | public Thread createThread(Runnable runnable) { Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName()); thread.setPriority(getThreadPriority()); thread.setDaemon(isDaemon()); return thread; } |
是否初始化了线程工厂,有则用工厂进行new,否则还是new。也就是说只要使用默认SimpleAsyncTaskExecutor线程池,每次执行任务就new一个新的线程。
到此这篇关于Spring中的@Async原理分析的文章就介绍到这了,更多相关@Async原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
微信公众号搜索 “ 脚本之家 ” ,选择关注
程序猿的那些事、送书等活动等着你
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 reterry123@163.com 进行投诉反馈,一经查实,立即处理!
相关文章
springboot多模块项目mvn打包遇到存在依赖但却无法发现符号问题
在SpringBoot多模块项目中,如果遇到依赖存在但无法发现符号的问题,常见原因可能是pom.xml配置问题,例如,如果某个模块仅作为依赖而不是启动工程,不应在其pom中配置spring-boot-maven-plugin插件,因为这将影响jar包的生成方式2024-09-09java 常规轮询长轮询Long polling实现示例详解
这篇文章主要为大家介绍了java 常规轮询长轮询Long polling实现示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2022-12-12
最新评论