
本文详解使用 `aiofiles` 替代 `aiofile` 并配合细粒度 `asyncio.lock` 实现线程安全、顺序一致的异步批量文件写入,彻底解决因竞态导致的文件内容错乱、行首截断与覆盖问题。
在异步 I/O 场景中,并发写入多个文件时若缺乏恰当的同步机制,极易引发数据损坏——典型表现为:输出文件中单行文本被随机截断、多线程写入内容相互覆盖、换行符错位,甚至部分写入丢失。您遇到的问题(AIOFile 下 question_output.txt 与 answer_output.txt 数据混乱)根本原因在于:aiofile 的底层实现不保证跨协程的写入原子性,且其 write() 操作并非真正“线程/协程安全”;即使为每个文件单独加锁,也无法阻止两个独立 writer 对同一文件系统位置的无序偏移写入(尤其当未显式控制文件指针或缓冲策略时)。
✅ 正确解法是转向更成熟、社区验证充分的 aiofiles 库,并重构锁策略:
统一使用单把锁保护所有写入操作
原代码中为 q_lock 和 a_lock 分别加锁,看似隔离,实则埋下隐患:question_writer.write() 与 answer_writer.write() 可能并发执行,而两个 AIOFile 实例共享底层 OS 文件描述符状态(如当前写入偏移),导致写入位置冲突。改为共用一把 asyncio.Lock(),确保「向 question 文件写一行 + 向 answer 文件写一行」构成一个原子操作单元,从根本上杜绝交错。弃用 aiofile,改用 aiofiles
aiofiles 是基于标准 open() 的异步封装,兼容性高、行为可预测;它通过 loop.run_in_executor() 将阻塞 I/O 提交至线程池,天然规避了 aiofile 在某些平台(尤其是 Windows 或特定文件系统)下因底层 libuv/io_uring 调度引发的偏移错乱。同时,aiofiles.open(..., "w") 默认启用行缓冲(line-buffered),配合 await file.write(...) + await file.flush() 可保障每行写入的完整性。移除冗余 fsync(),依赖 aiofiles 的隐式刷新
原逻辑中调用 writer.fsync() 不仅非必需("w" 模式下 write() 已触发内核缓冲),还可能因频繁强制刷盘拖慢性能。aiofiles 在 close() 时自动 flush,日常写入无需手动 fsync()——除非有强持久化要求(如金融日志),此时应单独设计 flush+fsync 时机。
以下是优化后的核心写入逻辑(含关键注释):
立即学习“Python免费学习笔记(深入)”;
import asyncio
import aiofiles
import pandas as pd
async def process_data(model, factory):
df = pd.read_csv("sitemap_data_raw", header=None, names=["Record"], on_bad_lines="warn").drop_duplicates()
# ✅ 单锁统管所有写入,确保 Q/A 成对原子写入
file_lock = asyncio.Lock()
async def process_batch(rows):
tasks = [factory.build_qa_chain(model).ainvoke({"chunk": row.Record}) for row in rows]
return await asyncio.gather(*tasks)
async def write_batches(q_file, a_file, results):
for result_batch in results:
for record in result_batch:
# ? 锁定整个 Q+A 写入流程,避免交叉
async with file_lock:
await q_file.write(record["question"] + "\n")
await a_file.write(record["answer"] + "\n")
# ⚠️ 无需 await q_file.flush() — aiofiles 在 close 时自动 flush
# ✅ 使用 aiofiles.open,语义清晰且行为可靠
async with aiofiles.open("question_output.txt", "w") as q_file, \
aiofiles.open("answer_output.txt", "w") as a_file:
batch_size = 1000
for i in range(0, len(df), batch_size):
batch_rows = df.iloc[i:i+batch_size].itertuples(index=False)
batch_results = await process_batch(batch_rows)
await write_batches(q_file, a_file, batch_results)? 额外建议:
- 若需极致性能,可将多行合并为单次 write()(如 await q_file.write("\n".join(questions) + "\n")),减少系统调用次数;
- 对超大文件,考虑分块写入 + 定期 await q_file.flush() 防止内存积压;
- 生产环境务必添加异常处理(try/except 包裹 write_batches),避免锁未释放导致死锁。
综上,并发文件写入的安全基石不是“给每个文件加锁”,而是“让所有相关写入受同一把锁协调”。结合 aiofiles 的稳健实现,即可在保持异步高吞吐的同时,获得字节级精确的输出一致性。










