dask.delayed 更适合数据流水线因其构建可调度的dag,支持中间复用、条件分支与失败重算;而concurrent.futures仅适用于独立函数调用。

为什么 dask.delayed 比直接用 concurrent.futures 更适合数据流水线?
dask.delayed 不是简单地把函数扔进线程池,而是构建一个延迟执行的有向无环图(DAG),后续能做任务调度、重试、内存感知和跨节点分发。你写的是“做什么”,不是“怎么做”——这在处理多阶段 ETL 时特别关键。
- 如果你只是跑几个独立函数,
concurrent.futures.ThreadPoolExecutor更轻量、启动更快 - 但只要涉及中间结果复用(比如 A → B → C,同时 B → D)、条件分支或部分失败重算,
dask.delayed的图能力立刻显出价值 - 注意:所有被
@dask.delayed装饰的函数,返回值会自动包装成Delayed对象;直接 print 或取值会触发计算,别在定义阶段就调.compute()
@delayed
def load_csv(path):
return pd.read_csv(path)
<p>@delayed<br />
def clean(df):
return df.dropna()</p><h1>这里没计算,只建图</h1><p>cleaned = clean(load_csv("data.csv"))</p>
dask.dataframe 读 CSV 卡住或内存暴涨?检查这三件事
dask.dataframe.read_csv 默认按行数切分块(blocksize),但实际切分依赖文件是否含换行符、压缩格式、是否有 header 行——这些都会让块大小失控,导致某一块巨长、其他块为空,甚至卡死在元数据探测阶段。
- 确保文件是纯文本、LF 换行、无嵌入换行符的 CSV;如果用 Excel 导出,先用
dos2unix或 Python 清洗一遍 - 显式指定
blocksize="64MB"(别用字节硬算,用字符串如"128MB"),并配合sample=10000控制 schema 推断采样行数 - 遇到
OSError: [Errno 22] Invalid argument,大概率是 Windows 下路径含中文或 UNC 路径未转义,改用r"\server\path"或正斜杠
本地运行 dask.distributed.Client 反而比单线程慢?常见配置误用
开一个本地 Client(n_workers=4, threads_per_worker=1) 听起来合理,但默认会启用 dashboard(Web UI)、心跳检测、序列化/反序列化日志——对小数据集(
立即学习“Python免费学习笔记(深入)”;
- 小规模调试优先用
scheduler="threads"或scheduler="synchronous",完全绕过调度器 - 必须用
Client时,关掉不需要的功能:dashboard_address=None、silence_logs=logging.ERROR -
Client启动后默认连接 localhost:8786,如果端口被占,会静默 fallback 到随机端口——查client.dashboard_link才知道它到底在哪,别猜
用 dask.array 处理图像堆栈却报 Array chunk size too large
dask.array 把大数组切块(chunks)来并行,但图像数据维度固定(如 (1000, 1024, 1024)),若 chunk 设置不当,容易生成单块超 1GB 的内存块,触发 ValueError。
- 别用
chunks=-1或chunks=(1000, "auto", "auto")——"auto" 在高维下可能把第一维全塞进一块 - 图像堆栈推荐按切片维度拆:如
chunks=(1, 512, 512),确保每块最多一张图的一部分 - 用
da.from_array(arr, chunks=(1, 512, 512)).persist()替代直接计算,避免重复加载原始数据
事情说清了就结束。真正卡住的地方,往往不在代码怎么写,而在你默认相信的“自动行为”——比如 dask 怎么猜 CSV 分隔符、怎么选 chunk 大小、怎么处理缺失值传播——这些细节不盯住,图建得再漂亮也跑不起来。










