Mysql

关注公众号 jb51net

关闭
首页 > 数据库 > Mysql > MySQL和Elasticsearch数据同步

MySQL和Elasticsearch数据同步方案详解

作者:匆匆忙忙游刃有余

在现代电商系统中,MySQL 作为关系型数据库负责数据的持久化存储,而 Elasticsearch 则作为搜索引擎提供高效的全文检索能力,本文就来详细的介绍一下MySQL和Elasticsearch数据同步方案,感兴趣的可以了解一下

在现代电商系统中,MySQL 作为关系型数据库负责数据的持久化存储,而 Elasticsearch 则作为搜索引擎提供高效的全文检索能力。保证两者之间的数据一致性是系统设计的关键挑战。本文将详细介绍主流的同步方案、实现方式及其优缺点。

一、同步方案对比

同步方案实时性实现复杂度一致性保证性能影响适用场景
同步双写极高简单强一致性金融交易、核心订单
异步双写较高中等最终一致性电商商品、用户信息
Canal + MQ较高最终一致性大规模数据同步
Logstash 定时简单弱一致性报表、分析数据
Debezium最终一致性复杂数据同步

二、详细实现方案

1. 同步双写

原理:在业务代码中同时写入 MySQL 和 Elasticsearch,确保两者数据同步更新。

实现代码

@Service
@Transactional
public class ProductServiceImpl implements ProductService {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @Override
    public void createProduct(Product product) {
        // 1. 写入 MySQL
        productMapper.insert(product);
        
        // 2. 同步写入 ES
        try {
            ProductIndex productIndex = convertToIndex(product);
            IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
                .index("product_index")
                .id(product.getId().toString())
                .document(productIndex)
            );
            esClient.index(request);
        } catch (Exception e) {
            // 处理 ES 写入失败的情况
            log.error("ES同步失败: {}", e.getMessage());
            // 可以选择抛出异常回滚事务,或者记录失败日志后续补偿
            throw new RuntimeException("数据同步失败", e);
        }
    }
    
    @Override
    public void updateProduct(Product product) {
        // 类似 createProduct,同时更新 MySQL 和 ES
        productMapper.updateById(product);
        // ES 更新逻辑...
    }
    
    private ProductIndex convertToIndex(Product product) {
        // 实体转换逻辑
        ProductIndex index = new ProductIndex();
        index.setId(product.getId());
        index.setTitle(product.getTitle());
        index.setPrice(product.getPrice());
        // 其他字段转换...
        return index;
    }
}

优缺点

2. 异步双写

原理:通过消息队列解耦,业务代码只负责写 MySQL,然后发送消息到 MQ,由消费者异步更新 ES。

实现代码

// 生产者端
@Service
public class ProductServiceImpl implements ProductService {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Override
    @Transactional
    public void createProduct(Product product) {
        // 1. 写入 MySQL
        productMapper.insert(product);
        
        // 2. 发送消息到队列
        ProductEvent event = new ProductEvent();
        event.setType("CREATE");
        event.setProductId(product.getId());
        rabbitTemplate.convertAndSend("product-event-exchange", "product.create", event);
    }
}

// 消费者端
@Component
public class ProductSyncConsumer {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @RabbitListener(queues = "product-sync-queue")
    public void handleProductEvent(ProductEvent event) {
        try {
            Product product = productMapper.selectById(event.getProductId());
            if (product == null) {
                // 删除 ES 中的数据
                deleteFromEs(event.getProductId());
                return;
            }
            
            // 同步到 ES
            ProductIndex index = convertToIndex(product);
            IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
                .index("product_index")
                .id(product.getId().toString())
                .document(index)
            );
            esClient.index(request);
            
            log.info("产品 {} 同步到 ES 成功", event.getProductId());
        } catch (Exception e) {
            log.error("同步 ES 失败: {}", e.getMessage());
            // 可以根据需要进行重试或记录到死信队列
        }
    }
}

优缺点

3. Canal + 消息队列 方案

原理:利用 Canal 监听 MySQL 的 binlog,解析数据变更并发送到消息队列,再由消费者同步到 ES。

3.1 环境准备

MySQL 配置

# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 服务器唯一ID
server-id=1
# 开启 binlog 实时更新
sync_binlog=1

Canal Server 配置

# canal-server/conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\..*

Canal Adapter 配置

# canal-adapter/conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/shop?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example  # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 127.0.0.1:9200
        properties:
          mode: rest
          cluster.name: elasticsearch

表映射配置

