0

0

如何在 Python 中异步操作数据库?aiomysql、asyncpg、aioredis 使用介绍

PHPz

PHPz

发布时间:2023-05-06 12:55:06

|

1845人浏览过

|

来源于51CTO.COM

转载


如何在 Python 中异步操作数据库?aiomysql、asyncpg、aioredis 使用介绍

Python 目前已经进化到了 3.8 版本,对操作数据库也提供了相应的异步支持。当我们做一个 Web 服务时,性能的瓶颈绝大部分都在数据库上,如果一个请求从数据库中读数据的时候能够自动切换、去处理其它请求的话,是不是就能提高并发量了呢。

(编者注:原文写于 2020 年 2 月,当时最新为Python 3.8,文章内容现在仍未过时)

下面我们来看看如何使用 Python 异步操作 MySQL、PostgreSQL 以及 Redis,以上几个可以说是最常用的数据库了。至于 SQLServer、Oracle,本人没有找到相应的异步驱动,有兴趣可以自己去探索一下。

而操作数据库无非就是增删改查,下面我们来看看如何异步实现它们。

异步操作 MySQL

异步操作 MySQL 的话,需要使用一个 aiomysql,直接 pip install aiomysql 即可。

aiomysql 底层依赖于 pymysql,所以 aiomysql 并没有单独实现相应的连接驱动,而是在 pymysql 之上进行了封装。

查询记录

下面先来看看如何查询记录。

import asyncio
import aiomysql.sa as aio_sa
async def main():
# 创建一个异步引擎
engine = await aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10)
# 通过 engine.acquire() 获取一个连接
async with engine.acquire() as conn:
# 异步执行, 返回一个 <class 'aiomysql.sa.result.ResultProxy'> 对象
result = await conn.execute("SELECT * FROM girl")
# 通过 await result.fetchone() 可以获取满足条件的第一条记录, 一个 <class 'aiomysql.sa.result.RowProxy'> 对象
data = await result.fetchone()
# 可以将 <class 'aiomysql.sa.result.RowProxy'> 对象想象成一个字典
print(data.keys())# KeysView((1, '古明地觉', 16, '地灵殿'))
print(list(data.keys()))# ['id', 'name', 'age', 'place']
print(data.values())# ValuesView((1, '古明地觉', 16, '地灵殿'))
print(list(data.values()))# [1, '古明地觉', 16, '地灵殿']
print(data.items())# ItemsView((1, '古明地觉', 16, '地灵殿'))
print(list(data.items()))# [('id', 1), ('name', '古明地觉'), ('age', 16), ('place', '地灵殿')]
# 直接转成字典也是可以的
print(dict(data))# {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}
# 最后别忘记关闭引擎, 当然你在创建引擎的时候也可以通过 async with aio_sa.create_engine 的方式创建
# async with 语句结束后会自动执行下面两行代码
engine.close()
await engine.wait_closed()
 loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

