python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python MySQL数据同步

使用Python高效实现MySQL数据同步的几种方案

作者:detayun

在数据驱动的现代应用中,数据库同步是确保数据一致性和可用性的关键环节,MySQL作为最流行的开源关系型数据库之一,其数据同步需求广泛存在于主从复制、数据迁移、备份恢复等场景,本文将详细介绍如何使用Python实现高效可靠的MySQL数据同步方案,需要的朋友可以参考下

引言

在数据驱动的现代应用中,数据库同步是确保数据一致性和可用性的关键环节。MySQL作为最流行的开源关系型数据库之一,其数据同步需求广泛存在于主从复制、数据迁移、备份恢复等场景。本文将详细介绍如何使用Python实现高效可靠的MySQL数据同步方案,涵盖基础同步方法、增量同步策略以及错误处理机制。

一、准备工作

1. 环境配置

首先确保已安装:

pip install pymysql sqlalchemy sshtunnel  # 基本依赖
pip install pandas mysql-connector-python  # 高级功能可选

2. 数据库连接配置

创建配置文件db_config.py

SOURCE_DB = {
    'host': 'source_host',
    'user': 'username',
    'password': 'password',
    'database': 'db_name',
    'port': 3306,
    'charset': 'utf8mb4'
}

TARGET_DB = {
    'host': 'target_host',
    'user': 'username',
    'password': 'password',
    'database': 'db_name',
    'port': 3306
}

二、基础同步方法

方法1:使用PyMySQL全量同步

import pymysql
from db_config import SOURCE_DB, TARGET_DB

def full_sync(source_config, target_config):
    try:
        # 连接源数据库
        source_conn = pymysql.connect(**source_config)
        with source_conn.cursor() as src_cursor:
            src_cursor.execute("SHOW TABLES")
            tables = src_cursor.fetchall()
            
            # 连接目标数据库
            target_conn = pymysql.connect(**target_config)
            
            for (table,) in tables:
                print(f"同步表: {table}")
                
                # 获取表结构
                src_cursor.execute(f"SHOW CREATE TABLE {table}")
                create_table_sql = src_cursor.fetchone()[1]
                
                # 在目标库重建表(先删除旧表)
                with target_conn.cursor() as tgt_cursor:
                    tgt_cursor.execute(f"DROP TABLE IF EXISTS {table}")
                    tgt_cursor.execute(create_table_sql)
                
                # 获取数据并插入
                src_cursor.execute(f"SELECT * FROM {table}")
                rows = src_cursor.fetchall()
                if rows:
                    columns = [desc[0] for desc in src_cursor.description]
                    placeholders = ', '.join(['%s'] * len(columns))
                    insert_sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
                    
                    with target_conn.cursor() as tgt_cursor:
                        tgt_cursor.executemany(insert_sql, rows)
                    target_conn.commit()
                    
    except Exception as e:
        print(f"同步失败: {str(e)}")
    finally:
        source_conn.close() if 'source_conn' in locals() else None
        target_conn.close() if 'target_conn' in locals() else None

# 执行全量同步
full_sync(SOURCE_DB, TARGET_DB)

方法2:使用SQLAlchemy(ORM方式)

from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from db_config import SOURCE_DB, TARGET_DB

def orm_sync():
    # 创建引擎
    source_engine = create_engine(
        f"mysql+pymysql://{SOURCE_DB['user']}:{SOURCE_DB['password']}@"
        f"{SOURCE_DB['host']}:{SOURCE_DB['port']}/{SOURCE_DB['database']}"
    )
    target_engine = create_engine(
        f"mysql+pymysql://{TARGET_DB['user']}:{TARGET_DB['password']}@"
        f"{TARGET_DB['host']}:{TARGET_DB['port']}/{TARGET_DB['database']}"
    )
    
    # 获取源库元数据
    source_meta = MetaData(bind=source_engine)
    source_meta.reflect()
    
    # 创建目标会话
    TargetSession = sessionmaker(bind=target_engine)
    target_session = TargetSession()
    
    try:
        for table_name, table in source_meta.tables.items():
            print(f"处理表: {table_name}")
            
            # 清空目标表(生产环境应考虑更安全的策略)
            target_session.execute(f"TRUNCATE TABLE {table_name}")
            
            # 查询源数据
            result = source_engine.execute(table.select())
            rows = result.fetchall()
            
            if rows:
                # 批量插入
                insert_stmt = table.insert().values(rows)
                target_session.execute(insert_stmt)
                target_session.commit()
                
    except Exception as e:
        target_session.rollback()
        print(f"同步错误: {str(e)}")
    finally:
        target_session.close()

三、增量同步策略

1. 基于时间戳的增量同步

