0

0

Python如何实现异步数据库操作?asyncpg库使用详解

絕刀狂花

絕刀狂花

发布时间:2025-08-08 12:55:01

|

385人浏览过

|

来源于php中文网

原创

asyncpg是postgresql异步操作的首选,1. 因其原生支持async/await语法,无需适配层,代码更自然;2. 性能卓越,基于c语言实现,直接对接postgresql二进制协议,减少python解释器开销;3. 提供精准的错误处理机制,将postgresql错误码映射为具体的python异常类,如uniqueviolationerror;4. 内置高效的连接池机制,通过create_pool创建连接池,利用min_size和max_size控制连接数量,结合async with pool.acquire()实现连接自动获取与释放,显著降低连接开销并提升资源利用率;5. 支持事务的上下文管理,通过async with conn.transaction()确保操作的原子性,异常时自动回滚,正常结束时自动提交,保障数据一致性;6. 具备良好的类型转换能力,支持jsonb、数组等复杂数据类型,便于处理实际业务场景;综上,asyncpg凭借高性能、易用性和健壮的错误处理机制,成为python中操作postgresql数据库最推荐的异步库。

Python如何实现异步数据库操作?asyncpg库使用详解

Python实现异步数据库操作,通常推荐使用支持

async/await
语法的库,其中
asyncpg
是PostgreSQL数据库的优秀选择,它提供了原生的异步支持,避免了传统同步操作的阻塞问题,显著提升了并发性能。

解决方案

实现异步数据库操作,核心在于利用Python的

asyncio
事件循环和专门为异步设计的数据库驱动。
asyncpg
就是这样一款库,它直接在C语言层面实现了PostgreSQL的二进制协议,提供了极高的性能和原生的异步支持。

使用

asyncpg
,你需要先安装它:
pip install asyncpg

立即学习Python免费学习笔记(深入)”;

基本的异步操作流程包括:

  1. 建立连接或获取连接池:这是异步操作的起点。对于单个操作或调试,可以直接建立连接;但对于生产环境,强烈推荐使用连接池。
  2. 执行查询:使用连接对象执行SQL查询,可以是
    execute
    (执行命令,不返回结果)、
    fetch
    (返回多行结果)、
    fetchrow
    (返回一行结果)或
    fetchval
    (返回单个值)。
  3. 处理结果:根据查询类型,处理返回的数据。
  4. 关闭连接或释放回连接池:确保资源得到妥善管理。
import asyncpg
import asyncio

