SpringBoot定时任务实现数据库数据同步全过程
作者:小小初霁
文章详细介绍了从简单到企业级数据库同步需求的技术方案,包括选型、实现步骤、优化方案、异常处理策略、生产环境配置建议等
一、技术方案选型
1. 核心组件
- Spring Scheduler:轻量级定时任务框架
- Spring Data JPA:数据库操作(可替换为MyBatis)
- Quartz:复杂调度需求(集群/持久化)
- Spring Batch:大批量数据处理
2. 架构示意图
[定时触发器] -> [数据抽取] -> [数据转换] -> [数据加载] -> [结果通知]
二、基础实现步骤
1. 添加依赖
<!-- Spring Boot Starter Web (包含Scheduler) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- 数据库驱动(示例使用MySQL) -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>2. 启用定时任务
@SpringBootApplication
@EnableScheduling
public class DataSyncApplication {
public static void main(String[] args) {
SpringApplication.run(DataSyncApplication.class, args);
}
}3. 实现定时任务类
@Component
public class DataSyncScheduler {
private static final Logger logger = LoggerFactory.getLogger(DataSyncScheduler.class);
@Autowired
private SourceRepository sourceRepo;
@Autowired
private TargetRepository targetRepo;
// 每天凌晨1点执行
@Scheduled(cron = "0 0 1 * * ?")
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
public void syncDataDaily() {
try {
logger.info("开始数据同步任务...");
// 1. 获取增量数据
LocalDateTime lastSyncTime = getLastSyncTime();
List<SourceEntity> newData = sourceRepo.findByUpdateTimeAfter(lastSyncTime);
// 2. 数据转换
List<TargetEntity> transformedData = transformData(newData);
// 3. 批量保存
targetRepo.saveAll(transformedData);
// 4. 更新同步时间
updateLastSyncTime(LocalDateTime.now());
logger.info("数据同步完成,处理记录数:{}", newData.size());
} catch (Exception e) {
logger.error("数据同步任务异常:", e);
// 添加重试或报警逻辑
}
}
// 数据转换方法
private List<TargetEntity> transformData(List<SourceEntity> sourceList) {
return sourceList.stream()
.map(entity -> new TargetEntity(
entity.getId(),
entity.getName(),
entity.getData(),
LocalDateTime.now()))
.collect(Collectors.toList());
}
// 获取上次同步时间(示例)
private LocalDateTime getLastSyncTime() {
// 可从数据库或缓存获取
return LocalDateTime.now().minusDays(1);
}
// 更新同步时间
private void updateLastSyncTime(LocalDateTime time) {
// 持久化存储逻辑
}
}三、其他优化方案
1. 分布式锁机制
// 使用Redis实现分布式锁
@Scheduled(cron = "${sync.cron}")
public void distributedSyncTask() {
String lockKey = "data_sync_lock";
String requestId = UUID.randomUUID().toString();
try {
// 尝试获取锁
boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, requestId, 30, TimeUnit.MINUTES);
if (locked) {
// 执行同步逻辑
performSync();
}
} finally {
// 释放锁
if (requestId.equals(redisTemplate.opsForValue().get(lockKey))) {
redisTemplate.delete(lockKey);
}
}
}2. 分页批量处理
private void batchSyncWithPagination() {
int pageSize = 1000;
int page = 0;
Page<SourceEntity> dataPage;
do {
dataPage = sourceRepo.findAll(PageRequest.of(page, pageSize));
List<TargetEntity> targetList = transformData(dataPage.getContent());
targetRepo.saveAll(targetList);
page++;
} while (dataPage.hasNext());
}3. 事务优化配置
# application.yml
spring:
jpa:
properties:
hibernate:
jdbc:
batch_size: 500
order_inserts: true
order_updates: true4. 性能监控配置
@Aspect
@Component
public class SyncMonitorAspect {
@Around("@annotation(org.springframework.scheduling.annotation.Scheduled)")
public Object monitorTask(ProceedingJoinPoint joinPoint) throws Throwable {
long start = System.currentTimeMillis();
try {
return joinPoint.proceed();
} finally {
long duration = System.currentTimeMillis() - start;
Metrics.timer("sync.task.duration")
.tag("task", joinPoint.getSignature().getName())
.record(duration, TimeUnit.MILLISECONDS);
}
}
}四、异常处理策略
1. 重试机制
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 5000))
public void performSync() {
// 同步逻辑
}
@Recover
public void recoverSync(Exception e) {
// 报警通知
alertService.sendAlert("数据同步失败:" + e.getMessage());
}2. 死信队列处理
// 使用Spring Retry + RabbitMQ实现
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(1000, 2.0, 5000)
.recoverer(new RepublishMessageRecoverer(rabbitTemplate, "dead-letter-exchange"))
.build();
}五、生产环境建议
- 配置中心管理:将cron表达式放在配置中心实现动态调整
- 多数据源配置:使用AbstractRoutingDataSource实现动态数据源切换
- 版本控制:维护数据版本号实现幂等同步
- 数据校验:添加MD5校验机制保证数据一致性
- 监控告警:集成Prometheus + Grafana实现可视化监控
六、完整配置示例
# application.properties # 定时任务配置 sync.cron=0 0 2 * * * sync.batch-size=1000 sync.max-retry=3 # 数据源配置 spring.datasource.source.url=jdbc:mysql://source-db:3306/db spring.datasource.source.username=user spring.datasource.source.password=pass spring.datasource.target.url=jdbc:mysql://target-db:3306/db spring.datasource.target.username=user spring.datasource.target.password=pass
七、常见问题排查
任务未执行:
- 检查@EnableScheduling是否启用
- 确认cron表达式格式正确
- 查看线程池配置
数据不一致:
- 检查事务隔离级别
- 验证数据转换逻辑
- 添加数据校验机制
性能瓶颈:
- 优化SQL查询(添加索引)
- 调整批量提交大小
- 增加JVM内存分配
内存溢出:
- 使用分页查询代替全量加载
- 优化对象重用机制
- 增加JVM堆内存
建议使用Arthas进行运行时诊断:https://arthas.aliyun.com
总结
通过以上方案,可以实现从简单到企业级的数据库同步需求。实际应用中应根据数据量级、同步频率和业务需求选择合适的实现策略,并建立完善的监控告警体系。
这些仅为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
