java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot定时任务实现数据库数据同步

SpringBoot定时任务实现数据库数据同步全过程

作者:小小初霁

文章详细介绍了从简单到企业级数据库同步需求的技术方案,包括选型、实现步骤、优化方案、异常处理策略、生产环境配置建议等

一、技术方案选型

1. 核心组件

2. 架构示意图

[定时触发器] -> [数据抽取] -> [数据转换] -> [数据加载] -> [结果通知]

二、基础实现步骤

1. 添加依赖

<!-- Spring Boot Starter Web (包含Scheduler) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Data JPA -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<!-- 数据库驱动(示例使用MySQL) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>

2. 启用定时任务

@SpringBootApplication
@EnableScheduling
public class DataSyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(DataSyncApplication.class, args);
    }
}

3. 实现定时任务类

@Component
public class DataSyncScheduler {
    
    private static final Logger logger = LoggerFactory.getLogger(DataSyncScheduler.class);
    
    @Autowired
    private SourceRepository sourceRepo;
    
    @Autowired
    private TargetRepository targetRepo;

    // 每天凌晨1点执行
    @Scheduled(cron = "0 0 1 * * ?")
    @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
    public void syncDataDaily() {
        try {
            logger.info("开始数据同步任务...");
            
            // 1. 获取增量数据
            LocalDateTime lastSyncTime = getLastSyncTime();
            List<SourceEntity> newData = sourceRepo.findByUpdateTimeAfter(lastSyncTime);
            
            // 2. 数据转换
            List<TargetEntity> transformedData = transformData(newData);
            
            // 3. 批量保存
            targetRepo.saveAll(transformedData);
            
            // 4. 更新同步时间
            updateLastSyncTime(LocalDateTime.now());
            
            logger.info("数据同步完成,处理记录数:{}", newData.size());
        } catch (Exception e) {
            logger.error("数据同步任务异常:", e);
            // 添加重试或报警逻辑
        }
    }
    
    // 数据转换方法
    private List<TargetEntity> transformData(List<SourceEntity> sourceList) {
        return sourceList.stream()
                .map(entity -> new TargetEntity(
                        entity.getId(),
                        entity.getName(),
                        entity.getData(),
                        LocalDateTime.now()))
                .collect(Collectors.toList());
    }
    
    // 获取上次同步时间(示例)
    private LocalDateTime getLastSyncTime() {
        // 可从数据库或缓存获取
        return LocalDateTime.now().minusDays(1);
    }
    
    // 更新同步时间
    private void updateLastSyncTime(LocalDateTime time) {
        // 持久化存储逻辑
    }
}

三、其他优化方案

1. 分布式锁机制

// 使用Redis实现分布式锁
@Scheduled(cron = "${sync.cron}")
public void distributedSyncTask() {
    String lockKey = "data_sync_lock";
    String requestId = UUID.randomUUID().toString();
    try {
        // 尝试获取锁
        boolean locked = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, requestId, 30, TimeUnit.MINUTES);
        
        if (locked) {
            // 执行同步逻辑
            performSync();
        }
    } finally {
        // 释放锁
        if (requestId.equals(redisTemplate.opsForValue().get(lockKey))) {
            redisTemplate.delete(lockKey);
        }
    }
}

2. 分页批量处理

private void batchSyncWithPagination() {
    int pageSize = 1000;
    int page = 0;
    
    Page<SourceEntity> dataPage;
    do {
        dataPage = sourceRepo.findAll(PageRequest.of(page, pageSize));
        List<TargetEntity> targetList = transformData(dataPage.getContent());
        targetRepo.saveAll(targetList);
        page++;
    } while (dataPage.hasNext());
}

3. 事务优化配置

# application.yml
spring:
  jpa:
    properties:
      hibernate:
        jdbc:
          batch_size: 500
        order_inserts: true
        order_updates: true

4. 性能监控配置

@Aspect
@Component
public class SyncMonitorAspect {
    
    @Around("@annotation(org.springframework.scheduling.annotation.Scheduled)")
    public Object monitorTask(ProceedingJoinPoint joinPoint) throws Throwable {
        long start = System.currentTimeMillis();
        try {
            return joinPoint.proceed();
        } finally {
            long duration = System.currentTimeMillis() - start;
            Metrics.timer("sync.task.duration")
                    .tag("task", joinPoint.getSignature().getName())
                    .record(duration, TimeUnit.MILLISECONDS);
        }
    }
}

四、异常处理策略

1. 重试机制

@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 5000))
public void performSync() {
    // 同步逻辑
}

@Recover
public void recoverSync(Exception e) {
    // 报警通知
    alertService.sendAlert("数据同步失败:" + e.getMessage());
}

2. 死信队列处理

// 使用Spring Retry + RabbitMQ实现
@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(3)
            .backOffOptions(1000, 2.0, 5000)
            .recoverer(new RepublishMessageRecoverer(rabbitTemplate, "dead-letter-exchange"))
            .build();
}

五、生产环境建议

六、完整配置示例

# application.properties
# 定时任务配置
sync.cron=0 0 2 * * *
sync.batch-size=1000
sync.max-retry=3

# 数据源配置
spring.datasource.source.url=jdbc:mysql://source-db:3306/db
spring.datasource.source.username=user
spring.datasource.source.password=pass

spring.datasource.target.url=jdbc:mysql://target-db:3306/db
spring.datasource.target.username=user
spring.datasource.target.password=pass

七、常见问题排查

任务未执行

数据不一致

性能瓶颈

内存溢出

建议使用Arthas进行运行时诊断:https://arthas.aliyun.com

总结

通过以上方案,可以实现从简单到企业级的数据库同步需求。实际应用中应根据数据量级、同步频率和业务需求选择合适的实现策略,并建立完善的监控告警体系。

这些仅为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文