async def main():
    # 建立一个连接(通常不推荐在生产环境直接使用,而是用连接池)
    conn = None
    try:
        conn = await asyncpg.connect(user='your_user', password='your_password',
                                     database='your_database', host='127.0.0.1')
        print("连接成功!")

        # 插入数据
        await conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name TEXT,
                age INT
            )
        ''')
        print("表创建或已存在。")

        user_name = 'Alice'
        user_age = 30
        await conn.execute('INSERT INTO users(name, age) VALUES($1, $2)', user_name, user_age)
        print(f"插入了用户: {user_name}, {user_age}")

        # 查询数据
        rows = await conn.fetch('SELECT id, name, age FROM users WHERE age > $1', 25)
        print("查询结果:")
        for row in rows:
            print(f"  ID: {row['id']}, Name: {row['name']}, Age: {row['age']}")

        # 更新数据
        await conn.execute('UPDATE users SET age = $1 WHERE name = $2', 31, 'Alice')
        print("更新了Alice的年龄。")

        # 再次查询,验证更新
        updated_row = await conn.fetchrow("SELECT name, age FROM users WHERE name = 'Alice'")
        if updated_row:
            print(f"Alice更新后的年龄: {updated_row['age']}")

    except asyncpg.exceptions.PostgresError as e:
        print(f"数据库操作失败: {e}")
    finally:
        if conn:
            await conn.close()
            print("连接已关闭。")

# 运行主函数
# asyncio.run(main()) # 在Python 3.7+ 中使用

asyncpg为什么是PostgreSQL异步操作的首选?

当我第一次接触到异步编程,尤其是需要与数据库交互时,

asyncpg
确实给我留下了深刻印象。它不仅仅是“另一个”数据库驱动,它在设计哲学和性能上都有独到之处,使其成为PostgreSQL异步操作的佼佼者。

首先,它原生支持

async/await
。这意味着你不需要任何适配层或者额外的配置,就能直接在你的异步函数中使用它,代码看起来自然且符合Python异步编程的习惯。这种原生性避免了许多同步驱动在异步环境中的“包装”问题,减少了不必要的开销。

其次,性能是它的王牌

asyncpg
是用C语言实现的,直接与PostgreSQL的二进制协议对话。这意味着它绕过了很多Python层面的解释器开销,数据传输和解析效率极高。在处理大量并发请求时,这种底层优化带来的性能提升是显而易见的。我记得有一次,一个高并发的API需要频繁读写数据库,从同步的
psycopg2
切换到
asyncpg
后,响应时间有了质的飞跃,这让我对它的性能有了切身体会。

再者,它的错误处理和类型转换做得相当到位。

asyncpg
能将PostgreSQL的错误码映射到具体的Python异常类,这让错误处理变得更精确、更可控。比如,如果你尝试插入一个重复的主键,你会得到一个清晰的
asyncpg.exceptions.UniqueViolationError
,而不是一个泛泛的数据库错误。同时,它对PostgreSQL的数据类型有很好的自动转换能力,比如数组、JSONB等,这在处理复杂数据结构时非常方便。

最后,它内置了连接池。这对于任何生产级应用都是至关重要的。手动管理连接非常麻烦且容易出错,而连接池能够高效地复用连接,减少了连接建立和关闭的开销,同时也限制了数据库的最大连接数,保护了数据库资源。这一点,我们接下来会详细聊聊。

asyncpg连接池如何高效管理数据库连接?

谈到数据库连接,尤其是在高并发场景下,连接的创建和销毁是一个不小的开销。每次用户请求都去建立一个新的数据库连接,然后用完就关闭,这就像你每次喝水都要重新烧开水、洗杯子一样,效率极低。这就是为什么连接池如此重要,而

asyncpg
在这方面做得非常出色。

新快购物系统
新快购物系统

新快购物系统是集合目前网络所有购物系统为参考而开发,不管从速度还是安全我们都努力做到最好,此版虽为免费版但是功能齐全,无任何错误,特点有:专业的、全面的电子商务解决方案,使您可以轻松实现网上销售;自助式开放性的数据平台,为您提供充满个性化的设计空间;功能全面、操作简单的远程管理系统,让您在家中也可实现正常销售管理;严谨实用的全新商品数据库,便于查询搜索您的商品。

下载

asyncpg
的连接池(通过
asyncpg.create_pool
创建)就像一个“连接银行”,它预先创建并维护了一定数量的数据库连接。当你的应用程序需要执行数据库操作时,它不是去创建一个新连接,而是从这个“银行”里“借用”一个已经建立好的连接。用完之后,这个连接不会被关闭,而是被“归还”回连接池,等待下一次被借用。

这种模式带来了几个显著的好处:

  • 减少开销:避免了频繁的TCP握手、认证等连接建立过程,显著降低了每次数据库操作的延迟。
  • 资源复用:连接可以被多次复用,提高了数据库资源的利用率。
  • 连接限制:你可以设置连接池的最大连接数(
    max_size
    参数),这可以有效防止应用程序因为创建过多连接而耗尽数据库资源,导致数据库性能下降甚至崩溃。
  • 自动管理:连接池会处理连接的健康检查、超时管理等问题,减轻了开发者的负担。

实际使用中,通常会创建一个全局的或应用级别的连接池,然后所有数据库操作都通过这个池来获取连接。

import asyncpg
import asyncio

async def create_and_use_pool():
    # 创建连接池
    # min_size: 最小连接数
    # max_size: 最大连接数
    # timeout: 获取连接的超时时间
    # command_timeout: SQL命令执行的超时时间
    pool = await asyncpg.create_pool(
        user='your_user', password='your_password',
        database='your_database', host='127.00.1',
        min_size=5, max_size=10, command_timeout=60
    )
    print("连接池创建成功!")

    try:
        # 从连接池获取一个连接,使用async with确保连接在块结束后自动释放回池
        async with pool.acquire() as conn:
            # 执行查询
            result = await conn.fetchval('SELECT 1 + 1')
            print(f"1 + 1 = {result}")

            # 插入数据
            await conn.execute('INSERT INTO users(name, age) VALUES($1, $2)', 'Bob', 25)
            print("插入了Bob。")

        # 再次获取连接执行其他操作
        async with pool.acquire() as conn:
            users = await conn.fetch('SELECT name, age FROM users WHERE name = $1', 'Bob')
            if users:
                print(f"查询到Bob: {users[0]['name']}, {users[0]['age']}")

    except asyncpg.exceptions.PostgresError as e:
        print(f"数据库操作失败: {e}")
    finally:
        # 关闭连接池,释放所有连接
        await pool.close()
        print("连接池已关闭。")

# asyncio.run(create_and_use_pool())

使用

async with pool.acquire() as conn:
这种上下文管理器模式,是管理连接的最佳实践,它确保了无论代码块如何退出(正常完成或抛出异常),连接都会被正确地释放回连接池。

在asyncpg中如何处理事务和错误?

数据库操作中,事务和错误处理是保证数据一致性和系统健壮性的基石。在异步环境中,这些概念依然重要,

asyncpg
也提供了直观且强大的方式来处理它们。

事务处理

事务(Transaction)是一组原子性的数据库操作,要么全部成功提交,要么全部失败回滚。这对于需要多步操作才能完成的逻辑(比如转账:从A账户扣款,向B账户加款)至关重要。

asyncpg
通过上下文管理器提供了非常优雅的事务处理方式。

import asyncpg
import asyncio

async def transaction_example(pool):
    try:
        async with pool.acquire() as conn:
            # 使用 conn.transaction() 作为上下文管理器来管理事务
            async with conn.transaction():
                print("事务开始...")
                # 假设有一个accounts表,有id和balance字段
                # 模拟从账户1转账100到账户2

                # 扣款
                await conn.execute(
                    'UPDATE accounts SET balance = balance - $1 WHERE id = $2', 100, 1
                )
                print("账户1扣款成功。")

                # 模拟一个可能失败的操作,比如账户2不存在,或者余额不足
                # if True: # 模拟一个错误
                #     raise ValueError("模拟一个非数据库错误,看事务是否回滚")

                # 加款
                await conn.execute(
                    'UPDATE accounts SET balance = balance + $1 WHERE id = $2', 100, 2
                )
                print("账户2加款成功。")

            print("事务成功提交!") # 如果没有异常,事务会自动提交
    except asyncpg.exceptions.PostgresError as e:
        print(f"数据库事务失败,已回滚: {e}") # 任何PostgresError都会导致事务回滚
    except Exception as e:
        print(f"非数据库错误导致事务回滚: {e}") # 其他异常也会导致事务回滚

当你在

async with conn.transaction():
块内部抛出任何异常(无论是
asyncpg
的数据库异常还是普通的Python异常),事务都会自动回滚(ROLLBACK)。如果块正常执行完毕,事务则会自动提交(COMMIT)。这种设计极大地简化了事务管理,减少了手动
BEGIN
,
COMMIT
,
ROLLBACK
的样板代码。

错误处理

asyncpg
将PostgreSQL的错误码映射到具体的Python异常类,这使得错误处理更加精细化。所有
asyncpg
相关的错误都继承自
asyncpg.exceptions.PostgresError

import asyncpg
import asyncio

async def error_handling_example(pool):
    try:
        async with pool.acquire() as conn:
            # 尝试插入一个违反唯一约束的数据
            # 假设有一个users表,name字段是UNIQUE
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS unique_users (
                    id SERIAL PRIMARY KEY,
                    username TEXT UNIQUE
                )
            ''')
            print("unique_users表创建或已存在。")

            await conn.execute("INSERT INTO unique_users(username) VALUES($1)", "john_doe")
            print("第一次插入john_doe成功。")

            # 再次插入相同的username,这将触发UniqueViolationError
            await conn.execute("INSERT INTO unique_users(username) VALUES($1)", "john_doe")
            print("第二次插入john_doe成功。") # 这行代码通常不会执行到

    except asyncpg.exceptions.UniqueViolationError as e:
        print(f"捕获到唯一约束冲突错误: {e}")
        # 这里可以进行特定的错误处理,例如提示用户换一个用户名
    except asyncpg.exceptions.PostgresError as e:
        print(f"捕获到其他PostgreSQL错误: {e}")
        # 处理其他数据库错误
    except Exception as e:
        print(f"捕获到非数据库错误: {e}")
        # 处理其他所有非数据库错误
    finally:
        print("错误处理示例完成。")

