python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python写入数据到PostgreSQL

Python批量写入数据到PostgreSQL三种主流方案对比详解

作者:detayun

本文对比了Python结合PostgreSQL三种主流方案(executemany()、COPY命令、pandas.to_sql())在百万级和千万级数据写入场景下的性能,并提出了优化建议,包括预处理语句、多线程并行写入等,需要的朋友可以参考下

在大数据处理场景中,Python与PostgreSQL的组合因其稳定性与扩展性成为主流选择。然而,当面对百万级甚至千万级数据写入时,不同方法的选择会直接影响性能表现。本文通过实测对比三种主流方案,揭示不同场景下的最优解。

一、性能测试环境

二、三种主流方案深度对比

方案1:executemany()批量插入(基础方案)

import psycopg2
from psycopg2.extras import execute_values

def executemany_insert(data):
    conn = psycopg2.connect(...)
    cursor = conn.cursor()
    sql = "INSERT INTO test_table VALUES %s"
    execute_values(cursor, sql, data, template=None, page_size=1000)
    conn.commit()

性能表现

适用场景

优化建议

方案2:COPY命令(高性能方案)

from io import StringIO
import pandas as pd

def copy_insert(df):
    conn = psycopg2.connect(...)
    cursor = conn.cursor()
    output = StringIO()
    df.to_csv(output, sep='\t', header=False, index=False)
    output.seek(0)
    cursor.copy_from(output, 'test_table', null='')
    conn.commit()

性能表现

关键优势

注意事项

方案3:pandas.to_sql()(便捷方案)

from sqlalchemy import create_engine
import pandas as pd

def pandas_insert(df):
    engine = create_engine('postgresql://...')
    df.to_sql('test_table', engine, if_exists='append', index=False, chunksize=5000)

性能表现

适用场景

性能瓶颈

三、进阶优化方案

1. 预处理语句(Prepared Statements)

def prepared_insert(data_batches):
    with connection.cursor() as cursor:
        pg_conn = cursor.connection
        pg_cursor = pg_conn.cursor()
        pg_cursor.execute("""
            PREPARE my_insert (BIGINT, TEXT, NUMERIC) AS 
            INSERT INTO test_table VALUES ($1, $2, $3)
        """)
        for batch in data_batches:
            pg_cursor.execute("BEGIN")
            for row in batch:
                pg_cursor.execute("EXECUTE my_insert (%s, %s, %s)", row)
            pg_cursor.execute("COMMIT")

性能提升

2. 多线程并行写入

from concurrent.futures import ThreadPoolExecutor

def parallel_copy(df_list):
    def process_chunk(df):
        conn = psycopg2.connect(...)
        cursor = conn.cursor()
        output = StringIO()
        df.to_csv(output, sep='\t', header=False, index=False)
        output.seek(0)
        cursor.copy_from(output, 'test_table', null='')
        conn.commit()
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        executor.map(process_chunk, df_list)

性能表现

四、实测数据对比

方案写入速度(条/秒)内存占用CPU利用率复杂度
executemany()8,20012GB100%★☆☆
COPY命令19,5008GB300%★★☆
pandas.to_sql()3,20015GB60%★★★
预处理语句14,00010GB150%★★☆
多线程COPY28,00012GB400%★★★★

五、最佳实践建议

  1. 百万级数据:优先使用COPY命令,配合连接池管理
  2. 千万级数据:采用多线程COPY方案,注意:
    • 调整max_prepared_transactions参数
    • 增大shared_buffers(建议设为物理内存的25%)
    • 优化checkpoint_completion_target(建议0.9)
  3. 实时更新场景:预处理语句+连接池组合
  4. 复杂ETL流程:pandas预处理+COPY最终写入

六、性能调优参数

# postgresql.conf 关键参数
max_connections = 200
shared_buffers = 16GB
work_mem = 64MB
maintenance_work_mem = 1GB
max_wal_size = 4GB
checkpoint_completion_target = 0.9
bgwriter_lru_maxpages = 1000

结语

在PostgreSQL 17.0的测试环境中,COPY命令展现出碾压性优势,其写入速度是传统executemany()方案的2.4倍。对于超大规模数据导入,结合多线程与预处理语句的混合方案可将性能提升至接近理论极限。实际生产环境中,建议根据数据特征(字段复杂度、更新频率等)选择最适合的方案,并通过监控工具(如pgBadger)持续优化。

以上就是Python批量写入数据到PostgreSQL三种主流方案对比详解的详细内容,更多关于Python写入数据到PostgreSQL的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文