怎么样,是不是很简单呢,和同步库的操作方式其实是类似的。但是很明显,我们在获取记录的时候不会只获取一条,而是会获取多条,获取多条的话使用 await result.fetchall() 即可。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
async def main():
# 通过异步上下文管理器的方式创建, 会自动帮我们关闭引擎
async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
result = await conn.execute("SELECT * FROM girl")
# 此时的 data 是一个列表, 列表里面是 <class 'aiomysql.sa.result.RowProxy'> 对象
data = await result.fetchall()
# 将里面的元素转成字典
pprint(list(map(dict, data)))
"""
[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},
 {'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},
 {'age': 400, 'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

除了 fetchone、fetchall 之外,还有一个 fetchmany,可以获取指定记录的条数。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
async def main():
# 通过异步上下文管理器的方式创建, 会自动帮我们关闭引擎
async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
result = await conn.execute("SELECT * FROM girl")
# 默认是获取一条, 得到的仍然是一个列表
data = await result.fetchmany(2)
pprint(list(map(dict, data)))
"""
[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},
 {'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

以上就是通过 aiomysql 查询数据库中的记录,没什么难度。但是值得一提的是,await conn.execute 里面除了可以传递一个原生的 SQL 语句之外,我们还可以借助 SQLAlchemy。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text
async def main():
async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))
result = await conn.execute(sql)
data = await result.fetchall()
pprint(list(map(dict, data)))
"""
[{'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},
 {'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

添加记录

然后是添加记录,我们同样可以借助 SQLAlchemy 帮助我们拼接 SQL 语句。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine
async def main():
async with aio_sa.create_engine(host="xx.xx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
# 我们还需要创建一个 SQLAlchemy 中的引擎, 然后将表反射出来
s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
tbl = Table("girl", MetaData(bind=s_engine), autoload=True
insert_sql = tbl.insert().values(
[{"name": "十六夜咲夜", "age": 17, "place": "红魔馆"},
 {"name": "琪露诺", "age": 60, "place": "雾之湖"}])
# 注意: 执行的执行必须开启一个事务, 否则数据是不会进入到数据库中的
async with conn.begin():
# 同样会返回一个 <class 'aiomysql.sa.result.ResultProxy'> 对象
# 尽管我们插入了多条, 但只会返回最后一条的插入信息
result = await conn.execute(insert_sql)
# 返回最后一条记录的自增 id
print(result.lastrowid)
# 影响的行数
print(result.rowcount)
# 重新查询, 看看记录是否进入到数据库中
async with engine.acquire() as conn:
data = await (await conn.execute("select * from girl")).fetchall()
data = list(map(dict, data))
pprint(data)
"""
[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},
 {'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},
 {'age': 400, 'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'},
 {'age': 17, 'id': 16, 'name': '十六夜咲夜', 'place': '红魔馆'},
 {'age': 60, 'id': 17, 'name': '琪露诺', 'place': '雾之湖'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

还是很方便的,但是插入多条记录的话只会返回插入的最后一条记录的信息,所以如果你希望获取每一条的信息,那么就一条一条插入。

修改记录

修改记录和添加记录是类似的,我们来看一下。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, text
async def main():
async with aio_sa.create_engine(host="xx.xx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
tbl = Table("girl", MetaData(bind=s_engine), autoload=True)
update_sql = tbl.update().where(text("name = '古明地觉'")).values({"place": "东方地灵殿"})
# 同样需要开启一个事务
async with conn.begin():
result = await conn.execute(update_sql)
print(result.lastrowid)# 0
print(result.rowcount) # 1
# 查询结果
async with engine.acquire() as conn:
data = await (await conn.execute("select * from girl where name = '古明地觉'")).fetchall()
data = list(map(dict, data))
pprint(data)
"""
[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '东方地灵殿'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

可以看到,记录被成功的修改了。

删除记录

删除记录就更简单了,直接看代码。

import asyncio
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, text
async def main():
async with aio_sa.create_engine(host="xx.xx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
tbl = Table("girl", MetaData(bind=s_engine), autoload=True)
update_sql = tbl.delete()# 全部删除
# 同样需要开启一个事务
async with conn.begin():
result = await conn.execute(update_sql)
# 返回最后一条记录的自增 id, 我们之前修改了 id = 0 记录, 所以它跑到最后了
print(result.lastrowid)# 0
# 受影响的行数
print(result.rowcount) # 6
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

此时数据库中的记录已经全部被删除了。

Nanonets
Nanonets

基于AI的自学习OCR文档处理,自动捕获文档数据

下载

整体来看还是比较简单的,并且支持的功能也比较全面。

异步操作 PostgreSQL

异步操作 PostgreSQL 的话,我们有两个选择,一个是 asyncpg 库,另一个是 aiopg 库。

asyncpg 是自己实现了一套连接驱动,而 aiopg 则是对 psycopg2 进行了封装,个人更推荐 asyncpg,性能和活跃度都比 aiopg 要好。

下面来看看如何使用 asyncpg,首先是安装,直接 pip install asyncpg 即可。

查询记录

首先是查询记录。

import asyncio
from pprint import pprint
import asyncpg
async def main():
# 创建连接数据库的驱动
conn = await asyncpg.connect(host="localhost",
 port=5432,
 user="postgres",
 password="zgghyys123",
 database="postgres",
 timeout=10)
# 除了上面的方式,还可以使用类似于 SQLAlchemy 的方式创建
# await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
# 调用 await conn.fetchrow 执行 select 语句,获取满足条件的单条记录
# 调用 await conn.fetch 执行 select 语句,获取满足条件的全部记录
row1 = await conn.fetchrow("select * from girl")
row2 = await conn.fetch("select * from girl")
# 返回的是一个 Record 对象,这个 Record 对象等于将返回的记录进行了一个封装
# 至于怎么用后面会说
print(row1)# <Record id=1 name='古明地觉' age=16 place='地灵殿'>
pprint(row2)
"""
[<Record id=1 name='古明地觉' age=16 place='地灵殿'>,
 <Record id=2 name='椎名真白' age=16 place='樱花庄'>,
 <Record id=3 name='古明地恋' age=15 place='地灵殿'>]
"""
# 关闭连接
await conn.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

以上我们演示了如何使用 asyncpg 来获取数据库中的记录,我们看到执行 select 语句的话,我们可以使用 conn.fetchrow(query) 来获取满足条件的单条记录,conn.fetch(query) 来获取满足条件的所有记录。

Record 对象

我们说使用 conn.fetchone 查询得到的是一个 Record 对象,使用 conn.fetch 查询得到的是多个 Record 对象组成的列表,那么这个 Rcord 对象怎么用呢?

import asyncio
import asyncpg
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
row = await conn.fetchrow("select * from girl")
print(type(row))# <class 'asyncpg.Record'>
print(row)# <Record id=1 name='古明地觉' age=16 place='地灵殿'>
# 这个 Record 对象可以想象成一个字典
# 我们可以将返回的字段名作为 key, 通过字典的方式进行获取
print(row["id"], row["name"])# 1 古明地觉
# 除此之外,还可以通过 get 获取,获取不到的时候会返回默认值
print(row.get("id"), row.get("name"))# 1 古明地觉
print(row.get("xxx"), row.get("xxx", "不存在的字段"))# None 不存在的字段
# 除此之外还可以调用 keys、values、items,这个不用我说,都应该知道意味着什么
# 只不过返回的是一个迭代器
print(row.keys())# <tuple_iterator object at 0x000001D6FFDAE610>
print(row.values())# <tuple_iterator object at 0x000001D6FFDAE610>
print(row.items())# <RecordItemsIterator object at 0x000001D6FFDF20C0>
# 我们需要转成列表或者元组
print(list(row.keys()))# ['id', 'name', 'age', 'place']
print(list(row.values()))# [1, '古明地觉', 16, '地灵殿']
print(dict(row.items()))# {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}
print(dict(row))# {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}
# 关闭连接
await conn.close()
if __name__ == '__main__':
asyncio.run(main())

当然我们也可以借助 SQLAlchemy 帮我们拼接 SQL 语句。

import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))
# 我们不能直接传递一个 Select 对象, 而是需要将其转成原生的字符串才可以
rows = await conn.fetch(str(sql))
pprint(list(map(dict, rows)))
"""
[{'id': 2, 'name': '椎名真白', 'place': '樱花庄'},
 {'id': 3, 'name': '古明地恋', 'place': '地灵殿'}]
"""
# 关闭连接
await conn.close()
if __name__ == '__main__':
asyncio.run(main())

此外,conn.fetch 里面还支持占位符,使用百分号加数字的方式,举个例子:

import asyncio
from pprint import pprint
import asyncpg
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
rows = await conn.fetch("select * from girl where id != $1", 1)
pprint(list(map(dict, rows)))
"""
[{'age': 16, 'id': 2, 'name': '椎名真白', 'place': '樱花庄'},
 {'age': 15, 'id': 3, 'name': '古明地恋', 'place': '地灵殿'}]
"""
# 关闭连接
await conn.close()
if __name__ == '__main__':
asyncio.run(main())
  • 还是推荐使用 SQLAlchemy 的方式,这样更加方便一些,就像 aiomysql 一样。但是对于 asyncpg 而言,实际上接收的是一个原生的 SQL 语句,是一个字符串,因此它不能像 aiomysql 一样自动识别 Select 对象,我们还需要手动将其转成字符串。而且这样还存在一个问题,至于是什么我们下面介绍添加记录的时候说。

添加记录

然后是添加记录,我们看看如何往库里面添加数据。


import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
# 执行 insert 语句我们可以使用 execute
row = await conn.execute("insert into girl(name, age, place) values ($1, $2, $3)",
 '十六夜咲夜', 17, '红魔馆')
pprint(row)# INSERT 0 1
pprint(type(row))# <class 'str'>
await conn.close()
if __name__ == '__main__':
asyncio.run(main())

通过 execute 可以插入单条记录,同时返回相关信息,但是说实话这个信息没什么太大用。除了 execute 之外,还有 executemany,用来执行多条插入语句。

import asyncio
import asyncpg
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
# executemany:第一条参数是一个模板,第二条命令是包含多个元组的列表
# 执行多条记录的话,返回的结果为 None
rows = await conn.executemany("insert into girl(name, age, place) values ($1, $2, $3)",
[('十六夜咲夜', 17, '红魔馆'), ('琪露诺', 60, '雾之湖')])
print(rows)# None
# 关闭连接
await conn.close()
if __name__ == '__main__':
asyncio.run(main())

注意:如果是执行大量 insert 语句的话,那么 executemany 要比 execute 快很多,但是 executemany 不具备事务功。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据分析工具有哪些
数据分析工具有哪些

数据分析工具有Excel、SQL、Python、R、Tableau、Power BI、SAS、SPSS和MATLAB等。详细介绍:1、Excel,具有强大的计算和数据处理功能;2、SQL,可以进行数据查询、过滤、排序、聚合等操作;3、Python,拥有丰富的数据分析库;4、R,拥有丰富的统计分析库和图形库;5、Tableau,提供了直观易用的用户界面等等。

1134

2023.10.12

SQL中distinct的用法
SQL中distinct的用法

SQL中distinct的语法是“SELECT DISTINCT column1, column2,...,FROM table_name;”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

340

2023.10.27

SQL中months_between使用方法
SQL中months_between使用方法

在SQL中,MONTHS_BETWEEN 是一个常见的函数,用于计算两个日期之间的月份差。想了解更多SQL的相关内容,可以阅读本专题下面的文章。

381

2024.02.23

SQL出现5120错误解决方法
SQL出现5120错误解决方法

SQL Server错误5120是由于没有足够的权限来访问或操作指定的数据库或文件引起的。想了解更多sql错误的相关内容,可以阅读本专题下面的文章。

2174

2024.03.06

sql procedure语法错误解决方法
sql procedure语法错误解决方法

sql procedure语法错误解决办法:1、仔细检查错误消息;2、检查语法规则;3、检查括号和引号;4、检查变量和参数;5、检查关键字和函数;6、逐步调试;7、参考文档和示例。想了解更多语法错误的相关内容,可以阅读本专题下面的文章。

380

2024.03.06

oracle数据库运行sql方法
oracle数据库运行sql方法

运行sql步骤包括:打开sql plus工具并连接到数据库。在提示符下输入sql语句。按enter键运行该语句。查看结果,错误消息或退出sql plus。想了解更多oracle数据库的相关内容,可以阅读本专题下面的文章。

1703

2024.04.07

sql中where的含义
sql中where的含义

sql中where子句用于从表中过滤数据,它基于指定条件选择特定的行。想了解更多where的相关内容,可以阅读本专题下面的文章。

586

2024.04.29

sql中删除表的语句是什么
sql中删除表的语句是什么

sql中用于删除表的语句是drop table。语法为drop table table_name;该语句将永久删除指定表的表和数据。想了解更多sql的相关内容,可以阅读本专题下面的文章。

440

2024.04.29

Python异步编程与Asyncio高并发应用实践
Python异步编程与Asyncio高并发应用实践

本专题围绕 Python 异步编程模型展开,深入讲解 Asyncio 框架的核心原理与应用实践。内容包括事件循环机制、协程任务调度、异步 IO 处理以及并发任务管理策略。通过构建高并发网络请求与异步数据处理案例,帮助开发者掌握 Python 在高并发场景中的高效开发方法,并提升系统资源利用率与整体运行性能。

37

2026.03.12

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 5万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号