# 假设 pool 已经创建
# asyncio.run(error_handling_example(pool))

通过捕获

asyncpg.exceptions
模块中定义的具体异常,你可以针对不同类型的数据库错误执行不同的逻辑。例如,
UniqueViolationError
通常意味着用户输入了重复的数据,而
IntegrityConstraintViolationError
可能涉及外键约束。这种细粒度的错误处理,让你的应用在面对数据库问题时,能够给出更智能、更友好的响应。

当然,在实际项目中,你可能还会遇到网络问题、连接超时等,这些也都可以通过

asyncpg
asyncio
提供的异常来捕获和处理。关键在于理解异常的层级和类型,从而构建健壮的错误恢复机制。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
C语言变量命名
C语言变量命名

c语言变量名规则是:1、变量名以英文字母开头;2、变量名中的字母是区分大小写的;3、变量名不能是关键字;4、变量名中不能包含空格、标点符号和类型说明符。php中文网还提供c语言变量的相关下载、相关课程等内容,供大家免费下载使用。

401

2023.06.20

c语言入门自学零基础
c语言入门自学零基础

C语言是当代人学习及生活中的必备基础知识,应用十分广泛,本专题为大家c语言入门自学零基础的相关文章,以及相关课程,感兴趣的朋友千万不要错过了。