def incremental_sync(last_sync_time):
    try:
        source_conn = pymysql.connect(**SOURCE_DB)
        target_conn = pymysql.connect(**TARGET_DB)
        
        with source_conn.cursor() as src_cursor, target_conn.cursor() as tgt_cursor:
            # 假设所有表都有update_time字段
            src_cursor.execute("SHOW TABLES")
            tables = [table[0] for table in src_cursor.fetchall()]
            
            for table in tables:
                # 查询增量数据
                query = f"""
                SELECT * FROM {table} 
                WHERE update_time > '{last_sync_time}'
                """
                src_cursor.execute(query)
                new_rows = src_cursor.fetchall()
                
                if new_rows:
                    columns = [desc[0] for desc in src_cursor.description]
                    placeholders = ', '.join(['%s'] * len(columns))
                    insert_sql = f"""
                    INSERT INTO {table} ({', '.join(columns)}) 
                    VALUES ({placeholders})
                    ON DUPLICATE KEY UPDATE
                    """ + ', '.join([f"{col}=VALUES({col})" for col in columns[1:]])
                    
                    tgt_cursor.executemany(insert_sql, new_rows)
                    target_conn.commit()
            
            # 更新最后同步时间(实际应持久化存储)
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            
    except Exception as e:
        print(f"增量同步失败: {str(e)}")
    finally:
        source_conn.close()
        target_conn.close()

2. 使用Binlog实现实时同步

对于需要实时同步的场景,可以使用mysql-replication库监听Binlog:

from pymysqlreplication import BinLogStreamReader
import pymysql

def binlog_sync():
    mysql_settings = {
        'host': SOURCE_DB['host'],
        'port': SOURCE_DB['port'],
        'user': SOURCE_DB['user'],
        'passwd': SOURCE_DB['password']
    }
    
    target_conn = pymysql.connect(**TARGET_DB)
    
    stream = BinLogStreamReader(
        mysql_settings,
        server_id=100,
        blocking=True,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent]
    )
    
    try:
        for binlogevent in stream:
            binlogevent.dump()
            for row in binlogevent.rows:
                table = binlogevent.table
                event_type = binlogevent.__class__.__name__
                
                # 根据事件类型处理数据
                if event_type == "WriteRowsEvent":
                    # 处理插入
                    pass
                elif event_type == "UpdateRowsEvent":
                    # 处理更新
                    pass
                elif event_type == "DeleteRowsEvent":
                    # 处理删除
                    pass
                    
    except KeyboardInterrupt:
        print("手动停止同步")
    finally:
        stream.close()
        target_conn.close()

四、高级优化技巧

1. 多线程加速同步

from concurrent.futures import ThreadPoolExecutor
import pymysql

def sync_table(table_name, source_config, target_config):
    try:
        source_conn = pymysql.connect(**source_config)
        target_conn = pymysql.connect(**target_config)
        
        with source_conn.cursor() as src_cursor, target_conn.cursor() as tgt_cursor:
            # 实现单表同步逻辑...
            
    except Exception as e:
        print(f"表{table_name}同步失败: {str(e)}")

def parallel_sync():
    source_conn = pymysql.connect(**SOURCE_DB)
    with source_conn.cursor() as cursor:
        cursor.execute("SHOW TABLES")
        tables = [table[0] for table in cursor.fetchall()]
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        for table in tables:
            executor.submit(sync_table, table, SOURCE_DB, TARGET_DB)

2. 数据校验机制

def verify_sync(source_config, target_config):
    source_conn = pymysql.connect(**source_config)
    target_conn = pymysql.connect(**target_config)
    
    mismatches = []
    
    with source_conn.cursor() as src_cursor, target_conn.cursor() as tgt_cursor:
        src_cursor.execute("SHOW TABLES")
        tables = [table[0] for table in src_cursor.fetchall()]
        
        for table in tables:
            # 计算源表记录数
            src_cursor.execute(f"SELECT COUNT(*) FROM {table}")
            src_count = src_cursor.fetchone()[0]
            
            # 计算目标表记录数
            tgt_cursor.execute(f"SELECT COUNT(*) FROM {table}")
            tgt_count = tgt_cursor.fetchone()[0]
            
            if src_count != tgt_count:
                mismatches.append((table, "记录数不匹配", src_count, tgt_count))
            
            # 可选:抽样校验数据内容...
    
    if mismatches:
        print("发现数据不一致:")
        for item in mismatches:
            print(item)
        return False
    return True

五、生产环境建议

  1. 连接池管理:使用DBUtilsSQLAlchemy的连接池
  2. 断点续传:记录同步进度,支持中断后恢复
  3. 监控告警:集成Prometheus监控同步指标
  4. 安全加固
    • 使用SSH隧道加密传输
    • 最小权限原则配置数据库用户
    • 敏感信息使用环境变量或密钥管理服务

六、完整示例项目结构

mysql_sync/
├── config/
│   ├── db_config.py       # 数据库配置
│   └── logger_config.py   # 日志配置
├── core/
│   ├── sync_engine.py     # 核心同步逻辑
│   ├── verifier.py        # 数据校验
│   └── utils.py           # 工具函数
├── scripts/
│   ├── full_sync.py       # 全量同步脚本
│   └── incremental.py     # 增量同步脚本
└── tests/
    └── test_sync.py        # 单元测试

结论

Python提供了灵活多样的方式来实现MySQL数据同步,从简单的全量复制到复杂的实时同步均可覆盖。根据实际业务需求,可以选择:

建议在实际部署前进行充分的测试,特别是在数据一致性要求严格的场景下,务必添加完善的数据校验机制。

以上就是使用Python高效实现MySQL数据同步的几种方案的详细内容,更多关于Python MySQL数据同步的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文