0

0

PostgreSQL海量数据高效导入策略:Python与Django实践指南

碧海醫心

碧海醫心

发布时间:2025-12-01 13:07:10

|

280人浏览过

|

来源于php中文网

原创

postgresql海量数据高效导入策略:python与django实践指南

本文旨在提供在Python和Django环境下,向PostgreSQL数据库高效导入海量数据的策略与实践。针对传统批处理插入可能面临的性能瓶颈和连接中断问题,文章详细介绍了两种优化方案:利用数据库会话的预处理语句(Prepared Statements)提升重复插入效率,以及采用PostgreSQL原生的`COPY`命令实现极致的导入速度。同时,文章还将探讨相关最佳实践,包括事务管理、索引优化及与Django框架的集成考量。

在处理大规模数据导入PostgreSQL时,传统的逐行插入或简单的多值INSERT语句可能无法满足性能要求,甚至会导致OperationalError: server closed the connection unexpectedly之类的连接问题。本教程将深入探讨如何利用PostgreSQL的特性,结合Python和Django环境,实现高效、稳定的海量数据导入。

1. 传统批处理插入的局限性

当前常见的批处理插入方法,如通过INSERT INTO ... VALUES (v1), (v2), ...的形式一次性插入多行数据,确实比单行插入效率更高。然而,当数据量达到百万级别或更高时,这种方法仍可能面临以下挑战:

  • SQL语句过长: 随着批次内行数的增加,生成的SQL语句会非常庞大,这增加了网络传输负担、数据库解析时间以及潜在的内存消耗。
  • 重复解析: 每次执行cursor.execute()时,数据库都需要对SQL语句进行解析、规划和优化,即使是结构相同的语句,参数不同也可能导致重复解析。
  • 事务开销: 如果不合理地管理事务,每次批处理都可能开启和提交独立事务,增加不必要的开销。
  • 客户端计算负担: 在将数据发送到数据库之前,如果在Python端有大量的“计算密集型操作”,这会显著影响整体导入时间。

2. 优化方案一:利用预处理语句(Prepared Statements)

预处理语句允许数据库对一个SQL模板进行一次性解析、规划和优化,然后可以多次执行,只需提供不同的参数。这对于重复执行相同结构但参数不同的插入操作非常有效。

立即学习Python免费学习笔记(深入)”;

虽然Django ORM通常不会直接暴露预处理语句的API,但通过直接操作psycopg2游标,我们可以实现这一优化。

工作原理:

  1. 准备(PREPARE): 定义一个带参数占位符的SQL语句。数据库会对其进行解析和优化,并生成一个执行计划。
  2. 执行(EXECUTE): 使用准备好的语句名称和具体参数来执行操作。数据库直接使用已有的执行计划,无需重新解析。

Python/psycopg2 示例:

from django.db import connection
import psycopg2

def insert_with_prepared_statement(data_batches):
    with connection.cursor() as cursor:
        # 获取底层的psycopg2连接和游标
        pg_conn = cursor.connection
        pg_cursor = pg_conn.cursor()

        try:
            # 1. 准备语句
            # 使用psycopg2的execute()方法执行PREPARE命令
            # 声明一个名为'my_insert_stmt'的预处理语句
            # 参数类型需要明确指定,例如TEXT, INT, BIGINT等
            pg_cursor.execute("""
                PREPARE my_insert_stmt (BIGINT, TEXT, BIGINT, NUMERIC) AS
                INSERT INTO per_transaction_table (company_ref_id_id_column, rrn_column, transaction_type_ref_id_id_column, transactionamount_column)
                VALUES ($1, $2, $3, $4)
                ON CONFLICT (rrn_column) DO UPDATE SET company_ref_id_id_column = EXCLUDED.company_ref_id_id_column;
            """)

            for batch in data_batches:
                # 开启一个事务块,确保批次内的操作原子性
                pg_cursor.execute("BEGIN;")
                for row_data in batch:
                    # 2. 执行语句
                    # 使用EXECUTE命令调用预处理语句,并传入参数
                    pg_cursor.execute("EXECUTE my_insert_stmt (%s, %s, %s, %s);", row_data)
                pg_cursor.execute("COMMIT;")

            print(f"Successfully inserted {len(data_batches) * len(data_batches[0])} rows using prepared statements.")

        except psycopg2.Error as e:
            pg_conn.rollback() # 发生错误时回滚
            print(f"Database error: {e}")
        finally:
            # 3. 释放语句 (可选,会话结束时会自动释放)
            pg_cursor.execute("DEALLOCATE my_insert_stmt;")
            pg_cursor.close()

