Mysql

关注公众号 jb51net

关闭
首页 > 数据库 > Mysql > MySQL同步Elasticsearch

MySQL同步Elasticsearch的6种方案小结

作者:苏三说技术

在分布式架构中,MySQL与Elasticsearch(ES)的协同已成为解决高并发查询与复杂检索的标配组合,本文整理了MySQL同步ES的6种主流方案,大家可以根据自己的需要进行选择

引言

在分布式架构中,MySQL与Elasticsearch(ES)的协同已成为解决高并发查询与复杂检索的标配组合。

然而,如何实现两者间的高效数据同步,是架构设计中绕不开的难题。

这篇文章跟大家一起聊聊MySQL同步ES的6种主流方案,结合代码示例与场景案例,帮助开发者避开常见陷阱,做出最优技术选型。

方案一:同步双写

场景:适用于对数据实时性要求极高,且业务逻辑简单的场景,如金融交易记录同步。

在业务代码中同时写入MySQL与ES。

代码如下:

@Transactional  
public void createOrder(Order order) {  
    // 写入MySQL  
    orderMapper.insert(order);  
    // 同步写入ES  
    IndexRequest request = new IndexRequest("orders")  
        .id(order.getId())  
        .source(JSON.toJSONString(order), XContentType.JSON);  
    client.index(request, RequestOptions.DEFAULT);  
}

痛点

方案二:异步双写

场景:电商订单状态更新后需同步至ES供客服系统检索。

我们可以使用MQ进行解耦。

架构图如下

代码示例如下

// 生产者端  
public void updateProduct(Product product) {  
    productMapper.update(product);  
    kafkaTemplate.send("product-update", product.getId());  
}  

// 消费者端  
@KafkaListener(topics = "product-update")  
public void syncToEs(String productId) {  
    Product product = productMapper.selectById(productId);  
    esClient.index(product);  
}

优势

缺陷

方案三:Logstash定时拉取

场景:用户行为日志的T+1分析场景。

该方案低侵入但高延迟。

配置示例如下

input {  
  jdbc {  
    jdbc_driver => "com.mysql.jdbc.Driver"  
    jdbc_url => "jdbc:mysql://localhost:3306/log_db"  
    schedule => "*/5 * * * *"  # 每5分钟执行  
    statement => "SELECT * FROM user_log WHERE update_time > :sql_last_value"  
  }  
}  
output {  
  elasticsearch {  
    hosts => ["es-host:9200"]  
    index => "user_logs"  
  }  
}

适用性分析

方案四:Canal监听Binlog

场景:社交平台动态实时搜索(如微博热搜更新)。

技术栈:Canal + RocketMQ + ES

该方案高实时,并且低侵入。

架构流程如下

关键配置

# canal.properties  
canal.instance.master.address=127.0.0.1:3306  
canal.mq.topic=canal.es.sync

避坑指南

方案五:DataX批量同步

场景:将历史订单数据从分库分表MySQL迁移至ES。

该方案是大数据迁移的首选。

配置文件如下

{  
  "job": {  
    "content": [{  
      "reader": {  
        "name": "mysqlreader",  
        "parameter": { "splitPk": "id", "querySql": "SELECT * FROM orders" }  
      },  
      "writer": {  
        "name": "elasticsearchwriter",  
        "parameter": { "endpoint": "http://es-host:9200", "index": "orders" }  
      }  
    }]  
  }  
}

性能调优

方案六:Flink流处理

场景:商品价格变更时,需关联用户画像计算实时推荐评分。

该方案适合于复杂的ETL场景。

代码片段如下

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
env.addSource(new CanalSource())  
   .map(record -> parseToPriceEvent(record))  
   .keyBy(event -> event.getProductId())  
   .connect(userProfileBroadcastStream)  
   .process(new PriceRecommendationProcess())  
   .addSink(new ElasticsearchSink());

优势

总结

对于文章上面给出的这6种技术方案,我们在实际工作中,该如何做选型呢?

下面用一张表格做对比:

方案实时性侵入性复杂度适用阶段
同步双写秒级小型单体项目
MQ异步秒级中型分布式系统
Logstash分钟级离线分析
Canal毫秒级高并发生产环境
DataX小时级历史数据迁移
Flink毫秒级极高实时数仓

苏三的建议

到此这篇关于MySQL同步Elasticsearch的6种方案小结的文章就介绍到这了,更多相关MySQL同步Elasticsearch内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文