
本文详解为何 `aiofile` 在多协程并发写入场景下易导致文件内容错乱,并通过切换至更稳定的 `aiofiles` 库、重构锁机制,彻底解决输出文件数据被随机截断或覆盖的问题。
在异步 Python 开发中,对多个文件进行高并发写入是一项常见但极易出错的操作。您遇到的问题——question_output.txt 和 answer_output.txt 中每行开头被随机截断、内容相互覆盖、甚至文件中间部分被反复重写——并非源于逻辑错误,而是由底层 I/O 库的行为差异与锁粒度不匹配共同导致的。
根本原因分析
原始代码使用 aiofile.AIOFile,该库不保证 write() 调用的原子性与位置一致性:
- AIOFile 的 write() 是非阻塞系统调用封装,其内部缓冲和文件偏移(file offset)管理在多协程竞争下不可靠;
- 即使加了 asyncio.Lock,若锁作用于 write_and_flush 函数内部,而两个 writer(q_writer 和 a_writer)各自独立维护文件指针,同一时刻多个协程仍可能并发修改同一文件的写入位置;
- 更关键的是:AIOFile 的 fsync() 仅确保内核缓冲刷盘,无法解决多写入者争抢文件偏移量导致的“写入撕裂”(write tearing)——即一个协程写到一半,另一个协程跳转到相同偏移处覆盖,造成行首缺失、换行错位等典型症状。
正确解法:aiofiles + 全局写入锁
aiofiles 基于标准 open() 封装,其 write() 行为与同步 f.write() 一致,天然遵循 POSIX 文件偏移语义,配合 asyncio.Lock 可真正实现串行化写入:
import asyncio
import aiofiles
async def write_batches(question_file, answer_file, results):
# ✅ 单一锁保护所有写入操作,确保 question_file 和 answer_file 的写入严格串行
async with file_lock: # ← 关键:锁覆盖两个文件的 write()
for my_result in results:
for my_record in my_result:
await question_file.write(my_record["question"] + "\n")
await answer_file.write(my_record["answer"] + "\n")⚠️ 注意:锁必须包裹全部相关写入语句(而非每个 write 单独加锁),否则仍存在竞态窗口。此处一个 file_lock 同时保护两个文件,是因为它们共享同一批数据生成节奏,且需保持 Q/A 行级严格对应。
进阶建议与最佳实践
- 避免 aiofile 在生产环境用于多写者场景:其设计偏向单写者高性能日志,缺乏对并发写入的完备支持;aiofiles 虽稍慢,但语义清晰、行为可预测,是通用异步 I/O 的更优选择。
- 批量写入优于逐行写入:若性能敏感,可先在内存中累积 batch_results,再一次性写入(如 await f.write("\n".join(questions) + "\n")),大幅减少系统调用次数。
- 显式关闭与异常安全:async with aiofiles.open(...) 已自动处理资源释放,无需手动 close();但若需写入后立即校验,可在 with 块内追加 await f.flush()(aiofiles 支持)。
- 测试验证技巧:在 write_batches 中加入 print(f"[WROTE] Q:{len(q_line)}, A:{len(a_line)}"),并检查输出文件行数是否恒等于输入行数——这是检测写入丢失最直接的方式。
通过本次重构,您不仅修复了数据损坏问题,更建立了一套可复用的异步文件安全写入模式:选对工具(aiofiles)、锁对范围(跨文件统一锁)、写对粒度(批量/行级按需)。这三者缺一不可。










