java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring Batch大数据量处理

Spring Batch大数据量处理之从入门到精通实践

作者:程序员鸭梨

文章介绍了 Spring Batch 的基本架构和核心配置,接着通过 CSV 导入数据库和数据库导出到文件两个简单任务示例进行讲解,最后介绍了如何处理多步骤复杂任务。

一、Spring Batch 基础架构

1.1 核心配置

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    /**
     * Asynchronous launcher: jobs started through this launcher run on a
     * {@link SimpleAsyncTaskExecutor} thread, so the caller returns immediately
     * instead of blocking until the job finishes.
     */
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    /**
     * Read-only access to the Spring Batch meta-data tables.
     */
    @Bean
    public JobExplorer jobExplorer(DataSource dataSource) throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(dataSource);
        // FIX: since Spring Batch 5 the factory bean also requires a transaction
        // manager; without it afterPropertiesSet() fails its assertion.
        factoryBean.setTransactionManager(transactionManager);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    /** In-memory registry of all Job beans, keyed by job name. */
    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    /**
     * Registers every {@link Job} bean with the registry as it is created.
     *
     * Declared {@code static} because BeanPostProcessor @Bean methods should not
     * force early initialization of their @Configuration class; the registry is
     * injected rather than obtained by calling the sibling @Bean method.
     * NOTE: this class is deprecated as of Spring Batch 5.1 in favor of
     * automatic job registration — revisit when upgrading.
     */
    @Bean
    public static JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }
}

1.2 数据库表结构

-- Spring Batch 元数据表
-- BATCH_JOB_INSTANCE: 作业实例
-- BATCH_JOB_EXECUTION: 作业执行
-- BATCH_JOB_EXECUTION_PARAMS: 作业参数
-- BATCH_STEP_EXECUTION: 步骤执行
-- BATCH_JOB_EXECUTION_CONTEXT: 作业上下文
-- BATCH_STEP_EXECUTION_CONTEXT: 步骤上下文

-- 创建自定义监控表
-- Custom monitoring table: one row per job execution, kept outside the
-- framework's BATCH_* tables (presumably populated by a job listener —
-- the writer is not shown in this article).
CREATE TABLE batch_job_monitoring (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    job_name VARCHAR(100) NOT NULL,
    job_instance_id BIGINT,                        -- mirrors BATCH_JOB_INSTANCE id
    job_execution_id BIGINT,                       -- mirrors BATCH_JOB_EXECUTION id
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    status VARCHAR(20),                            -- e.g. COMPLETED / FAILED
    read_count BIGINT DEFAULT 0,
    write_count BIGINT DEFAULT 0,
    skip_count BIGINT DEFAULT 0,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -- row insertion time
);

二、简单任务示例

2.1 CSV 导入数据库

@Configuration
public class CsvToDatabaseJobConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    // FIX: databaseItemWriter() referenced "dataSource" but the field was never
    // declared, so the original class did not compile.
    @Autowired
    private DataSource dataSource;

    /** Job with a single chunk-oriented step that loads products from CSV. */
    @Bean
    public Job csvToDatabaseJob() {
        return new JobBuilder("csvToDatabaseJob", jobRepository)
            .start(csvToDatabaseStep())
            .listener(jobExecutionListener())
            .build();
    }

    /**
     * Chunk step: 1000 items per transaction; skips up to 10 validation
     * failures and retries transient data-access errors up to 3 times.
     */
    @Bean
    public Step csvToDatabaseStep() {
        return new StepBuilder("csvToDatabaseStep", jobRepository)
            .<ProductInput, Product>chunk(1000, transactionManager)
            .reader(csvItemReader())
            .processor(productItemProcessor())
            .writer(databaseItemWriter())
            .faultTolerant()
            .skipLimit(10)
            .skip(ValidationException.class)
            .retryLimit(3)
            .retry(TransientDataAccessException.class)
            .listener(stepExecutionListener())
            .build();
    }

    /**
     * Reads input/products.csv; the first line is a header and is skipped.
     */
    @Bean
    public FlatFileItemReader<ProductInput> csvItemReader() {
        // FIX: replaced the double-brace anonymous subclass (creates a needless
        // subclass holding a reference to the config) with plain configuration.
        BeanWrapperFieldSetMapper<ProductInput> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(ProductInput.class);

        return new FlatFileItemReaderBuilder<ProductInput>()
            .name("csvItemReader")
            .resource(new FileSystemResource("input/products.csv"))
            .delimited()
            .names("id", "name", "description", "price", "category")
            .fieldSetMapper(fieldSetMapper)
            .linesToSkip(1) // skip the header row
            .build();
    }

    /**
     * Maps a raw CSV record to the Product entity: trims the name, parses the
     * price into BigDecimal (String constructor — exact), stamps creation time.
     */
    @Bean
    public ItemProcessor<ProductInput, Product> productItemProcessor() {
        return input -> {
            Product product = new Product();
            product.setId(input.getId());
            product.setName(input.getName().trim());
            product.setDescription(input.getDescription());
            product.setPrice(new BigDecimal(input.getPrice()));
            product.setCategory(input.getCategory());
            product.setCreatedAt(LocalDateTime.now());
            return product;
        };
    }

    /**
     * Batched JDBC upsert (MySQL ON DUPLICATE KEY UPDATE); named parameters are
     * bound from Product bean properties.
     */
    @Bean
    public JdbcBatchItemWriter<Product> databaseItemWriter() {
        return new JdbcBatchItemWriterBuilder<Product>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO products (id, name, description, price, category, created_at) " +
                 "VALUES (:id, :name, :description, :price, :category, :createdAt) " +
                 "ON DUPLICATE KEY UPDATE " +
                 "name = VALUES(name), description = VALUES(description), " +
                 "price = VALUES(price), category = VALUES(category)")
            .dataSource(dataSource)
            .build();
    }
}

