一篇非常好的Spring Integration 教程
作者:Full Stack Developme
Spring Integration 是 Spring 家族里专门用来搞定企业应用集成的框架,核心就是让不同系统之间能松耦合地通过消息来通信,本文给大家介绍Spring Integration 教程,感兴趣的朋友一起看看吧
一、什么是 Spring Integration
Spring Integration 是 Spring 生态系统中的一个扩展模块,用于实现企业应用集成 (EAI, Enterprise Application Integration)。它基于 Spring 框架,提供了一套声明式的适配器,用于集成不同的系统和服务。
核心特点:
- 基于消息驱动的架构
- 支持多种传输协议(HTTP, TCP, JMS, AMQP, FTP, File 等)
- 提供开箱即用的端点适配器
- 支持企业集成模式 (EIP, Enterprise Integration Patterns)
二、核心概念
1. Message
// 消息由消息头和消息体组成
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
// 创建消息
Message<String> message = MessageBuilder.withPayload("Hello")
.setHeader("key", "value")
.build();2. Message Channel
消息通道用于在发送者和接收者之间传递消息。
// 点对点通道
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
// 发布订阅通道
@Bean
public MessageChannel publishSubscribeChannel() {
return new PublishSubscribeChannel();
}
// 队列通道
@Bean
public MessageChannel queueChannel() {
return new QueueChannel(10);
}3. Message Endpoint
消息端点负责处理消息。
三、快速入门示例
Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- 可选:特定协议支持 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-http</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>基础配置示例
@Configuration
@EnableIntegration
public class IntegrationConfig {
// 定义消息通道
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel outputChannel() {
return new DirectChannel();
}
// 定义集成流程
@Bean
public IntegrationFlow simpleFlow() {
return IntegrationFlow.from(inputChannel())
.transform(String.class, s -> s.toUpperCase())
.filter(s -> s.startsWith("A"))
.handle(System.out::println)
.get();
}
}使用 @MessagingGateway
// 定义网关接口
@MessagingGateway(defaultRequestChannel = "inputChannel")
public interface SimpleGateway {
void sendMessage(String message);
@Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel")
String sendAndReceive(String message);
}
// 使用网关
@Service
public class MessageService {
@Autowired
private SimpleGateway gateway;
public void send(String message) {
gateway.sendMessage(message);
}
}四、常用企业集成模式
1. 消息转换器 (Transformer)
@Bean
public IntegrationFlow transformerFlow() {
return IntegrationFlow.from("inputChannel")
.transform(new GenericTransformer<String, User>() {
@Override
public User transform(String source) {
return new User(source);
}
})
.channel("outputChannel")
.get();
}2. 消息过滤器 (Filter)
@Bean
public IntegrationFlow filterFlow() {
return IntegrationFlow.from("inputChannel")
.filter(payload -> payload instanceof User)
.filter("payload.age > 18") // SpEL 表达式
.channel("adultChannel")
.get();
}3. 消息路由器 (Router)
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("inputChannel")
.route(payload -> {
if (payload instanceof Order) return "orderChannel";
if (payload instanceof Payment) return "paymentChannel";
return "errorChannel";
})
.get();
}4. 消息拆分器 (Splitter) 和聚合器 (Aggregator)
@Bean
public IntegrationFlow splitterAggregatorFlow() {
return IntegrationFlow.from("inputChannel")
.split() // 拆分消息
.channel("splitChannel")
.aggregate() // 聚合消息
.channel("outputChannel")
.get();
}五、常用适配器示例
1. 文件适配器
@Configuration
public class FileIntegrationConfig {
// 读取文件
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlow.from(
Files.inboundAdapter(new File("/input"))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000))
)
.transform(File.class, File::getAbsolutePath)
.handle(System.out::println)
.get();
}
// 写入文件
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlow.from("fileInputChannel")
.handle(Files.outboundAdapter(new File("/output"))
.autoCreateDirectory(true))
.get();
}
}2. HTTP 适配器
@Configuration
public class HttpIntegrationConfig {
// HTTP 入站网关
@Bean
public IntegrationFlow httpInboundFlow() {
return IntegrationFlow.from(
Http.inboundGateway("/api/message")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class)
.replyTimeout(30000)
)
.transform(String.class, s -> "Processed: " + s)
.get();
}
// HTTP 出站网关
@Bean
public IntegrationFlow httpOutboundFlow() {
return IntegrationFlow.from("requestChannel")
.handle(Http.outboundGateway("https://api.example.com/data")
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class))
.channel("responseChannel")
.get();
}
}3. JMS 适配器
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Configuration
public class JmsIntegrationConfig {
@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory("tcp://localhost:61616");
}
// JMS 入站适配器
@Bean
public IntegrationFlow jmsInboundFlow() {
return IntegrationFlow.from(
Jms.inboundAdapter(connectionFactory())
.destination("queue.in")
)
.transform(String.class, String::toUpperCase)
.handle(message -> System.out.println("Received: " + message))
.get();
}
// JMS 出站适配器
@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlow.from("jmsOutputChannel")
.handle(Jms.outboundAdapter(connectionFactory())
.destination("queue.out"))
.get();
}
}六、高级特性
1. 错误处理
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlow.from("inputChannel")
.transform(...)
.handle(..., e -> e
.advice(ExpressionEvaluatingRequestHandlerAdvice.class)
.advice(advice -> advice
.onFailureExpression("payload.message")
.trapException(true))
)
.get();
}
// 全局错误通道
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(message -> {
Exception exception = (Exception) message.getPayload();
log.error("Error: ", exception);
})
.get();
}2. 消息历史
@Configuration
@EnableIntegration
@EnableMessageHistory
public class HistoryConfig {
@Bean
public IntegrationFlow historyFlow() {
return IntegrationFlow.from("inputChannel")
.transform(...)
.enrichHeaders(s -> s.header(MessageHistory.HEADER_NAME, new MessageHistory()))
.handle(...)
.get();
}
}3. 控制总线
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from("controlBus")
.controlBus()
.get();
}
// 使用控制总线
@Component
public class ControlBusService {
@Autowired
@Qualifier("controlBus")
private MessageChannel controlBus;
public void stopChannel() {
controlBus.send(MessageBuilder.withPayload("@myChannel.stop()").build());
}
}七、完整示例:文件处理系统
@SpringBootApplication
@EnableIntegration
public class FileProcessingApplication {
public static void main(String[] args) {
SpringApplication.run(FileProcessingApplication.class, args);
}
}
@Configuration
public class FileProcessingFlow {
private static final Logger log = LoggerFactory.getLogger(FileProcessingFlow.class);
// 文件输入目录
@Value("${input.directory:/input}")
private String inputDirectory;
// 处理成功目录
@Value("${success.directory:/success}")
private String successDirectory;
// 处理失败目录
@Value("${failed.directory:/failed}")
private String failedDirectory;
@Bean
public IntegrationFlow fileProcessingFlow() {
return IntegrationFlow.from(
Files.inboundAdapter(new File(inputDirectory))
.patternFilter("*.csv")
.preventDuplicates(true)
.autoCreateDirectory(true),
e -> e.poller(Pollers.fixedDelay(5000)
.maxMessagesPerPoll(5)
.advice(expressionAdvice()))
)
.channel(MessageChannels.queue("processingChannel", 10))
.transform(Files.toStringTransformer()) // 文件转字符串
.split(s -> s.delimiters("\n")) // 按行拆分
.filter(line -> !line.trim().isEmpty())
.transform(line -> parseCsvLine(line)) // 解析CSV
.aggregate(aggregatorSpec -> aggregatorSpec
.releaseStrategy(new SimpleSequenceSizeReleaseStrategy())
.correlationStrategy(message -> "batch"))
.handle(message -> processBatch((List<Map<String, String>>) message.getPayload()))
.handle(Files.outboundAdapter(new File(successDirectory))
.autoCreateDirectory(true)
.fileNameGenerator(message -> generateFileName(message)))
.get();
}
// 错误处理:失败的文件移动到失败目录
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(message -> {
Message<?> failedMessage = (Message<?>) message.getHeaders().get("inputMessage");
File failedFile = (File) failedMessage.getPayload();
FileUtils.moveFileToDirectory(failedFile, new File(failedDirectory), true);
log.error("Failed to process file: {}", failedFile.getName());
})
.get();
}
private Map<String, String> parseCsvLine(String line) {
// CSV解析逻辑
return new HashMap<>();
}
private void processBatch(List<Map<String, String>> batch) {
// 批量处理逻辑
log.info("Processing batch of {} records", batch.size());
}
private String generateFileName(Message<?> message) {
return "processed_" + System.currentTimeMillis() + ".json";
}
@Bean
public Advice expressionAdvice() {
return new ExpressionEvaluatingRequestHandlerAdvice();
}
}八、最佳实践
- 合理使用通道类型:DirectChannel 用于同步,QueueChannel 用于缓冲,PublishSubscribeChannel 用于广播
- 避免阻塞操作:使用 QueueChannel 时注意配置合适的大小和 poller
- 错误处理:始终配置错误通道,记录异常并适当重试
- 监控和管理:使用 Spring Boot Actuator 监控集成端点
management:
endpoints:
web:
exposure:
include: integration测试:使用 @SpringIntegrationTest 进行集成测试
@SpringBootTest
@SpringIntegrationTest(noAutoStartup = {"inputChannel"})
class IntegrationFlowTest {
@Test
void testFlow() {
// 测试逻辑
}
}到此这篇关于一篇非常好的Spring Integration 教程的文章就介绍到这了,更多相关Spring Integration 教程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
