python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python备份SQLite数据库

使用Python备份SQLite数据库的完整过程(附详细代码)

作者:weixin_pk138132

SQLite数据库的迁移是指将SQLite数据库中的数据转移到另一个数据库系统(如MySQL、PostgreSQL等)中的过程,这篇文章主要介绍了使用Python备份SQLite数据库的相关资料,需要的朋友可以参考下

1. 引言:为什么备份 SQLite 数据库至关重要?

数据是任何应用程序的核心资产。无论是个人项目还是商业应用,数据的丢失都可能带来灾难性后果。对于 SQLite 数据库而言,备份更是不可或缺的实践。

1.1 SQLite 数据库的特点

1.2 备份的常见场景与挑战

对于 SQLite 这种文件型数据库,最简单的备份方式似乎是直接复制文件。然而,如果数据库在复制过程中处于活跃写入状态,直接复制可能导致备份文件不一致或损坏。这就是 sqlite3.Connection.backup() 方法发挥作用的地方。

2. 核心方法:sqlite3.Connection.backup()

Python sqlite3 模块提供了一个名为 backup() 的强大方法,它允许你在数据库运行时进行安全、一致的备份。

2.1backup()方法的工作原理

backup() 方法实现了 SQLite 数据库的在线备份 API。它通过以下方式工作:

  1. 它在一个源数据库连接和一个目标数据库连接之间进行操作。
  2. 它会逐页地将源数据库的数据复制到目标数据库。
  3. 在复制过程中,它会确保数据的一致性,这意味着即使在备份期间源数据库有写入操作,备份文件也是一个在某个时间点上一致的快照。
  4. 这个过程是非阻塞的,源数据库在备份期间可以继续进行读写操作。

2.2backup()的优势:原子性、一致性、非阻塞

2.3 方法签名与参数

source_connection.backup(target_connection, *, pages=-1, progress=None, name='main', sleep=0.250)

3. 分步实现:使用backup()进行备份

3.1准备:创建示例源数据库

首先,我们需要一个包含一些数据的 SQLite 数据库文件作为源,以便进行备份。

import sqlite3
import os

SOURCE_DB = "my_app.db"
BACKUP_DB = "my_app_backup.db"

def create_sample_db(db_filepath):
    """创建并填充一个示例SQLite数据库。"""
    print(f"\n--- Creating sample database: {db_filepath} ---")
    conn = sqlite3.connect(db_filepath)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE,
            age INTEGER
        );
    ''')
    # 插入一些数据,如果表为空
    cursor.execute("SELECT COUNT(*) FROM users;")
    if cursor.fetchone()[0] == 0:
        cursor.execute("INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);")
        cursor.execute("INSERT INTO users (name, email, age) VALUES ('Bob', 'bob@example.com', 25);")
        cursor.execute("INSERT INTO users (name, email, age) VALUES ('Charlie', 'charlie@example.com', 35);")
        conn.commit()
        print(f"  Inserted 3 sample users into '{db_filepath}'.")
    else:
        print(f"  '{db_filepath}' already contains data. Skipping insertion.")
    conn.close()

def cleanup_files(*filenames):
    """删除指定的文件。"""
    print("\n--- Cleaning up files ---")
    for filename in filenames:
        if os.path.exists(filename):
            os.remove(filename)
            print(f"  Deleted file: {filename}")

3.2建立源数据库连接

打开源数据库文件,获得一个 sqlite3.Connection 对象。

source_conn = sqlite3.connect(SOURCE_DB)

3.3建立目标备份数据库连接

打开目标备份文件(如果文件不存在,sqlite3.connect() 会自动创建它)。

backup_conn = sqlite3.connect(BACKUP_DB)

3.4执行备份操作

调用源连接对象的 backup() 方法,并传入目标连接。

try:
    source_conn.backup(backup_conn)
    print(f"Backup of '{SOURCE_DB}' to '{BACKUP_DB}' completed successfully.")
except sqlite3.Error as e:
    print(f"Error during backup: {e}")

3.5关闭连接与资源管理

无论备份成功与否,都应该关闭两个数据库连接,释放资源。

finally:
    if source_conn:
        source_conn.close()
    if backup_conn:
        backup_conn.close()

4. Python 代码示例

import sqlite3
import os

# --- Configuration ---
SOURCE_DB = "my_application.db"
BACKUP_DB = "my_application_backup.db"

# --- Helper Functions (defined above, repeated for clarity) ---
def create_sample_db(db_filepath):
    """创建并填充一个示例SQLite数据库。"""
    print(f"\n--- Creating sample database: {db_filepath} ---")
    conn = sqlite3.connect(db_filepath)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE,
            age INTEGER
        );
    ''')
    cursor.execute("SELECT COUNT(*) FROM users;")
    if cursor.fetchone()[0] == 0:
        cursor.execute("INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);")
        cursor.execute("INSERT INTO users (name, email, age) VALUES ('Bob', 'bob@example.com', 25);")
        cursor.execute("INSERT INTO users (name, email, age) VALUES ('Charlie', 'charlie@example.com', 35);")
        conn.commit()
        print(f"  Inserted 3 sample users into '{db_filepath}'.")
    else:
        print(f"  '{db_filepath}' already contains data. Skipping insertion.")
    conn.close()