619

2023.07.25

c语言运算符的优先级顺序
c语言运算符的优先级顺序

c语言运算符的优先级顺序是括号运算符 > 一元运算符 > 算术运算符 > 移位运算符 > 关系运算符 > 位运算符 > 逻辑运算符 > 赋值运算符 > 逗号运算符。本专题为大家提供c语言运算符相关的各种文章、以及下载和课程。

354

2023.08.02

c语言数据结构
c语言数据结构

数据结构是指将数据按照一定的方式组织和存储的方法。它是计算机科学中的重要概念,用来描述和解决实际问题中的数据组织和处理问题。数据结构可以分为线性结构和非线性结构。线性结构包括数组、链表、堆栈和队列等,而非线性结构包括树和图等。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

259

2023.08.09

c语言random函数用法
c语言random函数用法

c语言random函数用法:1、random.random,随机生成(0,1)之间的浮点数;2、random.randint,随机生成在范围之内的整数,两个参数分别表示上限和下限;3、random.randrange,在指定范围内,按指定基数递增的集合中获得一个随机数;4、random.choice,从序列中随机抽选一个数;5、random.shuffle,随机排序。

604

2023.09.05

c语言const用法
c语言const用法

const是关键字,可以用于声明常量、函数参数中的const修饰符、const修饰函数返回值、const修饰指针。详细介绍:1、声明常量,const关键字可用于声明常量,常量的值在程序运行期间不可修改,常量可以是基本数据类型,如整数、浮点数、字符等,也可是自定义的数据类型;2、函数参数中的const修饰符,const关键字可用于函数的参数中,表示该参数在函数内部不可修改等等。

530

2023.09.20

c语言get函数的用法
c语言get函数的用法

get函数是一个用于从输入流中获取字符的函数。可以从键盘、文件或其他输入设备中读取字符,并将其存储在指定的变量中。本文介绍了get函数的用法以及一些相关的注意事项。希望这篇文章能够帮助你更好地理解和使用get函数 。

645

2023.09.20

c数组初始化的方法
c数组初始化的方法

c语言数组初始化的方法有直接赋值法、不完全初始化法、省略数组长度法和二维数组初始化法。详细介绍:1、直接赋值法,这种方法可以直接将数组的值进行初始化;2、不完全初始化法,。这种方法可以在一定程度上节省内存空间;3、省略数组长度法,这种方法可以让编译器自动计算数组的长度;4、二维数组初始化法等等。

603

2023.09.22

Python 自然语言处理(NLP)基础与实战
Python 自然语言处理(NLP)基础与实战

本专题系统讲解 Python 在自然语言处理(NLP)领域的基础方法与实战应用,涵盖文本预处理(分词、去停用词)、词性标注、命名实体识别、关键词提取、情感分析,以及常用 NLP 库(NLTK、spaCy)的核心用法。通过真实文本案例,帮助学习者掌握 使用 Python 进行文本分析与语言数据处理的完整流程,适用于内容分析、舆情监测与智能文本应用场景。

10

2026.01.27

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.3万人学习

Django 教程
Django 教程

共28课时 | 3.6万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号