SpringBoot CommandLineRunner的异步任务机制使用
作者:架构随笔录
在SpringBoot中,CommandLineRunner
本身并不是一个直接支持异步任务的机制。
CommandLineRunner
接口定义了一个在 Spring Boot 应用程序启动后立即同步执行的方法 run(String... args)
。
这意味着,通过实现 CommandLineRunner
接口定义的任务将在主线程中顺序执行,而不会创建新的线程来异步执行这些任务。
然而,如果你希望在 CommandLineRunner
中执行异步任务,你可以手动创建线程或使用 Spring 的异步执行功能。
以下是一些实现异步任务的方法。
1.概要分析
1.1 手动创建线程
在 CommandLineRunner
的 run
方法中,你可以直接创建并启动一个新的线程来执行异步任务。
这种方法简单直接,但需要注意线程管理和异常处理。
@Component public class MyCommandLineRunner implements CommandLineRunner { @Override public void run(String... args) throws Exception { new Thread(() -> { // 异步执行的代码 System.out.println("异步任务执行中..."); // 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("异步任务完成"); }).start(); // 主线程继续执行,不会等待异步任务完成 System.out.println("CommandLineRunner 执行完毕,主线程继续"); } }
1.2 使用 Spring 的异步执行功能
如果你希望利用 Spring 的异步支持来执行异步任务,你可以在 CommandLineRunner
中注入一个使用 @Async
注解的服务。但是,需要注意的是,由于 CommandLineRunner
的 run
方法本身是在 Spring 容器完全初始化之后同步执行的,因此即使你调用了一个异步服务方法,run
方法本身仍然会立即返回,不会等待异步任务完成。
首先,你需要在 Spring Boot 应用中启用异步支持,通过在启动类上添加 @EnableAsync
注解。
然后,你可以创建一个异步服务:
@Service public class AsyncService { @Async public void executeAsyncTask() { // 异步执行的代码 System.out.println("异步任务执行中..."); // 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("异步任务完成"); } }
在 CommandLineRunner
中注入并使用这个服务:
@Component public class MyCommandLineRunner implements CommandLineRunner { @Autowired private AsyncService asyncService; @Override public void run(String... args) throws Exception { asyncService.executeAsyncTask(); // 调用异步方法,但 CommandLineRunner 的 run 方法会立即返回 System.out.println("CommandLineRunner 执行完毕,主线程继续,不会等待异步任务完成"); } }
虽然 CommandLineRunner
本身不支持异步执行,但你可以通过手动创建线程或使用 Spring 的异步支持来在 CommandLineRunner
中执行异步任务。然而,需要注意的是,CommandLineRunner
的 run
方法本身仍然是同步执行的,它不会等待任何异步任务完成。
如果你的应用程序依赖于异步任务的结果,你可能需要采用其他机制(如 Future
、CompletableFuture
或消息队列)来管理异步任务的执行和结果。
2.核心原理分析
org.springframework.boot.CommandLineRunner
是 Spring Boot 框架中的一个核心接口,其原理分析可以从以下几个方面进行。
2.1 接口定义与功能
CommandLineRunner
是一个函数式接口(Functional Interface),它只定义了一个抽象方法 run(String... args)
。
这个方法在 Spring Boot 应用程序启动完成后被调用,允许开发者执行一些初始化操作或启动后的任务。这些任务可能包括数据初始化、缓存预热、日志记录等。
2.2 执行时机
当 Spring Boot 应用程序启动时,Spring 容器会完成一系列的初始化操作,包括 Bean 的加载和依赖注入等。
在所有 Spring Bean 都初始化完成后,Spring Boot 会查找所有实现了 CommandLineRunner
接口的 Bean,并依次调用它们的 run
方法。
这意味着 CommandLineRunner
的执行时机是在 Spring 上下文准备好之后,但在应用程序对外提供服务之前。
2.3 命令行参数传递
CommandLineRunner
的 run
方法接收一个 String... args
参数,这个参数包含了启动应用程序时传递给它的命令行参数。
这使得开发者可以根据命令行参数的不同来执行不同的初始化逻辑。
2.4 实现与注册
要使用 CommandLineRunner
,开发者需要创建一个类并实现这个接口,然后重写 run
方法以定义自己的初始化逻辑。
为了让 Spring 容器能够扫描到这个实现类并将其注册为一个 Bean,通常会在类上添加 @Component
或其他类似的注解(如 @Service
、@Repository
等)。此外,也可以通过编程方式在配置类中显式地注册这个 Bean。
2.5 执行顺序
如果应用程序中有多个实现了 CommandLineRunner
接口的类,那么它们的 run
方法将按照一定的顺序执行。
默认情况下,执行顺序取决于 Spring 容器注册这些 Bean 的顺序。但是,开发者可以通过 @Order
注解或实现 Ordered
接口来指定执行顺序。
@Order
注解的值越小,优先级越高,相应的 run
方法就会越早执行。
2.6 与 ApplicationRunner 的区别
值得注意的是,Spring Boot 还提供了另一个类似的接口 ApplicationRunner
,它也用于在应用程序启动后执行初始化任务。与 CommandLineRunner
不同的是,ApplicationRunner
的 run
方法接收一个 ApplicationArguments
参数而不是 String... args
。
ApplicationArguments
提供了对命令行参数的更高级别访问,包括选项和非选项参数等。此外,如果同时存在 CommandLineRunner
和 ApplicationRunner
的实现,那么 CommandLineRunner
的实现会先于 ApplicationRunner
的实现被调用。
2.7 应用场景
CommandLineRunner
适用于需要在应用程序启动后立即执行的任务场景,如数据初始化、配置加载、缓存预热等。通过使用 CommandLineRunner
,开发者可以确保这些任务在应用程序对外提供服务之前完成,从而提高应用程序的性能和用户体验。
综上所述,org.springframework.boot.CommandLineRunner
是 Spring Boot 框架中用于执行启动后任务的强大机制,它通过简单的接口定义和灵活的注册方式,为开发者提供了方便、高效的初始化操作手段。
3.部分源码分析
3.1 启动Spring Boot应用程序
/** * 启动应用程序。 * * @param args 命令行参数 * @return ConfigurableApplicationContext 应用程序上下文 */ public ConfigurableApplicationContext run(String... args) { // 记录应用程序启动时间 long startTime = System.nanoTime(); // 创建引导上下文 DefaultBootstrapContext bootstrapContext = createBootstrapContext(); ConfigurableApplicationContext context = null; // 配置无头模式属性 configureHeadlessProperty(); // 获取启动监听器 SpringApplicationRunListeners listeners = getRunListeners(args); // 通知监听器应用程序即将启动 listeners.starting(bootstrapContext, this.mainApplicationClass); try { // 创建并配置应用参数 ApplicationArguments applicationArguments = new DefaultApplicationArguments(args); ConfigurableEnvironment environment = prepareEnvironment(listeners, bootstrapContext, applicationArguments); // 配置忽略BeanInfo的设置 configureIgnoreBeanInfo(environment); // 打印启动横幅 Banner printedBanner = printBanner(environment); // 创建应用程序上下文 context = createApplicationContext(); // 设置应用启动器 context.setApplicationStartup(this.applicationStartup); // 准备应用程序上下文 prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner); // 刷新上下文,使配置生效 refreshContext(context); // 启动后配置 afterRefresh(context, applicationArguments); // 计算应用程序启动时间 Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime); // 如果启用了启动信息日志,则记录启动信息 if (this.logStartupInfo) { new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), timeTakenToStartup); } // 通知监听器应用程序已启动 listeners.started(context, timeTakenToStartup); // 调用应用程序运行者 callRunners(context, applicationArguments); } catch (Throwable ex) { // 处理启动失败 handleRunFailure(context, ex, listeners); throw new IllegalStateException(ex); } try { // 计算应用程序就绪时间 Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime); // 通知监听器应用程序已就绪 listeners.ready(context, timeTakenToReady); } catch (Throwable ex) { // 处理就绪失败 handleRunFailure(context, ex, null); throw new IllegalStateException(ex); } // 返回应用程序上下文 return context; }
3.2 调用所有的Runner实现类
/** * 调用所有的Runner实现类。 * * 本方法的目的是遍历ApplicationContext中所有的Runner实例,并根据它们的类型分别调用相应的方法。 * Runner和CommandLineRunner是Spring Boot提供的一组接口,用于在应用程序启动后执行一些自定义的初始化逻辑。 * 这里通过判断Runner的类型,来决定是调用ApplicationRunner还是CommandLineRunner的方法,从而实现对不同类型Runner的兼容处理。 * * @param context Spring应用上下文,用于获取BeanProvider以获取Runner实例。 * @param args 命令行参数,传递给每个Runner的调用方法。 */ private void callRunners(ApplicationContext context, ApplicationArguments args) { // 通过BeanProvider获取所有Runner类型的bean,并以有序的方式遍历它们 context.getBeanProvider(Runner.class).orderedStream().forEach((runner) -> { // 如果runner是ApplicationRunner类型,则调用callRunner方法,并传入ApplicationRunner和命令行参数 if (runner instanceof ApplicationRunner) { callRunner((ApplicationRunner) runner, args); } // 如果runner是CommandLineRunner类型,则同样调用callRunner方法,并传入CommandLineRunner和命令行参数 if (runner instanceof CommandLineRunner) { callRunner((CommandLineRunner) runner, args); } }); }
4.典型的应用场景分析
Seata中的TM(Transaction Manager,事务管理器)和RM(Resource Manager,资源管理器)是分布式事务框架中的关键角色,它们各自承担着特定的职责,以确保分布式事务的一致性和完整性。
4.1 TM(Transaction Manager,事务管理器)
4.1.1 定义与职责
- (1)TM负责定义全局事务的范围,即开始全局事务、提交或回滚全局事务。它是分布式事务的发起者和终结者,类似于本地事务中的begin…commit…rollback操作,但针对的是全局的分布式事务。
- (2)在Seata框架中,TM与业务系统集成在一起,作为客户端存在。当业务操作需要跨多个服务或数据库时,TM会启动一个全局事务,并管理这个事务的生命周期。
4.1.2 工作流程
- (1)TM向TC(Transaction Coordinator,事务协调者)请求开启一个全局事务,TC生成一个全局唯一的事务ID(XID)并返回给TM。
- (2)TM携带XID进行远程服务调用,XID在微服务调用链中传播,确保所有参与的分支事务都能被正确关联到全局事务中。
- (3)当业务操作完成后,TM根据业务逻辑向TC发起全局事务的提交或回滚请求。
- (4)TC根据各分支事务的执行结果,决定全局事务的提交或回滚,并通知所有RM进行相应的操作。
4.2 RM(Resource Manager,资源管理器)
4.2.1 定义与职责
- (1)RM负责管理分支事务处理的资源,如数据库连接、消息队列等。它是分布式事务中具体执行操作的组件。
- (2)RM与TC进行通信,注册分支事务、报告分支事务的状态,并根据TC的指令驱动分支事务的提交或回滚。
- (3)在Seata框架中,RM同样与业务系统集成在一起,作为客户端存在。每个参与全局事务的服务或数据库操作,都会有一个对应的RM来管理。
4.2.2 工作流程
- (1)在执行具体业务操作之前,RM会向TC注册分支事务,并将其纳入全局事务的管辖范围。
- (2)RM执行本地事务操作,如数据库更新、消息发送等,并确保这些操作是可回滚和持久化的。
- (3)RM将分支事务的执行结果(提交或回滚)上报给TC。
- (4)当TC收到TM的全局事务提交或回滚请求时,它会根据各分支事务的状态决定全局事务的结果,并通知所有RM进行相应的提交或回滚操作。
4.3 Seata RM和TM与Seata Server之间的RPC通信
在Seata中,TM(Transaction Manager,事务管理器)与Seata Server(即TC,Transaction Coordinator,事务协调者)之间的通信是通过RPC(Remote Procedure Call,远程过程调用)实现的。
RPC是一种允许程序在网络上调用远程计算机上程序的技术,就像调用本地计算机上的程序一样。
4.3.1 TM与Seata Server之间的RPC通信
(1)建立连接
- TM在启动时会尝试与Seata Server建立长连接。这个连接是通过Netty等网络通信框架实现的,Netty提供了高效、异步的网络通信能力。
- 在建立连接的过程中,TM会向Seata Server发送注册请求,包括应用ID、事务组名称等信息,以便Seata Server能够识别和管理该TM。
(2)事务管理
- 一旦连接建立,TM就可以通过RPC调用Seata Server提供的服务来管理全局事务。
- 例如,当业务操作需要跨多个服务或数据库时,TM会向Seata Server请求开启一个全局事务,并获取一个全局唯一的事务ID(XID)。
- 在执行远程服务调用时,TM会将XID携带在调用中,以便参与的RM能够识别并将分支事务注册到全局事务中。
- 当业务操作完成后,TM会根据业务逻辑向Seata Server发起全局事务的提交或回滚请求。
(3)通信协议
- Seata使用自定义的通信协议来进行RPC通信,该协议支持事务的创建、提交、回滚等操作。
- 通信过程中,Seata还实现了心跳检测、超时重试等机制来确保通信的可靠性和稳定性。
(4)性能优化
- 为了提高RPC通信的性能,Seata采用了多种优化策略,如使用Netty的主从Reactor多线程模型来处理并发请求、采用批量发送请求来减少网络开销等。
4.3.2 RM与Seata Server之间的RPC通信
在Seata中,RM负责管理分支事务处理的资源,如数据库连接等。
当RM执行分支事务时,它需要与Seata Server进行通信,以注册分支事务、报告分支事务的状态,并根据Seata Server的指令驱动分支事务的提交或回滚。
这种通信是通过RPC机制实现的,它允许RM远程调用Seata Server上的服务。
(1)建立连接
- 当RM启动时,它会根据配置尝试与Seata Server建立长连接。这个连接是通过Netty等网络通信框架实现的,Netty提供了高效、异步的网络通信能力。
- 在建立连接的过程中,RM会向Seata Server发送注册请求,包括应用ID、事务组名称等信息,以便Seata Server能够识别和管理该RM。
(2)分支事务注册
- 当RM执行一个分支事务时,它会向Seata Server注册该分支事务。注册过程中,RM会携带全局事务ID(XID)等信息,以便Seata Server能够将该分支事务关联到相应的全局事务中。
- Seata Server在收到注册请求后,会为该分支事务分配一个唯一的分支事务ID,并将其注册到全局事务中。
(3)状态报告与指令执行
- 在分支事务执行过程中,RM会定期向Seata Server报告分支事务的状态,如正在执行、已提交、已回滚等。
- 当全局事务需要提交或回滚时,Seata Server会根据各分支事务的状态和结果,向RM发送相应的提交或回滚指令。
- RM在收到指令后,会执行相应的操作,如提交本地事务、回滚本地事务等,并将执行结果报告给Seata Server。
(4)心跳检测与异常处理
- 为了保持连接的活跃状态,RM会定期向Seata Server发送心跳消息。
- 如果Seata Server在一段时间内没有收到RM的心跳消息,它可能会认为RM已经离线,并采取相应的异常处理措施,如重试连接、记录日志等。
Seata RM与Seata Server之间的RPC通信是Seata分布式事务框架中的重要组成部分。通过高效的RPC通信机制,RM能够远程调用Seata Server提供的服务来管理分支事务,确保分布式事务的一致性和完整性。同时,Seata还通过多种优化策略来提高RPC通信的性能和可靠性。
4.4 Seata Server利用SpringBoot CommandLineRunner启动服务端通信渠道
在Seata Server中,ServerRunner
类是一个重要的组件,它继承自Spring Boot的CommandLineRunner
接口。
这意味着在Spring Boot应用启动后,ServerRunner
的run()
方法会被自动执行。这种方法通常用于在应用启动后立即执行一段特定的代码,比如初始化资源、启动服务等。
ServerRunner
类的主要职责是初始化Netty通信渠道,即NettyRemotingServer
。
Netty是一个高性能的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。
在Seata中,Netty用于节点间的通信,包括事务协调器(TC)、事务管理器(TM)和资源管理器(RM)之间的通信。
以下是ServerRunner
类中run()
方法可能包含的逻辑的一个简单示例:
@Override public void run(String... args) throws Exception { // 初始化Netty通信服务器 NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(); nettyRemotingServer.setPort(serverPort); // 设置服务器端口 nettyRemotingServer.setHost(serverHost); // 设置服务器主机地址 nettyRemotingServer.start(); // 启动服务器 // 其他初始化逻辑,比如注册服务等 }
在这个例子中,run()
方法首先创建了一个NettyRemotingServer
实例,并设置了服务器的主机地址和端口号。然后,它调用start()
方法来启动Netty服务器,这样Seata Server就可以监听来自其他节点的请求了。
总的来说
ServerRunner
类在Seata Server中扮演着重要的角色,它负责初始化Netty通信渠道,为Seata节点间的通信提供基础设施。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。