SpringBatch结合SpringBoot简单使用实现工资发放批处理操作方式
作者:回炉重造P
最近有接触到批处理相关的需求,学习了下SpringBatch的使用方法。SpringBatch能把复杂的批处理任务进行step分解,并能通过reader和writer满足不同来源数据的处理需求,支持在step定义时设置异常重试策略等,比较方便拓展。
简单记录下基于SpringBoot写的使用demo。
需求
两张表,user_with_role和role_num,分别有user信息和工资流水信息,role_num冗余。
user_with_role表,包含员工信息和role字段信息
CREATE TABLE `user_with_role` ( `id` INT NOT NULL AUTO_INCREMENT, `username` VARCHAR(45) NULL, `role` VARCHAR(45) NULL, PRIMARY KEY (`id`));
role_num表,包含发钱信息
CREATE TABLE `role_num1` ( `id` INT NOT NULL AUTO_INCREMENT, `role` VARCHAR(45) NULL, `account` INT NULL, `username` VARCHAR(45) NULL, `inputtime` VARCHAR(45) NULL, PRIMARY KEY (`id`));
pom设置和application配置文件
在springboot的基础上使用batch,pom.xml中增加dependency
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency>
还需要加上jdbc依赖,不过这个一般都有加。
application中增加:
spring: batch: initialize-schema: always
这个配置主要是让springbatch自动在数据库中创建运行所需的表,SpringBoot2.7版本后修改为 spring.batch.jdbc.initialize-schema
了,不过我还在用老土2.3,先这样吧。
bean类
UserWithRole
@Data public class UserWithRole { private int id; private String userName; private String role; }
RoleNum
@Data public class RoleNum { private int id; private String role; private int account; private String userName; private String inputTime; }
关键job配置类
包含reader,writer,step和整体job的配置,通过@Bean注解的方法来进行spring管理,下面一步步来。
@Configuration @EnableBatchProcessing // batch job设置 // 将user_with_role中的role_user工资信息,输出到role_num表中 public class RoleCountingBatchJobConfig { //注入任务对象工厂 @Autowired private JobBuilderFactory jobBuilderFactory; //任务的执行由Step决定,注入step对象的factory @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; @Autowired private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
SpringBatch默认会构建两个Factory用来进行job构建,DataSource默认为Spring配置的数据库,NamedParameterJdbcTemplate由Spring提供,用来将statement中的问号占位符改为利用命名参数映射更方便,不过我后面没用。
// 设置itemReader @Bean public JdbcCursorItemReader<UserWithRole> getItemReader(){ JdbcCursorItemReader<UserWithRole> itemReader = new JdbcCursorItemReader<>(); itemReader.setDataSource(dataSource);//设置数据源 // 实体映射 itemReader.setRowMapper(new RowMapper<UserWithRole>() { @Override public UserWithRole mapRow(ResultSet rs, int rowNum) throws SQLException { UserWithRole uwr = new UserWithRole(); uwr.setId(rs.getInt("id")); uwr.setUserName(rs.getString("userName")); uwr.setRole(rs.getString("role")); return uwr; } }); String sql = "select u.id as id, u.username as userName, u.role as role from user_with_role as u"; itemReader.setSql(sql); return itemReader; }
itemReader
用的是 JdbcCursorItemReader
,用来从user_with_role表中读取数据,主要是配置 RowMapper
,将行数据封装成 UserWithRole
对象,sql就是简单查询。
// 设置itemWriter @Bean public JdbcBatchItemWriter<RoleNum> getItemWriter(){ JdbcBatchItemWriter<RoleNum> itemWriter = new JdbcBatchItemWriter<>(); itemWriter.setDataSource(dataSource); itemWriter.setJdbcTemplate(namedParameterJdbcTemplate); String sql = "insert into role_num(role, account, username, inputtime) values(?, ?, ?, ?)"; itemWriter.setSql(sql); itemWriter.setItemPreparedStatementSetter(new RoleNumPreparedStatementSetter()); return itemWriter; }
itemWriter
用的是 JdbcBatchItemWriter
,用来把RoleNum对象存到数据库中,主要的参数写入逻辑写在 RoleNumPreparedStatementSetter
类中。
public class RoleNumPreparedStatementSetter implements ItemPreparedStatementSetter<RoleNum> { // 将role_num的批处理计算结果存入的sql @Override public void setValues(RoleNum roleNum, PreparedStatement preparedStatement) throws SQLException { preparedStatement.setString(1, roleNum.getRole()); preparedStatement.setInt(2, roleNum.getAccount()); preparedStatement.setString(3, roleNum.getUserName()); preparedStatement.setString(4, roleNum.getInputTime()); } }
RoleNumPreparedStatementSetter
主要是将参数通过占位符放到sql中。
// 设置process @Bean public RoleUserCountingProcess getProcess(){ return new RoleUserCountingProcess(); }
public class RoleUserCountingProcess implements ItemProcessor<UserWithRole, RoleNum> { // 将user的信息转换成roleNum信息,当作发工资 @Override public RoleNum process(UserWithRole userWithRole) throws Exception { RoleNum newRoleNum = new RoleNum(); newRoleNum.setUserName(userWithRole.getUserName()); newRoleNum.setRole(userWithRole.getRole()); newRoleNum.setInputTime(new LocalDateTime(new Date()).toString()); if(userWithRole.getRole()==null) return newRoleNum; switch (userWithRole.getRole()) { case "employee": newRoleNum.setAccount(100); break; case "manager": newRoleNum.setAccount(10000); break; case "boss": newRoleNum.setAccount(100000); break; } System.out.println(newRoleNum.getUserName() + "---" + newRoleNum.getAccount()); return newRoleNum; } }
process中定义了具体的工资写入逻辑,实现了 ItemProcess
接口的process方法,将itemReader的输出UserWithRole类和itemWriter的输入RoleNum类进行连接,从而完成process过程。
// 设置step @Bean public Step getStep(){ return stepBuilderFactory.get("user_role_convert_step") .<UserWithRole, RoleNum>chunk(10) .reader(getItemReader()) .processor(getProcess()) .writer(getItemWriter()) .build(); } // 获取job对象 @Bean public Job RoleCountingBatchJob(JobBuilderFactory jobBuilders, StepBuilderFactory stepBuilders){ return jobBuilders.get("user_role_convert_step") .start(getStep()) .build(); }
最后定义step和job整体流程。
在配置文件中加入:
spring: batch: job: enabled: false
可以阻止job在项目启动时自动执行。
controller启动jobInstance
@Controller public class BatchRunnerController { @Autowired JobLauncher jobLauncher; // 自定义job任务 @Autowired Job roleCountingBatchJob; @RequestMapping("/countingRoleUser") @ResponseBody public String countingRollUser() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { // 设置时间parameter来区分instance JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); jobParametersBuilder.addDate("startDate", new Date()); jobLauncher.run(roleCountingBatchJob, jobParametersBuilder.toJobParameters()); return "role user counting batch success --- " + Objects.requireNonNull(jobParametersBuilder.toJobParameters().getDate("startDate")).toString(); } }
利用 jobLauncher
来执行对应的job,date时间来区分不同的jobInstance。
实现效果
user_with_role表
批处理job正常执行
duideduide没有role所以没发到工资
总结
本文简单记录了下简单的springbatch的使用,包括item相关,process编写,step构建和最终的job使用。springbatch还有一点是可以设置出现异常的处理策略,比如容忍数次异常,调过某些异常等,在真实使用中比较灵活,有机会再补充。
好了,以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。