# canal-adapter/conf/es/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId:
topic: example
database: shop
table: tb_product
esMapping:
  _index: product_index
  _type: _doc
  _id: _id
  upsert: true
  sql: |
    select 
      p.id as _id, 
      p.title, 
      p.sub_title as subTitle, 
      p.price, 
      p.sales, 
      c.name as categoryName 
    from tb_product p 
    left join tb_category c on p.cid1 = c.id
  commitBatch: 3000

3.2 自定义消费者实现

如果需要更灵活的处理,可以自定义 Kafka 消费者:

@Component
public class ProductSyncConsumer {
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @KafkaListener(topics = "example")
    public void processMessage(String message) {
        try {
            // 解析 Canal 消息
            CanalMessage canalMsg = JSON.parseObject(message, CanalMessage.class);
            
            for (CanalData data : canalMsg.getData()) {
                // 根据操作类型处理
                switch (canalMsg.getEventType()) {
                    case INSERT:
                    case UPDATE:
                        syncToEs(data);
                        break;
                    case DELETE:
                        deleteFromEs(data);
                        break;
                }
            }
        } catch (Exception e) {
            log.error("处理 Canal 消息失败: {}", e.getMessage());
        }
    }
    
    private void syncToEs(CanalData data) throws IOException {
        // 构建 ES 文档
        ProductIndex index = new ProductIndex();
        index.setId(Long.valueOf(data.get("id").toString()));
        index.setTitle(data.get("title").toString());
        // 其他字段映射...
        
        // 写入 ES
        esClient.index(i -> i
            .index("product_index")
            .id(index.getId().toString())
            .document(index)
        );
    }
}

优缺点

三、数据一致性保障策略

1. 幂等性设计

确保重复同步不会导致数据异常:

// ES 操作幂等性实现
public void syncProduct(Long productId) {
    // 使用文档ID作为唯一标识
    IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
        .index("product_index")
        .id(productId.toString())
        .document(buildProductIndex(productId))
        // 设置乐观锁版本控制
        .versionType(VersionType.EXTERNAL)
        .version(getCurrentVersion(productId))
    );
    
    try {
        esClient.index(request);
    } catch (VersionConflictException e) {
        // 版本冲突,需要重新获取最新数据
        log.warn("版本冲突,重新同步: {}", productId);
        // 重试逻辑...
    }
}

2. 重试机制

@Service
public class EsSyncService {
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    // 最大重试次数
    private static final int MAX_RETRY_COUNT = 3;
    
