0

0

高效并行处理字符串列表并安全写入CSV:分离计算与I/O的实践指南

心靈之曲

心靈之曲

发布时间:2026-03-12 09:15:59

|

839人浏览过

|

来源于php中文网

原创

高效并行处理字符串列表并安全写入CSV:分离计算与I/O的实践指南

本文介绍如何通过分离计算密集型任务与I/O操作,安全、高效地并行处理大规模字符串列表,并将结果可靠写入CSV文件——避免多进程/线程直接共享csv.writer引发的序列化失败、竞态或死锁问题。

本文介绍如何通过分离计算密集型任务与i/o操作,安全、高效地并行处理大规模字符串列表,并将结果可靠写入csv文件——避免多进程/线程直接共享`csv.writer`引发的序列化失败、竞态或死锁问题。

在Python中对大批量数据(如数千个医学术语)进行逐项处理时,盲目套用多线程或多进程常导致意外失败:csv.writer对象不可被pickle,无法跨进程传递;多线程并发写同一文件易引发数据错乱或IO阻塞;而粗粒度的超时控制(如thread.join(timeout))又难以优雅降级——超时时程序挂起、线程无法真正终止、后续批次停滞不前。

根本解法在于职责分离(Separation of Concerns)
并行层仅负责“计算”:每个工作单元独立执行run_mappers(),输入为单个字符串和参数,纯函数式输出处理结果(如list或dict),不触碰任何文件句柄或全局状态
串行层统一“聚合与落盘”:所有并行任务完成后,主线程按序收集结果,集中调用csv.writer.writerow()——规避并发写冲突,也无需考虑对象序列化限制。

以下是符合生产级要求的完整实现:

ColorMagic
ColorMagic

AI调色板生成工具

下载
import csv
import logging
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from typing import List, Tuple, Any, Optional

# 配置结构化日志(便于追踪失败项)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("processing.log", encoding="utf-8")
    ]
)

# 示例输入(实际中可从文件/数据库加载)
term_list = [
    "Dementia", "HER2-positive Breast Cancer", "Stroke", "Hemiplegia", 
    "Type 1 Diabetes", "IBD", "Lung Cancer", "Psoriasis", "Healthy", "Asthma"
    # ... 更多条目(见原始问题)
]

def run_mappers(individual_string: str, other_args: Any) -> List[Any]:
    """
    核心处理函数:仅执行计算,返回结构化结果。
    ✅ 无副作用:不修改全局变量,不访问文件/网络/I/O设备。
    ✅ 可异常中断:失败时抛出异常,由主流程捕获并记录。
    """
    try:
        # 模拟耗时业务逻辑(如API调用、NLP解析、规则匹配等)
        time.sleep(0.1 + (hash(individual_string) % 300) / 1000)  # 随机延迟

        # 示例处理:标准化术语 + 附加元数据
        normalized = individual_string.strip().title()
        processed_result = [normalized, len(normalized), other_args, int(time.time() % 1000)]

        # 可选:模拟偶发错误(便于测试容错)
        if "Cancer" in normalized and hash(normalized) % 17 == 0:
            raise RuntimeError(f"Transient failure on {normalized}")

        return processed_result

    except Exception as e:
        logging.error(f"Failed to process '{individual_string}': {e}")
        raise  # 让executor捕获异常,而非静默吞掉

def parallel_process_and_write(
    terms: List[str],
    other_args: Any,
    output_csv: str = "results.csv",
    error_log: str = "errors.txt",
    max_workers: int = 6,
    use_processes: bool = False  # True for CPU-bound; False (default) for I/O-bound
) -> Tuple[int, int]:
    """
    主协调函数:并行计算 + 串行写入。

    Args:
        terms: 待处理字符串列表
        other_args: 透传给run_mappers的额外参数
        output_csv: 输出CSV路径
        error_log: 错误日志路径
        max_workers: 并发工作单元数
        use_processes: 是否使用ProcessPoolExecutor(适合CPU密集型)

    Returns:
        (成功写入行数, 失败条目数)
    """
    logging.info(f"Starting parallel processing of {len(terms)} terms...")
    start_time = time.time()

    # Step 1: 并行提交所有任务(不阻塞)
    executor_class = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    with executor_class(max_workers=max_workers) as executor:
        # 提交所有future(注意:submit参数是单个term,非整个列表!)
        futures = [
            executor.submit(run_mappers, term, other_args) 
            for term in terms
        ]

        # Step 2: 异步收集结果(带超时保护,防无限等待)
        results = []
        errors = []
        for future in as_completed(futures, timeout=300):  # 全局超时5分钟
            try:
                result = future.result(timeout=60)  # 单任务超时1分钟
                results.append(result)
            except Exception as exc:
                # 记录具体失败原因(包括TimeoutError、RuntimeError等)
                errors.append(str(exc))
                logging.warning(f"Task failed: {exc}")

    # Step 3: 串行写入CSV(绝对线程/进程安全)
    success_count = 0
    try:
        with open(output_csv, "w", newline="", encoding="utf-8") as f_csv, \
             open(error_log, "w", encoding="utf-8") as f_err:

            writer = csv.writer(f_csv)
            # 写入表头(按需调整)
            writer.writerow(["Term", "Length", "OtherArgs", "Timestamp"])

            for result in results:
                writer.writerow(result)
                success_count += 1

            # 记录所有错误堆栈
            for err in errors:
                f_err.write(err + "\n")

    except Exception as e:
        logging.critical(f"Fatal error during CSV write: {e}")
        raise

    elapsed = time.time() - start_time
    logging.info(
        f"Processing completed in {elapsed:.1f}s: "
        f"{success_count}/{len(terms)} succeeded, {len(errors)} failed."
    )
    return success_count, len(errors)