2.2 数据库导出到文件

@Configuration
public class DatabaseToFileJobConfig {

    @Bean
    public Job exportOrdersJob() {
        return new JobBuilder("exportOrdersJob", jobRepository)
            .start(exportOrdersStep())
            .build();
    }

    @Bean
    public Step exportOrdersStep() {
        return new StepBuilder("exportOrdersStep", jobRepository)
            .<Order, OrderOutput>chunk(500, transactionManager)
            .reader(orderItemReader())
            .processor(orderItemProcessor())
            .writer(orderItemWriter())
            .build();
    }

    @Bean
    public JdbcPagingItemReader<Order> orderItemReader() {
        return new JdbcPagingItemReaderBuilder<Order>()
            .name("orderItemReader")
            .dataSource(dataSource)
            .queryProvider(new PagingQueryProvider() {
                @Override
                public void init(DataSource dataSource) {}
                
                @Override
                public String getSortKey() {
                    return "id";
                }
                
                @Override
                public String getSelectClause() {
                    return "SELECT id, user_id, total_amount, status, created_at";
                }
                
                @Override
                public String getFromClause() {
                    return "FROM orders";
                }
                
                @Override
                public String getWhereClause() {
                    return "WHERE created_at >= :startDate AND created_at <= :endDate";
                }
            })
            .parameterValues(Map.of("startDate", startDate, "endDate", endDate))
            .pageSize(1000)
            .rowMapper(new OrderRowMapper())
            .build();
    }

    @Bean
    public FlatFileItemWriter<OrderOutput> orderItemWriter() {
        return new FlatFileItemWriterBuilder<OrderOutput>()
            .name("orderItemWriter")
            .resource(new FileSystemResource("output/orders.csv"))
            .delimited()
            .delimiter(",")
            .names("orderId", "userId", "amount", "status", "createdDate")
            .headerCallback(writer -> writer.write("OrderID,UserID,Amount,Status,CreatedDate"))
            .footerCallback(writer -> writer.write("Total records exported"))
            .build();
    }
}

三、复杂任务处理

3.1 多步骤任务

@Configuration
public class ComplexBatchJobConfig {

    /**
     * Multi-step order pipeline: validate -> payment -> inventory -> notify,
     * with a conditional branch to error handling and a final cleanup step.
     *
     * NOTE(review): the .on("FAILED") transition below is attached to
     * sendNotificationStep() only; a failure in an earlier step is NOT routed
     * to errorHandlingStep() by this definition — confirm that is intended.
     * jobRepository is not declared in the visible part of this class;
     * presumably an injected field — verify.
     */
    @Bean
    public Job orderProcessingJob() {
        return new JobBuilder("orderProcessingJob", jobRepository)
            .start(validateOrderStep())
            .next(processPaymentStep())
            .next(updateInventoryStep())
            .next(sendNotificationStep())
            .on("FAILED").to(errorHandlingStep())
            .from(sendNotificationStep()).on("*").to(cleanupStep()) // any other exit status -> cleanup
            .end()
            .build();
    }

    /**
     * Chunk-oriented validation step: reads pending orders, runs each through
     * the validator, and writes the validated results 100 items per
     * transaction. Reader/processor/writer beans are defined elsewhere in the
     * project (not visible here).
     */
    @Bean
    public Step validateOrderStep() {
        return new StepBuilder("validateOrderStep", jobRepository)
            .<Order, ValidatedOrder>chunk(100, transactionManager)
            .reader(pendingOrderReader())
            .processor(orderValidator())
            .writer(validatedOrderWriter())
            .build();
    }

    /**
     * Tasklet step that pays the whole batch in a single service call, then
     * finishes (no repeat).
     *
     * NOTE(review): "paymentService" is not declared in the visible part of
     * this class — presumably an injected field; verify it exists.
     */
    @Bean
    public Step processPaymentStep() {
        return new StepBuilder("processPaymentStep", jobRepository)
            .tasklet((contribution, chunkContext) -> {
                // FIX: StepContext#getJobParameters() returns Map<String,Object>,
                // not JobParameters — the original assignment did not compile.
                // Obtain the typed JobParameters via the StepExecution.
                JobParameters params = chunkContext.getStepContext()
                        .getStepExecution().getJobParameters();
                String batchId = params.getString("batchId");

                paymentService.processBatchPayments(batchId);

                return RepeatStatus.FINISHED;
            }, transactionManager)
            .build();
    }

    /**
     * Chunk-oriented inventory adjustment: reads order line items and writes
     * the corresponding inventory updates, 200 items per transaction.
     * Reader/processor/writer beans are defined elsewhere (not visible here).
     */
    @Bean
    public Step updateInventoryStep() {
        return new StepBuilder("updateInventoryStep", jobRepository)
            .<OrderItem, InventoryUpdate>chunk(200, transactionManager)
            .reader(orderItemReader())
            .processor(inventoryProcessor())
            .writer(inventoryWriter())
            .build();
    }

    /**
     * Runs flow1/flow2/flow3 in parallel on the given task executor; the split
     * completes when all three flows finish.
     *
     * NOTE(review): taskExecutor(), flow2() and flow3() are not visible in
     * this excerpt, and flow1() is truncated — verify they are defined.
     */
    @Bean
    public Flow splitFlow() {
        return new FlowBuilder<SimpleFlow>("splitFlow")
            .split(taskExecutor())
            .add(flow1(), flow2(), flow3())
            .build();
    }

    @Bean
    public Flow flow1() {
        return new FlowBuilder<Simple

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文