Spring Batch大数据量处理之从入门到精通实践
作者:程序员鸭梨
文章介绍了 Spring Batch 的基本架构和核心配置,接着通过“CSV 导入数据库”和“数据库导出到文件”两个简单任务示例进行讲解,最后介绍了如何处理多步骤复杂任务。
一、Spring Batch 基础架构
1.1 核心配置
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    /**
     * Asynchronous launcher: jobs are submitted on a {@code SimpleAsyncTaskExecutor},
     * so the calling thread returns immediately instead of blocking until the job
     * finishes.
     */
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
        launcher.setJobRepository(jobRepository);
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        launcher.afterPropertiesSet();
        return launcher;
    }

    /**
     * Read-only access to the batch metadata tables.
     *
     * <p>Fix: since Spring Batch 5, {@code JobExplorerFactoryBean} requires a
     * transaction manager in addition to the data source; the original only set
     * the data source, which fails validation in {@code afterPropertiesSet()}.
     */
    @Bean
    public JobExplorer jobExplorer(DataSource dataSource) throws Exception {
        JobExplorerFactoryBean factory = new JobExplorerFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(transactionManager);
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    /** In-memory registry of job definitions, keyed by job name. */
    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    /** Auto-registers every {@code Job} bean in the context with the registry above. */
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor processor = new JobRegistryBeanPostProcessor();
        processor.setJobRegistry(jobRegistry());
        return processor;
    }
}
1.2 数据库表结构
-- Spring Batch metadata tables (created and managed by the framework itself):
-- BATCH_JOB_INSTANCE: job instances
-- BATCH_JOB_EXECUTION: job executions
-- BATCH_JOB_EXECUTION_PARAMS: job execution parameters
-- BATCH_STEP_EXECUTION: step executions
-- BATCH_JOB_EXECUTION_CONTEXT: job execution context
-- BATCH_STEP_EXECUTION_CONTEXT: step execution context
-- Custom monitoring table: one row per observed job run, denormalized for dashboards.
CREATE TABLE batch_job_monitoring (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
job_name VARCHAR(100) NOT NULL,
job_instance_id BIGINT, -- mirrors BATCH_JOB_INSTANCE.JOB_INSTANCE_ID
job_execution_id BIGINT, -- mirrors BATCH_JOB_EXECUTION.JOB_EXECUTION_ID
start_time TIMESTAMP,
end_time TIMESTAMP,
status VARCHAR(20), -- e.g. COMPLETED / FAILED; presumably BatchStatus names — verify against the writer
read_count BIGINT DEFAULT 0,
write_count BIGINT DEFAULT 0,
skip_count BIGINT DEFAULT 0,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
二、简单任务示例
2.1 CSV 导入数据库
@Configuration
public class CsvToDatabaseJobConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    // Fix: the writer below referenced `dataSource`, but the field was never
    // declared, so the class did not compile.
    @Autowired
    private DataSource dataSource;

    /**
     * CSV-to-database import job. NOTE(review): jobExecutionListener() is not
     * defined in this snippet — presumably declared elsewhere; verify it exists.
     */
    @Bean
    public Job csvToDatabaseJob() {
        return new JobBuilder("csvToDatabaseJob", jobRepository)
                .start(csvToDatabaseStep())
                .listener(jobExecutionListener())
                .build();
    }

    /**
     * Chunk step: 1000 items per transaction; skips up to 10 validation
     * failures and retries transient data-access errors up to 3 times.
     */
    @Bean
    public Step csvToDatabaseStep() {
        return new StepBuilder("csvToDatabaseStep", jobRepository)
                .<ProductInput, Product>chunk(1000, transactionManager)
                .reader(csvItemReader())
                .processor(productItemProcessor())
                .writer(databaseItemWriter())
                .faultTolerant()
                .skipLimit(10)
                .skip(ValidationException.class)
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .listener(stepExecutionListener())
                .build();
    }

    /** Reads input/products.csv; the first line is a header and is skipped. */
    @Bean
    public FlatFileItemReader<ProductInput> csvItemReader() {
        return new FlatFileItemReaderBuilder<ProductInput>()
                .name("csvItemReader")
                .resource(new FileSystemResource("input/products.csv"))
                .delimited()
                .names("id", "name", "description", "price", "category")
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                    setTargetType(ProductInput.class);
                }})
                .linesToSkip(1) // skip the header row
                .build();
    }

    /**
     * Maps a raw CSV row to the Product entity, trimming the name and stamping
     * the creation time. BigDecimal is built from the String price — correct
     * for money (never construct BigDecimal from a double).
     */
    @Bean
    public ItemProcessor<ProductInput, Product> productItemProcessor() {
        return input -> {
            Product product = new Product();
            product.setId(input.getId());
            // NOTE(review): throws NPE if the CSV name column is empty — confirm
            // whether empty names should be skipped via ValidationException instead.
            product.setName(input.getName().trim());
            product.setDescription(input.getDescription());
            product.setPrice(new BigDecimal(input.getPrice()));
            product.setCategory(input.getCategory());
            product.setCreatedAt(LocalDateTime.now());
            return product;
        };
    }

    /** MySQL upsert writer: inserts new products, updates existing ones by key. */
    @Bean
    public JdbcBatchItemWriter<Product> databaseItemWriter() {
        return new JdbcBatchItemWriterBuilder<Product>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO products (id, name, description, price, category, created_at) " +
                     "VALUES (:id, :name, :description, :price, :category, :createdAt) " +
                     "ON DUPLICATE KEY UPDATE " +
                     "name = VALUES(name), description = VALUES(description), " +
                     "price = VALUES(price), category = VALUES(category)")
                .dataSource(dataSource)
                .build();
    }
}
2.2 数据库导出到文件
@Configuration
public class DatabaseToFileJobConfig {
@Bean
public Job exportOrdersJob() {
return new JobBuilder("exportOrdersJob", jobRepository)
.start(exportOrdersStep())
.build();
}
@Bean
public Step exportOrdersStep() {
return new StepBuilder("exportOrdersStep", jobRepository)
.<Order, OrderOutput>chunk(500, transactionManager)
.reader(orderItemReader())
.processor(orderItemProcessor())
.writer(orderItemWriter())
.build();
}
@Bean
public JdbcPagingItemReader<Order> orderItemReader() {
return new JdbcPagingItemReaderBuilder<Order>()
.name("orderItemReader")
.dataSource(dataSource)
.queryProvider(new PagingQueryProvider() {
@Override
public void init(DataSource dataSource) {}
@Override
public String getSortKey() {
return "id";
}
@Override
public String getSelectClause() {
return "SELECT id, user_id, total_amount, status, created_at";
}
@Override
public String getFromClause() {
return "FROM orders";
}
@Override
public String getWhereClause() {
return "WHERE created_at >= :startDate AND created_at <= :endDate";
}
})
.parameterValues(Map.of("startDate", startDate, "endDate", endDate))
.pageSize(1000)
.rowMapper(new OrderRowMapper())
.build();
}
@Bean
public FlatFileItemWriter<OrderOutput> orderItemWriter() {
return new FlatFileItemWriterBuilder<OrderOutput>()
.name("orderItemWriter")
.resource(new FileSystemResource("output/orders.csv"))
.delimited()
.delimiter(",")
.names("orderId", "userId", "amount", "status", "createdDate")
.headerCallback(writer -> writer.write("OrderID,UserID,Amount,Status,CreatedDate"))
.footerCallback(writer -> writer.write("Total records exported"))
.build();
}
}
三、复杂任务处理
3.1 多步骤任务
@Configuration
public class ComplexBatchJobConfig {
// Multi-step pipeline: validate -> payment -> inventory -> notification.
// Transition rules are declared on the notification step only: a FAILED exit
// status routes to error handling, any other exit status ("*") routes to
// cleanup. NOTE(review): failures in the earlier steps are not routed to
// errorHandlingStep by this flow — confirm that is intentional.
@Bean
public Job orderProcessingJob() {
return new JobBuilder("orderProcessingJob", jobRepository)
.start(validateOrderStep())
.next(processPaymentStep())
.next(updateInventoryStep())
.next(sendNotificationStep())
.on("FAILED").to(errorHandlingStep())
.from(sendNotificationStep()).on("*").to(cleanupStep())
.end()
.build();
}
/**
 * Chunk-oriented validation step: reads pending orders, validates each one,
 * and persists the validated results in batches of 100 per transaction.
 */
@Bean
public Step validateOrderStep() {
    StepBuilder stepBuilder = new StepBuilder("validateOrderStep", jobRepository);
    return stepBuilder
            .<Order, ValidatedOrder>chunk(100, transactionManager)
            .reader(pendingOrderReader())
            .processor(orderValidator())
            .writer(validatedOrderWriter())
            .build();
}
/**
 * Tasklet step that triggers batch payment processing as a single
 * transactional unit of work (no chunking).
 */
@Bean
public Step processPaymentStep() {
    return new StepBuilder("processPaymentStep", jobRepository)
            .tasklet((contribution, chunkContext) -> {
                // Fix: StepContext#getJobParameters() returns Map<String, Object>,
                // not JobParameters, so the original assignment did not compile.
                // Go through the StepExecution for the typed accessor.
                JobParameters params = chunkContext.getStepContext()
                        .getStepExecution()
                        .getJobParameters();
                String batchId = params.getString("batchId");
                paymentService.processBatchPayments(batchId);
                return RepeatStatus.FINISHED;
            }, transactionManager)
            .build();
}
/**
 * Adjusts stock levels for order line items, 200 items per transaction.
 */
@Bean
public Step updateInventoryStep() {
    StepBuilder stepBuilder = new StepBuilder("updateInventoryStep", jobRepository);
    return stepBuilder
            .<OrderItem, InventoryUpdate>chunk(200, transactionManager)
            .reader(orderItemReader())
            .processor(inventoryProcessor())
            .writer(inventoryWriter())
            .build();
}
/**
 * Runs flow1/flow2/flow3 concurrently on the task executor; the split
 * completes only when all three branches have finished.
 */
@Bean
public Flow splitFlow() {
    FlowBuilder<SimpleFlow> flowBuilder = new FlowBuilder<>("splitFlow");
    return flowBuilder
            .split(taskExecutor())
            .add(flow1(), flow2(), flow3())
            .build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<Simple
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