# 示例数据生成 (假设数据已包含计算结果)
# data_batches 应该是一个列表的列表,每个内部列表代表一个批次,每个批次包含多个元组,每个元组代表一行数据
# 例如:[[ (1, 'R1', 101, 100.50), (2, 'R2', 102, 200.75) ], ...]
# 假设 company_ref_id_id_column 为 BIGINT, rrn_column 为 TEXT, transaction_type_ref_id_id_column 为 BIGINT, transactionamount_column 为 NUMERIC
# (请根据实际表结构调整参数类型和顺序)
# example_data_batches = [
#     [(1, 'R1', 101, 100.50), (2, 'R2', 102, 200.75)],
#     [(3, 'R3', 103, 300.25), (4, 'R4', 104, 400.00)]
# ]
# insert_with_prepared_statement(example_data_batches)

注意事项:

  • 预处理语句在当前数据库会话中有效。如果连接关闭或会话结束,预处理语句将失效。
  • PREPARE语句需要明确指定参数的类型。
  • 对于包含ON CONFLICT子句的复杂插入逻辑,预处理语句依然适用。

3. 优化方案二:使用PostgreSQL COPY 命令

COPY命令是PostgreSQL提供的一种最高效的数据导入方式,它允许直接在服务器端进行数据传输,绕过了SQL解析器的大部分开销。它比任何INSERT语句都快,因为它是为批量加载而设计的。

COPY命令支持从文件导入 (COPY FROM filename) 或从标准输入导入 (COPY FROM STDIN)。对于Python应用程序,COPY FROM STDIN是最常用的方式,通过psycopg2的copy_from或copy_expert方法实现。

COPY命令的优势:

  • 服务器端操作: 数据直接流式传输到数据库,减少客户端-服务器往返次数。
  • 极低开销: 绕过SQL解析和优化,直接将数据写入表文件。
  • 支持多种格式: CSV、文本等。

Python/psycopg2 COPY FROM STDIN 示例:

Chromox
Chromox

Chromox是一款领先的AI在线生成平台,专为喜欢AI生成技术的爱好者制作的多种图像、视频生成方式的内容型工具平台。

下载
import io
from django.db import connection
import psycopg2

def insert_with_copy_command(data_generator):
    with connection.cursor() as cursor:
        pg_conn = cursor.connection
        pg_cursor = pg_conn.cursor()

        try:
            # 使用StringIO模拟文件,将数据格式化为CSV或TSV
            # 确保数据的顺序与目标表的列顺序一致
            # 如果有ON CONFLICT需求,需要使用COPY FROM PROGRAM 或 copy_expert 结合临时表
            # 或者先COPY到临时表,再从临时表进行UPSERT
            # 这里先展示最简单的COPY,不带ON CONFLICT
            output = io.StringIO()
            for row_data in data_generator:
                # 假设数据是 (company_ref_id, rrn, transaction_type_ref_id, transaction_amount)
                # 并且 rrn_column 是文本类型,其他是数字
                # 格式化为CSV格式,逗号分隔,文本字段加引号
                output.write(f"{row_data[0]},\"{row_data[1]}\",{row_data[2]},{row_data[3]}\n")

            output.seek(0) # 将文件指针移到开头

            # 执行COPY命令
            # 注意:如果表中有ON CONFLICT,COPY INTO TABLE 无法直接处理。
            # 通常的做法是COPY到临时表,然后从临时表进行UPSERT。
            # 或者使用COPY FROM PROGRAM并结合SQL语句,但更复杂。
            # 对于有ON CONFLICT的场景,推荐先COPY到临时表,再进行MERGE/UPSERT。

            # 简单的COPY示例 (无ON CONFLICT)
            table_name = "per_transaction_table" # 替换为你的表名
            columns = "(company_ref_id_id_column, rrn_column, transaction_type_ref_id_id_column, transactionamount_column)"

            # 使用copy_expert来处理更复杂的COPY选项,例如CSV格式
            pg_cursor.copy_expert(
                f"COPY {table_name} {columns} FROM STDIN WITH (FORMAT CSV, DELIMITER ',', QUOTE '\"');",
                output
            )
            pg_conn.commit() # COPY操作通常需要在一个事务中
            print(f"Successfully inserted data using COPY command.")

        except psycopg2.Error as e:
            pg_conn.rollback()
            print(f"Database error during COPY: {e}")
        finally:
            pg_cursor.close()

# 示例数据生成器 (假设数据已包含计算结果)
# def generate_large_data(num_rows):
#     for i in range(num_rows):
#         yield (i + 1, f'R{i+1:07d}', (i % 10) + 100, (i + 1) * 10.50)
#
# insert_with_copy_command(generate_large_data(1000000))