def cleanup_files(*filenames):
    """删除指定的文件。"""
    print("\n--- Cleaning up files ---")
    for filename in filenames:
        if os.path.exists(filename):
            os.remove(filename)
            print(f"  Deleted file: {filename}")

# --- Core Backup Function ---
def perform_backup(source_db_path, backup_db_path):
    """
    使用 sqlite3.Connection.backup() 方法备份 SQLite 数据库。
    """
    print(f"\n--- Starting backup from '{source_db_path}' to '{backup_db_path}' ---")
    source_conn = None
    backup_conn = None
    try:
        source_conn = sqlite3.connect(source_db_path)
        backup_conn = sqlite3.connect(backup_db_path)
        
        # 执行备份操作
        source_conn.backup(backup_conn)
        
        print(f"Backup of '{source_db_path}' to '{backup_db_path}' completed successfully.")
        return True
    except sqlite3.Error as e:
        print(f"Error during backup: {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False
    finally:
        if source_conn:
            source_conn.close()
        if backup_conn:
            backup_conn.close()

# --- Verification Function ---
def verify_backup(db_filepath):
    """验证备份数据库是否存在并包含数据。"""
    print(f"\n--- Verifying backup: {db_filepath} ---")
    if not os.path.exists(db_filepath):
        print(f"  Error: Backup file '{db_filepath}' does not exist.")
        return False

    conn = None
    try:
        conn = sqlite3.connect(db_filepath)
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM users;")
        row_count = cursor.fetchone()[0]
        print(f"  Backup file '{db_filepath}' exists and contains {row_count} rows in 'users' table.")
        
        # 打印一些示例数据
        cursor.execute("SELECT id, name, email, age FROM users LIMIT 2;")
        sample_data = cursor.fetchall()
        print("  Sample data from backup:")
        for row in sample_data:
            print(f"    ID: {row[0]}, Name: {row[1]}, Email: {row[2]}, Age: {row[3]}")
        return True
    except sqlite3.Error as e:
        print(f"  Error accessing backup database '{db_filepath}': {e}")
        return False
    finally:
        if conn:
            conn.close()

# --- Main execution ---
# cleanup_files(SOURCE_DB, BACKUP_DB) # Clean up previous runs
# create_sample_db(SOURCE_DB)
# if perform_backup(SOURCE_DB, BACKUP_DB):
#     verify_backup(BACKUP_DB)
# cleanup_files(SOURCE_DB, BACKUP_DB) # Clean up after execution

5. 最佳实践与注意事项

5.1错误处理 (try-except)

始终将备份操作包装在 try...except sqlite3.Error 块中,以捕获可能发生的数据库错误。如果使用 with 语句管理连接,它可以简化资源释放,但在 backup() 方法中,由于涉及两个连接,手动 finally 块确保关闭所有连接更为稳妥。

5.2备份进度监控 (使用progress回调)

对于大型数据库,备份可能需要一段时间。提供一个 progress 回调函数可以向用户显示备份进度。

def backup_progress(status, remaining, total):
    """
    备份进度回调函数。
    :param status: 已复制的页数。
    :param remaining: 剩余未复制的页数。
    :param total: 数据库的总页数。
    """
    print(f"\r  Copied: {status} pages, Remaining: {remaining} pages, Total: {total} pages "
          f"({(status/total)*100:.2f}%)", end='')

# 在 perform_backup 函数中调用
# source_conn.backup(backup_conn, progress=backup_progress)
# print() # 备份完成后换行

5.3文件路径管理

5.4替代方案:简单的文件复制 (shutil.copyfile)

如果你确定在备份时数据库没有被任何进程写入,或者数据库是一个只读文件,那么直接使用 shutil.copyfile() 是一种简单快速的方法。

import shutil

def simple_file_copy_backup(source_path, backup_path):
    """
    简单的文件复制备份,仅在源数据库未被写入时安全。
    """
    print(f"\n--- Performing simple file copy backup from '{source_path}' to '{backup_path}' ---")
    if not os.path.exists(source_path):
        print(f"  Error: Source database '{source_path}' does not exist.")
        return False
    try:
        shutil.copyfile(source_path, backup_path)
        print(f"  Successfully copied '{source_path}' to '{backup_path}'.")
        return True
    except Exception as e:
        print(f"  Error during file copy: {e}")
        return False

# --- 严重警告 ---
# 仅当确定源数据库在复制期间不会有任何写入操作时才使用此方法。
# 否则,你可能会得到一个损坏或不一致的备份文件。
# 对于正在使用的数据库,始终优先使用 sqlite3.Connection.backup()。

5.5备份压缩(zipfile,shutil.make_archive)

SQLite 数据库文件可能很大。为了节省存储空间和方便传输,可以考虑对备份文件进行压缩。

import zipfile

def compress_backup(db_filepath, zip_filepath):
    """将数据库文件压缩成zip文件。"""
    print(f"\n--- Compressing '{db_filepath}' to '{zip_filepath}' ---")
    try:
        with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.write(db_filepath, os.path.basename(db_filepath))
        print(f"  Successfully compressed '{db_filepath}' to '{zip_filepath}'.")
        return True
    except Exception as e:
        print(f"  Error compressing backup: {e}")
        return False

# Example usage:
# if perform_backup(SOURCE_DB, BACKUP_DB):
#     compress_backup(BACKUP_DB, BACKUP_DB + ".zip")

5.6自动化与调度

在生产环境中,备份通常是自动化任务。你可以使用:

5.7存储位置与恢复策略

5.8从备份恢复

恢复通常只是将备份文件复制回原始数据库的位置。但在复制之前,请确保原始数据库文件被关闭或删除,并且备份文件是正确的。

def restore_from_backup(backup_db_path, target_db_path):
    """
    从备份文件恢复数据库。
    警告:这将覆盖目标路径的现有数据库!
    """
    print(f"\n--- Restoring from '{backup_db_path}' to '{target_db_path}' ---")
    if not os.path.exists(backup_db_path):
        print(f"  Error: Backup file '{backup_db_path}' does not exist.")
        return False
    try:
        # 确保目标数据库连接已关闭
        if os.path.exists(target_db_path):
            os.remove(target_db_path) # 删除旧的(或损坏的)数据库
            print(f"  Deleted existing database at '{target_db_path}'.")
        shutil.copyfile(backup_db_path, target_db_path)
        print(f"  Successfully restored '{backup_db_path}' to '{target_db_path}'.")
        return True
    except Exception as e:
        print(f"  Error during restoration: {e}")
        return False

6. 综合代码示例:包含进度监控和验证

import sqlite3
import os
import shutil # For file copy operations and cleanup
import datetime # For timestamping backup files

# --- Configuration ---
SOURCE_DB = "production_app.db"
BACKUP_DIR = "backups" # Directory to store backups
ZIP_DIR = "compressed_backups" # Directory to store compressed backups

# --- 1. Helper Functions ---
def create_sample_db(db_filepath):
    """Creates and populates a sample SQLite database."""
    print(f"\n--- Creating sample database: {db_filepath} ---")
    conn = None
    try:
        conn = sqlite3.connect(db_filepath)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS inventory (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                item_name TEXT NOT NULL,
                quantity INTEGER DEFAULT 0,
                last_updated TEXT
            );
        ''')
        cursor.execute("SELECT COUNT(*) FROM inventory;")
        if cursor.fetchone()[0] == 0:
            cursor.execute("INSERT INTO inventory (item_name, quantity, last_updated) VALUES ('Laptop', 150, '2023-10-26 10:00:00');")
            cursor.execute("INSERT INTO inventory (item_name, quantity, last_updated) VALUES ('Monitor', 200, '2023-10-26 11:30:00');")
            cursor.execute("INSERT INTO inventory (item_name, quantity, last_updated) VALUES ('Keyboard', 300, '2023-10-26 12:00:00');")
            conn.commit()
            print(f"  Inserted 3 sample items into '{db_filepath}'.")
        else:
            print(f"  '{db_filepath}' already contains data. Skipping insertion.")
    except sqlite3.Error as e:
        print(f"  Database error during sample DB creation: {e}")
    finally:
        if conn:
            conn.close()

def cleanup_dirs(*dirs):
    """Deletes directories and their contents."""
    print("\n--- Cleaning up directories ---")
    for dir_path in dirs:
        if os.path.exists(dir_path):
            shutil.rmtree(dir_path)
            print(f"  Deleted directory: {dir_path}")

def cleanup_files(*filenames):
    """Deletes specified files."""
    for filename in filenames:
        if os.path.exists(filename):
            os.remove(filename)
            print(f"  Deleted file: {filename}")

def get_backup_filename(base_name, timestamp=True, extension=".db"):
    """生成带时间戳的备份文件名。"""
    if timestamp:
        current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"{base_name}_{current_time}{extension}"
    return f"{base_name}{extension}"

# --- 2. Backup Progress Callback ---
def backup_progress_callback(status, remaining, total):
    """Prints backup progress to the console."""
    percentage = (status / total) * 100 if total > 0 else 0
    print(f"\r  Progress: {status}/{total} pages ({percentage:.2f}%) "
          f"Remaining: {remaining} pages", end='', flush=True)

# --- 3. Core Backup Function ---
def perform_db_backup(source_db_path, backup_dir_path):
    """
    使用 sqlite3.Connection.backup() 方法备份 SQLite 数据库,并包含进度监控。
    备份文件将存储在 backup_dir_path 目录下,并带有时间戳。
    """
    if not os.path.exists(backup_dir_path):
        os.makedirs(backup_dir_path)
        print(f"  Created backup directory: {backup_dir_path}")

    base_name = os.path.basename(source_db_path).replace(".db", "")
    backup_filename = get_backup_filename(base_name)
    target_backup_path = os.path.join(backup_dir_path, backup_filename)
    
    print(f"\n--- Starting online backup from '{source_db_path}' to '{target_backup_path}' ---")
    source_conn = None
    backup_conn = None
    try:
        source_conn = sqlite3.connect(source_db_path)
        backup_conn = sqlite3.connect(target_backup_path)
        
        # 执行备份操作,包含进度回调
        source_conn.backup(backup_conn, pages=1, progress=backup_progress_callback, sleep=0.05) # Small pages for visible progress
        
        print("\nBackup completed successfully.")
        return target_backup_path
    except sqlite3.Error as e:
        print(f"\nError during backup: {e}")
        if os.path.exists(target_backup_path):
            os.remove(target_backup_path) # Clean up partial backup on error
            print(f"  Removed incomplete backup file: {target_backup_path}")
        return None
    except Exception as e:
        print(f"\nAn unexpected error occurred during backup: {e}")
        return None
    finally:
        if source_conn:
            source_conn.close()
        if backup_conn:
            backup_conn.close()

# --- 4. Verification Function ---
def verify_db_backup(db_filepath):
    """Verifies the backup database exists and contains data."""
    print(f"\n--- Verifying backup: {db_filepath} ---")
    if not os.path.exists(db_filepath):
        print(f"  Error: Backup file '{db_filepath}' does not exist.")
        return False

    conn = None
    try:
        conn = sqlite3.connect(db_filepath)
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM inventory;")
        row_count = cursor.fetchone()[0]
        print(f"  Backup file '{db_filepath}' exists and contains {row_count} rows in 'inventory' table.")
        
        cursor.execute("SELECT id, item_name, quantity FROM inventory LIMIT 2;")
        sample_data = cursor.fetchall()
        print("  Sample data from backup:")
        for row in sample_data:
            print(f"    ID: {row[0]}, Item: {row[1]}, Quantity: {row[2]}")
        return True
    except sqlite3.Error as e:
        print(f"  Error accessing backup database '{db_filepath}': {e}")
        return False
    except Exception as e:
        print(f"  An unexpected error occurred during verification: {e}")
        return False
    finally:
        if conn:
            conn.close()

# --- 5. Compression Function ---
def compress_db_backup(db_filepath, compressed_dir_path):
    """Compresses a database file into a .zip archive."""
    if not os.path.exists(compressed_dir_path):
        os.makedirs(compressed_dir_path)
        print(f"  Created compressed backup directory: {compressed_dir_path}")

    zip_filename = os.path.basename(db_filepath) + ".zip"
    target_zip_path = os.path.join(compressed_dir_path, zip_filename)
    
    print(f"\n--- Compressing '{db_filepath}' to '{target_zip_path}' ---")
    try:
        with zipfile.ZipFile(target_zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.write(db_filepath, os.path.basename(db_filepath)) # Write with just filename inside zip
        print(f"  Successfully compressed '{db_filepath}' to '{target_zip_path}'.")
        return target_zip_path
    except Exception as e:
        print(f"  Error compressing backup: {e}")
        return None

# --- Main Program Execution ---
def main():
    cleanup_dirs(BACKUP_DIR, ZIP_DIR)
    cleanup_files(SOURCE_DB) # Ensure source DB is clean if it was left from previous run

    create_sample_db(SOURCE_DB) # Create the source database with some data

    # Perform backup using sqlite3.Connection.backup()
    backup_file_path = perform_db_backup(SOURCE_DB, BACKUP_DIR)
    
    if backup_file_path:
        verify_db_backup(backup_file_path) # Verify the created backup
        
        # Optionally, compress the backup file
        compressed_file_path = compress_db_backup(backup_file_path, ZIP_DIR)
        if compressed_file_path:
            print(f"\nCompressed backup available at: {compressed_file_path}")
            
    print("\n--- Program finished ---")
    # cleanup_dirs(BACKUP_DIR, ZIP_DIR) # Uncomment to delete backup files after run
    # cleanup_files(SOURCE_DB) # Uncomment to delete source DB after run

if __name__ == "__main__":
    main()

7. 总结

为您详尽解析了在 Python 中使用 sqlite3 模块备份 SQLite 数据库的方法。

核心要点回顾:

通过掌握这些方法和最佳实践,您将能够构建一个可靠的备份策略,有效保护您的 SQLite 数据库数据免受意外丢失。

到此这篇关于使用Python备份SQLite数据库的文章就介绍到这了,更多相关Python备份SQLite数据库内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文