# 使用示例
if __name__ == "__main__":
    # 处理全量数据(支持分批调用以控内存)
    total_success, total_fail = 0, 0
    batch_size = 50

    for i in range(0, len(term_list), batch_size):
        batch = term_list[i:i + batch_size]
        logging.info(f"Processing batch [{i}:{i+len(batch)}]...")

        try:
            success, fail = parallel_process_and_write(
                terms=batch,
                other_args="metadata_v1",
                output_csv=f"batch_{i//batch_size}.csv",
                error_log=f"batch_{i//batch_size}_errors.txt",
                max_workers=4,
                use_processes=False  # 若run_mappers含大量CPU计算,设为True
            )
            total_success += success
            total_fail += fail
        except Exception as e:
            logging.error(f"Batch [{i}:{i+len(batch)}] crashed: {e}")
            # 继续下一组,不中断整体流程

    logging.info(f"Final summary: {total_success} success, {total_fail} failed.")

关键注意事项与最佳实践

  • 永远不要跨进程/线程共享csv.writer或文件对象:它们不是线程安全的,且csv.writer内部持有不可序列化的缓冲区,multiprocessing会直接报PicklingError。
  • 选择ThreadPoolExecutor还是ProcessPoolExecutor?
    • ✅ ThreadPoolExecutor:适用于I/O密集型任务(如HTTP请求、数据库查询、轻量文本处理);开销小,启动快。
    • ✅ ProcessPoolExecutor:适用于CPU密集型任务(如复杂正则、数值计算、机器学习推理);可绕过GIL,但进程创建/通信成本高,需确保other_args可被pickle。
  • 超时设计要分层:as_completed(timeout=...)控制整体等待,future.result(timeout=...)控制单任务,双重防护避免卡死。
  • 错误处理必须显式:用future.exception()或try/except捕获异常,绝不能依赖future.cancel()(它无法强制终止已运行的线程/进程,仅对未开始的任务有效)。
  • 内存友好分批处理:对超大term_list,按batch_size切片后循环调用主函数,避免一次性加载全部future到内存。
  • 日志即监控:结构化日志(含时间戳、级别、消息)是调试并行问题的唯一可靠依据,比print()强大百倍。

遵循此模式,你将获得:✅ 稳定可扩展的并行吞吐量、✅ 100%安全的CSV输出、✅ 清晰的错误溯源能力、✅ 无缝的失败降级策略——这才是生产环境应有的并行化实践。

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
python中print函数的用法
python中print函数的用法

python中print函数的语法是“print(value1, value2, ..., sep=' ', end=' ', file=sys.stdout, flush=False)”。本专题为大家提供print相关的文章、下载、课程内容,供大家免费下载体验。

192

2023.09.27

python print用法与作用
python print用法与作用

本专题整合了python print的用法、作用、函数功能相关内容,阅读专题下面的文章了解更多详细教程。

18

2026.02.03

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

760

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

221

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1566

2023.10.24

字符串介绍
字符串介绍

字符串是一种数据类型,它可以是任何文本,包括字母、数字、符号等。字符串可以由不同的字符组成,例如空格、标点符号、数字等。在编程中,字符串通常用引号括起来,如单引号、双引号或反引号。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

649

2023.11.24

java读取文件转成字符串的方法
java读取文件转成字符串的方法

Java8引入了新的文件I/O API,使用java.nio.file.Files类读取文件内容更加方便。对于较旧版本的Java,可以使用java.io.FileReader和java.io.BufferedReader来读取文件。在这些方法中,你需要将文件路径替换为你的实际文件路径,并且可能需要处理可能的IOException异常。想了解更多java的相关内容,可以阅读本专题下面的文章。

1228

2024.03.22

php中定义字符串的方式
php中定义字符串的方式

php中定义字符串的方式:单引号;双引号;heredoc语法等等。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

1184

2024.04.29

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

3

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号