java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring Batch 数据处理

Spring Batch 数据处理的实现

作者:程序员鸭梨

本文主要介绍了Spring Batch 数据处理的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一、Spring Batch 核心概念

Spring Batch 是 Spring 生态系统中用于批处理的框架,它提供了强大的批处理功能,支持大规模数据处理。

1.1 核心概念

1.2 Spring Batch 的优势

二、Spring Batch 配置

2.1 基本配置

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public ItemReader<User> userItemReader() {
        // 从数据库读取数据
        return new JdbcCursorItemReaderBuilder<User>()
            .name("userItemReader")
            .dataSource(dataSource)
            .sql("SELECT id, name, email FROM users WHERE status = 'ACTIVE'")
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .build();
    }
    @Bean
    public ItemProcessor<User, UserDTO> userItemProcessor() {
        return user -> {
            UserDTO dto = new UserDTO();
            dto.setId(user.getId());
            dto.setName(user.getName().toUpperCase());
            dto.setEmail(user.getEmail().toLowerCase());
            return dto;
        };
    }
    @Bean
    public ItemWriter<UserDTO> userItemWriter() {
        // 写入到文件
        return items -> {
            for (UserDTO item : items) {
                System.out.println("Processing user: " + item.getName());
                // 写入到文件或其他目标
            }
        };
    }
    @Bean
    public Step processUserStep() {
        return stepBuilderFactory.get("processUserStep")
            .<User, UserDTO>chunk(10)
            .reader(userItemReader())
            .processor(userItemProcessor())
            .writer(userItemWriter())
            .build();
    }
    @Bean
    public Job processUserJob() {
        return jobBuilderFactory.get("processUserJob")
            .incrementer(new RunIdIncrementer())
            .flow(processUserStep())
            .end()
            .build();
    }
}

2.2 数据源配置

@Configuration
public class DataSourceConfig {
    @Bean
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/batch_db");
        config.setUsername("root");
        config.setPassword("password");
        config.setMaximumPoolSize(10);
        return new HikariDataSource(config);
    }
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}

2.3 作业仓库配置

@Configuration
public class JobRepositoryConfig {
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(transactionManager);
        factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        factory.setTablePrefix("BATCH_");
        factory.setMaxVarCharLength(1000);
        return factory.getObject();
    }
    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
    @Bean
    public JobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return launcher;
    }
}

三、ItemReader 实现

3.1 数据库读取

@Bean
public ItemReader<Customer> customerItemReader(DataSource dataSource) {
    return new JdbcPagingItemReaderBuilder<Customer>()
        .name("customerItemReader")
        .dataSource(dataSource)
        .selectClause("SELECT id, first_name, last_name, email, phone")
        .fromClause("FROM customers")
        .whereClause("WHERE last_updated > :lastUpdated")
        .parameterValues(Collections.singletonMap("lastUpdated", LocalDateTime.now().minusDays(1)))
        .sortKeys(Collections.singletonMap("id", Order.ASCENDING))
        .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
        .pageSize(100)
        .build();
}

3.2 文件读取

@Bean
public ItemReader<Product> productItemReader() {
    return new FlatFileItemReaderBuilder<Product>()
        .name("productItemReader")
        .resource(new ClassPathResource("products.csv"))
        .delimited()
        .names("id", "name", "price", "quantity")
        .fieldSetMapper(fieldSet -> {
            Product product = new Product();
            product.setId(fieldSet.readLong("id"));
            product.setName(fieldSet.readString("name"));
            product.setPrice(fieldSet.readBigDecimal("price"));
            product.setQuantity(fieldSet.readInt("quantity"));
            return product;
        })
        .build();
}

3.3 自定义读取器

public class CustomItemReader implements ItemReader<String> {
    private final List<String> items;
    private int index = 0;
    public CustomItemReader(List<String> items) {
        this.items = items;
    }
    @Override
    public String read() {
        if (index < items.size()) {
            return items.get(index++);
        }
        return null;
    }
}
@Bean
public ItemReader<String> customItemReader() {
    List<String> items = Arrays.asList("item1", "item2", "item3", "item4", "item5");
    return new CustomItemReader(items);
}

四、ItemProcessor 实现

4.1 基本处理器

public class ProductProcessor implements ItemProcessor<Product, ProductDTO> {
    @Override
    public ProductDTO process(Product item) {
        ProductDTO dto = new ProductDTO();
        dto.setId(item.getId());
        dto.setName(item.getName());
        dto.setPrice(item.getPrice());
        dto.setQuantity(item.getQuantity());
        dto.setTotalValue(item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())));
        return dto;
    }
}
@Bean
public ItemProcessor<Product, ProductDTO> productProcessor() {
    return new ProductProcessor();
}

4.2 条件处理

public class OrderProcessor implements ItemProcessor<Order, Order> {
    @Override
    public Order process(Order item) {
        if (item.getStatus().equals(OrderStatus.PENDING)) {
            item.setStatus(OrderStatus.PROCESSED);
            item.setProcessedAt(LocalDateTime.now());
            return item;
        }
        return null; // 跳过非待处理订单
    }
}
@Bean
public ItemProcessor<Order, Order> orderProcessor() {
    return new OrderProcessor();
}

4.3 复合处理器

public class CompositeItemProcessor<T, R> implements ItemProcessor<T, R> {
    private final List<ItemProcessor> processors;
    public CompositeItemProcessor(List<ItemProcessor> processors) {
        this.processors = processors;
    }
    @Override
    public R process(T item) {
        Object result = item;
        for (ItemProcessor processor : processors) {
            result = processor.process(result);
            if (result == null) {
                return null;
            }
        }
        return (R) result;
    }
}
@Bean
public ItemProcessor<Customer, CustomerDTO> customerProcessor() {
    List<ItemProcessor> processors = new ArrayList<>();
    processors.add(new ValidationProcessor());
    processors.add(new TransformationProcessor());
    processors.add(new EnrichmentProcessor());
    return new CompositeItemProcessor<>(processors);
}