处理ON CONFLICT与COPY:

COPY命令本身不直接支持ON CONFLICT。如果需要处理冲突(UPSERT),通常有以下策略:

  1. COPY到临时表,然后UPSERT:

    • 创建一个与目标表结构相同的临时表。
    • 使用COPY命令将所有数据导入到临时表。
    • 执行一个INSERT INTO ... SELECT ... ON CONFLICT DO UPDATE语句,从临时表将数据导入到目标表。
    • 删除临时表。
    def insert_with_copy_and_upsert(data_generator, target_table_name, conflict_column, columns_to_insert):
        with connection.cursor() as cursor:
            pg_conn = cursor.connection
            pg_cursor = pg_conn.cursor()
            temp_table_name = f"temp_{target_table_name}_{pg_conn.pid}" # 使用进程ID避免冲突
    
            try:
                # 1. 创建临时表 (结构与目标表一致)
                pg_cursor.execute(f"""
                    CREATE TEMPORARY TABLE {temp_table_name} (LIKE {target_table_name} INCLUDING DEFAULTS);
                """)
    
                # 2. 准备数据并COPY到临时表
                output = io.StringIO()
                for row_data in data_generator:
                    # 确保数据格式与temp_table_name的列匹配
                    output.write(",".join(map(str, row_data)) + "\n") # 简单示例,实际需根据数据类型做CSV/TSV格式化
                output.seek(0)
    
                pg_cursor.copy_expert(
                    f"COPY {temp_table_name} ({','.join(columns_to_insert)}) FROM STDIN WITH (FORMAT CSV, DELIMITER ',');",
                    output
                )
    
                # 3. 从临时表进行UPSERT到目标表
                update_set_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in columns_to_insert if col != conflict_column])
    
                pg_cursor.execute(f"""
                    INSERT INTO {target_table_name} ({','.join(columns_to_insert)})
                    SELECT {','.join(columns_to_insert)} FROM {temp_table_name}
                    ON CONFLICT ({conflict_column}) DO UPDATE SET {update_set_clause};
                """)
                pg_conn.commit()
                print(f"Successfully inserted/updated data using COPY to temp table and UPSERT.")
    
            except psycopg2.Error as e:
                pg_conn.rollback()
                print(f"Database error during COPY+UPSERT: {e}")
            finally:
                # 4. 删除临时表
                pg_cursor.execute(f"DROP TABLE IF EXISTS {temp_table_name};")
                pg_cursor.close()
    
    # 示例调用
    # columns_to_insert = ['company_ref_id_id_column', 'rrn_column', 'transaction_type_ref_id_id_column', 'transactionamount_column']
    # conflict_col = 'rrn_column'
    # insert_with_copy_and_upsert(generate_large_data(1000000), 'per_transaction_table', conflict_col, columns_to_insert)

4. 性能优化与最佳实践

除了选择合适的导入方法,还有一些通用的最佳实践可以进一步提升性能:

4.1 索引和约束管理

  • 延迟创建索引: 在导入海量数据之前,如果目标表上存在大量索引、唯一约束或外键约束,每次插入都会触发这些约束的检查和索引的更新,显著降低导入速度。
    • 最佳实践: 在导入数据前,暂时禁用或删除所有非主键索引和外键约束。完成数据导入后,再重新创建这些索引和约束。对于唯一约束,可以先删除,导入后重建为CREATE UNIQUE INDEX CONCURRENTLY。
  • 主键和唯一约束: 对于主键和唯一约束,如果数据量巨大且存在冲突,ON CONFLICT是必要的。但在COPY场景下,如前所述,通常需要结合临时表处理。

4.2 事务管理

  • 大事务 vs. 小事务: 将多个批次操作封装在一个大事务中,可以减少事务提交的开销,但如果事务过大,失败时回滚的代价也高,且可能长时间锁定表。
  • 合理批次大小: 找到一个平衡点。批次太小,事务开销大;批次太大,可能导致内存问题、网络超时或回滚开销大。10万到50万行通常是一个不错的起点,具体需要根据数据大小、服务器资源和网络状况进行测试。