    public void syncWithRetry(ProductIndex index) {
        String key = "es:retry:" + index.getId();
        
        for (int i = 0; i < MAX_RETRY_COUNT; i++) {
            try {
                esClient.index(req -> req
                    .index("product_index")
                    .id(index.getId().toString())
                    .document(index)
                );
                // 成功后删除重试标记
                redisTemplate.delete(key);
                return;
            } catch (Exception e) {
                log.error("第{}次同步失败: {}", i+1, e.getMessage());
                
                if (i == MAX_RETRY_COUNT - 1) {
                    // 达到最大重试次数,记录失败任务
                    redisTemplate.opsForValue().set(key, JSON.toJSONString(index), 7, TimeUnit.DAYS);
                    log.error("同步失败,已记录到失败队列: {}", index.getId());
                } else {
                    // 指数退避重试
                    try {
                        Thread.sleep((long) (Math.pow(2, i) * 1000));
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    
    // 定时任务处理失败的同步任务
    @Scheduled(cron = "0 0/5 * * * ?")
    public void processFailedTasks() {
        Set<String> keys = redisTemplate.keys("es:retry:*");
        if (keys != null) {
            for (String key : keys) {
                String json = (String) redisTemplate.opsForValue().get(key);
                ProductIndex index = JSON.parseObject(json, ProductIndex.class);
                // 重新尝试同步
                syncWithRetry(index);
            }
        }
    }
}

3. 全量校验与修复

定期全量对比 MySQL 和 ES 数据,修复不一致:

@Service
public class DataConsistencyService {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void checkAndRepair() {
        log.info("开始数据一致性校验");
        
        // 分页查询 MySQL 数据
        int page = 1;
        int pageSize = 1000;
        
        while (true) {
            Page<Product> productPage = productMapper.selectPage(
                new Page<>(page, pageSize), null);
            
            for (Product product : productPage.getRecords()) {
                try {
                    // 查询 ES 数据
                    GetResponse<ProductIndex> response = esClient.get(req -> req
                        .index("product_index")
                        .id(product.getId().toString()),
                        ProductIndex.class
                    );
                    
                    if (!response.found()) {
                        // ES 中不存在,需要插入
                        syncToEs(product);
                        log.warn("修复缺失数据: {}", product.getId());
                    } else {
                        // 对比数据是否一致
                        ProductIndex esData = response.source();
                        if (!isConsistent(product, esData)) {
                            // 数据不一致,更新 ES
                            syncToEs(product);
                            log.warn("修复不一致数据: {}", product.getId());
                        }
                    }
                } catch (Exception e) {
                    log.error("校验商品 {} 失败: {}", product.getId(), e.getMessage());
                }
            }
            
            if (productPage.hasNext()) {
                page++;
            } else {
                break;
            }
        }
        
        log.info("数据一致性校验完成");
    }
    
    private boolean isConsistent(Product mysql, ProductIndex es) {
        // 比较关键字段
        return Objects.equals(mysql.getTitle(), es.getTitle()) &&
               Objects.equals(mysql.getPrice(), es.getPrice()) &&
               Objects.equals(mysql.getSales(), es.getSales());
    }
}

四、性能优化策略

1. ES 批量写入

public void batchSyncToEs(List<Product> products) {
    if (CollectionUtils.isEmpty(products)) {
        return;
    }
    
    try {
        List<BulkOperation> operations = new ArrayList<>();
        
        for (Product product : products) {
            ProductIndex index = convertToIndex(product);
            operations.add(BulkOperation.of(op -> op
                .index(idx -> idx
                    .index("product_index")
                    .id(product.getId().toString())
                    .document(index)
                )
            ));
        }
        
        BulkRequest request = BulkRequest.of(req -> req.operations(operations));
        BulkResponse response = esClient.bulk(request);
        
        if (response.errors()) {
            // 处理错误
            for (BulkResponseItem item : response.items()) {
                if (item.error() != null) {
                    log.error("批量同步失败: {} - {}", 
                              item.id(), item.error().reason());
                }
            }
        }
    } catch (Exception e) {
        log.error("批量同步异常: {}", e.getMessage());
    }
}

2. 优化 Canal 配置

# 增加批处理大小
syncBatchSize = 2000

# 优化网络参数
tcp.so.sndbuf = 1048576
tcp.so.rcvbuf = 1048576

# 调整消费线程数
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 8

3. MySQL binlog 优化

# 增加 binlog 大小限制
binlog-file-size = 1G

# 优化 binlog 刷新策略
sync_binlog = 1
innodb_flush_log_at_trx_commit = 1

# 调整 binlog 保留时间
expire_logs_days = 7

五、最佳实践建议

1. 方案选型建议

2. 监控与告警

@Service
public class SyncMonitorService {
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    // 记录同步时间戳
    public void recordSyncTimestamp(String tableName, Long id) {
        String key = "sync:timestamp:" + tableName + ":" + id;
        redisTemplate.opsForValue().set(key, System.currentTimeMillis(), 24, TimeUnit.HOURS);
    }
    
    // 检查同步延迟
    @Scheduled(fixedRate = 60000)
    public void checkSyncDelay() {
        // 查询最近5分钟内更新的数据
        List<Product> recentProducts = productMapper.selectRecentUpdated(5);
        
        for (Product product : recentProducts) {
            String key = "sync:timestamp:tb_product:" + product.getId();
            Long syncTime = (Long) redisTemplate.opsForValue().get(key);
            
            if (syncTime == null) {
                // 未同步
                sendAlarm("数据未同步", product.getId());
            } else {
                long delay = System.currentTimeMillis() - syncTime;
                if (delay > 300000) { // 5分钟
                    // 同步延迟过大
                    sendAlarm("数据同步延迟:" + (delay/1000) + "秒", product.getId());
                }
            }
        }
    }
    
    private void sendAlarm(String message, Long productId) {
        // 发送告警(邮件、短信、钉钉等)
        log.error("告警: {} - 商品ID: {}", message, productId);
        // 实际告警逻辑...
    }
}

3. 数据同步异常处理流程

  1. 重试机制:指数退避策略,避免立即重试造成雪崩
  2. 死信队列:记录无法通过重试解决的异常
  3. 手动干预:提供管理界面手动触发同步
  4. 数据校验:定期全量比对,发现并修复不一致

六、总结

MySQL 和 Elasticsearch 数据同步是电商系统中的关键技术挑战。选择合适的同步方案需要综合考虑实时性要求、系统复杂度、团队技术栈等因素。在实际项目中,推荐采用 Canal + 消息队列 的方案,它提供了良好的实时性、可靠性和扩展性,同时对业务代码零侵入。

无论选择哪种方案,都需要特别关注数据一致性保障、异常处理、性能优化和监控告警等方面,确保系统在生产环境中的稳定运行。

到此这篇关于MySQL和Elasticsearch数据同步方案详解的文章就介绍到这了,更多相关MySQL和Elasticsearch数据同步内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文