五、ItemWriter 实现

5.1 数据库写入

@Bean
public ItemWriter<CustomerDTO> customerItemWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<CustomerDTO>()
        .dataSource(dataSource)
        .sql("INSERT INTO customer_processed (id, first_name, last_name, email, processed_at) VALUES (:id, :firstName, :lastName, :email, :processedAt)")
        .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
        .build();
}

5.2 文件写入

@Bean
public ItemWriter<ProductDTO> productItemWriter() {
    return new FlatFileItemWriterBuilder<ProductDTO>()
        .name("productItemWriter")
        .resource(new FileSystemResource("output/products-processed.csv"))
        .delimited()
        .names("id", "name", "price", "quantity", "totalValue")
        .headerCallback(writer -> writer.write("ID,Name,Price,Quantity,Total Value"))
        .build();
}

5.3 自定义写入器

public class CustomItemWriter implements ItemWriter<User> {
    private final Logger logger = LoggerFactory.getLogger(CustomItemWriter.class);
    @Override
    public void write(List<? extends User> items) {
        for (User item : items) {
            logger.info("Writing user: {}", item.getName());
            // 写入到外部系统或其他目标
        }
    }
}
@Bean
public ItemWriter<User> customItemWriter() {
    return new CustomItemWriter();
}

六、作业执行与监控

6.1 作业启动

@Service
public class BatchService {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job processUserJob;
    public void runProcessUserJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
            .addString("jobName", "processUserJob")
            .addLong("time", System.currentTimeMillis())
            .toJobParameters();
        JobExecution execution = jobLauncher.run(processUserJob, jobParameters);
        System.out.println("Job execution status: " + execution.getStatus());
    }
}

6.2 作业监控

@RestController
@RequestMapping("/api/batch")
public class BatchController {
    @Autowired
    private JobExplorer jobExplorer;
    @GetMapping("/jobs")
    public List<JobInstance> getJobs() {
        return jobExplorer.getJobInstances("processUserJob", 0, 10);
    }
    @GetMapping("/executions/{jobInstanceId}")
    public List<JobExecution> getExecutions(@PathVariable Long jobInstanceId) {
        JobInstance jobInstance = jobExplorer.getJobInstance(jobInstanceId);
        return jobExplorer.getJobExecutions(jobInstance);
    }
    @GetMapping("/steps/{jobExecutionId}")
    public List<StepExecution> getSteps(@PathVariable Long jobExecutionId) {
        JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId);
        return jobExecution.getStepExecutions();
    }
}

6.3 作业调度

@Configuration
@EnableScheduling
public class BatchScheduler {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job processUserJob;
    @Scheduled(cron = "0 0 0 * * ?") // 每天凌晨执行
    public void runDailyJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
            .addString("jobName", "processUserJob")
            .addLong("time", System.currentTimeMillis())
            .toJobParameters();
        jobLauncher.run(processUserJob, jobParameters);
    }
}

七、Spring Batch 最佳实践

7.1 性能优化

7.2 错误处理

@Bean
public Step processOrderStep() {
    return stepBuilderFactory.get("processOrderStep")
        .<Order, Order>chunk(10)
        .reader(orderItemReader())
        .processor(orderItemProcessor())
        .writer(orderItemWriter())
        .faultTolerant()
        .skipLimit(10)
        .skip(OrderProcessingException.class)
        .retryLimit(3)
        .retry(ConnectionException.class)
        .build();
}

7.3 事务管理

7.4 监控与告警

八、生产环境案例分析

8.1 案例一:电商平台数据同步

某电商平台使用 Spring Batch 实现了从线下系统到线上系统的数据同步。主要功能包括:

通过 Spring Batch,该平台实现了每天同步超过 100 万条商品数据,同步时间从原来的 4 小时减少到 30 分钟,数据准确率达到 99.99%。

8.2 案例二:金融系统批处理

某银行使用 Spring Batch 实现了每日 批处理作业,包括:

通过 Spring Batch,该银行实现了每天处理超过 1000 万笔交易,批处理时间从原来的 6 小时减少到 1.5 小时,系统稳定性显著提高。

九、常见误区与解决方案

9.1 内存溢出

问题:处理大量数据时出现内存溢出
解决方案:合理设置 chunk 大小,使用分页读取,避免一次性加载所有数据

9.2 事务管理不当

问题:事务范围过大,导致锁定时间过长
解决方案:合理设置事务边界,使用局部事务

9.3 错误处理不完善

问题:错误处理机制不完善,导致作业频繁失败
解决方案:设置合理的跳过策略和重试机制

9.4 监控不足

问题:缺乏对作业执行状态的监控
解决方案:建立完善的监控体系,及时发现和解决问题

十、总结与展望

Spring Batch 是一个强大的批处理框架,它为企业级应用提供了可靠、高效的数据处理能力。通过合理配置和使用 Spring Batch,可以显著提高数据处理效率,减少人工干预,提高系统可靠性。

在云原生时代,Spring Batch 也在不断演进。未来,我们将看到 Spring Batch 与云原生技术的深度融合,如与 Kubernetes 的集成,以及对 Serverless 架构的支持,为批处理作业提供更加灵活、高效的运行环境。

记住,批处理作业的设计应该根据业务需求和数据特点进行合理规划。这其实可以更优雅一点

到此这篇关于Spring Batch 数据处理的实现的文章就介绍到这了,更多相关Spring Batch 数据处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文