4.3 UNLOGGED 表(非日志表)

  • 如果数据是临时的、可以随时重建的,并且不需要WAL日志记录(即不需要崩溃恢复或流复制),可以考虑使用UNLOGGED TABLE。

  • 优势: UNLOGGED表不会写入WAL日志,这使得数据写入速度极快。

  • 劣势: 数据库崩溃时,非日志表内容会丢失。它们也不能用于流复制或时间点恢复。

  • 使用场景: 作为临时数据暂存区,导入后进行处理或聚合,然后将结果写入永久表。

    CREATE UNLOGGED TABLE my_temp_data (
        id BIGINT,
        name TEXT
    );

    对于COPY到UNLOGGED表,可以使用WITH (FREEZE)选项进一步优化,但这通常在COPY到空表时效果显著,且需谨慎使用,因为它会标记行已冻结,跳过VACUUM检查。

4.4 客户端计算优化

  • 原始问题中提到“Process Intensive Calculations”。这些计算应尽可能在数据导入数据库之前完成,并且应优化其性能。如果计算量巨大,可以考虑使用多进程或异步任务来并行处理数据,再将处理后的结果批量导入。

4.5 Django ORM与底层游标

  • Django的bulk_create()方法是处理批量插入的便捷方式,它会生成一个优化的INSERT语句。但它不支持ON CONFLICT,且在处理百万级别数据时,其性能可能不如直接使用psycopg2的COPY命令。
  • 当需要极致性能或特定数据库特性(如COPY、预处理语句)时,直接通过django.db.connection.cursor().connection.cursor()获取底层的psycopg2游标是必要的。

总结

在Python和Django环境中向PostgreSQL导入海量数据时,选择合适的策略至关重要。

  1. 对于中等规模的批量插入或需要ON CONFLICT的场景,并且数据量并非极端巨大时,预处理语句(Prepared Statements)是一个很好的选择。 它减少了数据库的解析开销,提升了重复插入的效率。
  2. 对于需要极致导入速度的场景,特别是百万级以上的数据,PostgreSQL的COPY命令是无可匹敌的最佳方案。 结合临时表进行UPSERT是处理冲突的有效策略。
  3. 在实施任何导入策略时,务必考虑索引和约束的暂时禁用、合理的事务管理以及客户端计算的优化。 这些辅助措施能显著提升整体导入性能。

通过综合运用这些技术,可以有效地解决海量数据导入PostgreSQL所面临的性能和稳定性挑战。始终建议在实际生产环境前,在测试环境中进行充分的性能测试和调优。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据分析工具有哪些
数据分析工具有哪些

数据分析工具有Excel、SQL、Python、R、Tableau、Power BI、SAS、SPSS和MATLAB等。详细介绍:1、Excel,具有强大的计算和数据处理功能;2、SQL,可以进行数据查询、过滤、排序、聚合等操作;3、Python,拥有丰富的数据分析库;4、R,拥有丰富的统计分析库和图形库;5、Tableau,提供了直观易用的用户界面等等。

1134

2023.10.12

SQL中distinct的用法
SQL中distinct的用法

SQL中distinct的语法是“SELECT DISTINCT column1, column2,...,FROM table_name;”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

340

2023.10.27

SQL中months_between使用方法
SQL中months_between使用方法

在SQL中,MONTHS_BETWEEN 是一个常见的函数,用于计算两个日期之间的月份差。想了解更多SQL的相关内容,可以阅读本专题下面的文章。

381

2024.02.23

SQL出现5120错误解决方法
SQL出现5120错误解决方法

SQL Server错误5120是由于没有足够的权限来访问或操作指定的数据库或文件引起的。想了解更多sql错误的相关内容,可以阅读本专题下面的文章。

2174

2024.03.06

sql procedure语法错误解决方法
sql procedure语法错误解决方法

sql procedure语法错误解决办法:1、仔细检查错误消息;2、检查语法规则;3、检查括号和引号;4、检查变量和参数;5、检查关键字和函数;6、逐步调试;7、参考文档和示例。想了解更多语法错误的相关内容,可以阅读本专题下面的文章。

380

2024.03.06

oracle数据库运行sql方法
oracle数据库运行sql方法

运行sql步骤包括:打开sql plus工具并连接到数据库。在提示符下输入sql语句。按enter键运行该语句。查看结果,错误消息或退出sql plus。想了解更多oracle数据库的相关内容,可以阅读本专题下面的文章。

1703

2024.04.07

sql中where的含义
sql中where的含义

sql中where子句用于从表中过滤数据,它基于指定条件选择特定的行。想了解更多where的相关内容,可以阅读本专题下面的文章。

585

2024.04.29

sql中删除表的语句是什么
sql中删除表的语句是什么

sql中用于删除表的语句是drop table。语法为drop table table_name;该语句将永久删除指定表的表和数据。想了解更多sql的相关内容,可以阅读本专题下面的文章。

440

2024.04.29

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

76

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 5万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号