aqe 默认开启后join变慢,因小数据量或非均匀分区下,运行时统计缺失导致误判重分区与策略优化,反而增加调度开销和延迟。

PySpark 3.5+ 的 spark.sql.adaptive.enabled 默认开启后,为什么 JOIN 变慢了?
因为自适应查询执行(AQE)在小数据量或非均匀分区场景下反而引入调度开销和重分区判断延迟。它默认启用后,会自动合并小任务、动态优化 Join 策略、调整 shuffle 分区数——但这些决策依赖运行时统计,首次执行无历史信息,容易误判。
- 若你的作业多为
df1.join(df2, "id").filter(...)且df2很小(hint("broadcast") - 检查是否触发了
CoalescePartitions:用explain(mode="extended")看物理计划里有没有AdaptiveSparkPlan块;若有,再看子节点是否出现意外的Exchange - 临时关闭:设
spark.conf.set("spark.sql.adaptive.enabled", "false")对比耗时;长期建议保留,但配合spark.sql.adaptive.coalescePartitions.enabled等细粒度开关控制
PySpark 3.5+ 中 pandas_udf 被弃用,该用 scalar Pandas function 还是 vectorized UDF?
两者本质相同,都是基于 Arrow 的向量化函数,但 API 和语义有关键区别:前者是推荐路径,后者是旧名残留;真正要换的是调用方式和类型声明。
- 必须改写
@pandas_udf(returnType=StringType())→@pandas_function(returnType=StringType()),否则运行时报AttributeError: module 'pyspark.sql.functions' has no attribute 'pandas_udf' - 输入不再是单列
pd.Series,而是整个批次的pd.DataFrame(即使只有一列),需用df.iloc[:, 0]显式取列,否则易出KeyError - 性能上无差异,但新 API 强制要求显式声明
returnType,且不支持GROUPED_AGG模式——聚合场景得用groupby().applyInPandas()
PySpark 3.5+ 读 Parquet 时 mergeSchema 行为变了,字段缺失直接报错?
是的。3.5+ 默认启用 spark.sql.parquet.mergeSchema,但底层改用更严格的 schema 合并逻辑:当某文件缺失非 nullable 字段时,不再静默补 null,而是抛 org.apache.spark.sql.AnalysisException: Cannot resolve column name。
快速学习python书第二版是一本简洁清晰介绍python3的书籍,目标是新学习python 的程序员。这本更新版本囊括了所有python3版本的变化,即python从早期版本到新版本的特性变化 本书一开始用基础但是很有用的程序来传授给读者关于python的核心特性,包括语法,控制流程和数据结构。然后本书使用大型的应用程序包括代码管理,面向对象编程,web开发和转换老版本的python程序到新的版本等等。 忠实于作者的经验十足的开发者的观众,作者仔细检查普通程序特点,同时增加了更多细节关于这些python
- 常见于增量写入:上游用不同 schema 写了多个目录,比如一批含
user_id,另一批没写,3.5+ 读父目录就会失败 - 兼容做法:显式关掉合并,用
spark.read.option("mergeSchema", "false").parquet(...),再手动 union 或用schema参数指定统一 schema - 更健壮的做法是提前用
spark.read.parquet(...).schema扫描所有子目录推断一次,存为 JSON,后续读取时传入schema=StructType.fromJson(...)
spark.sql.files.maxPartitionBytes 在 3.5+ 影响比以前更大,为什么?
因为 3.5+ 把这个参数从“仅影响 text/CSV”扩展到所有文件源(包括 Parquet、ORC),且与新的 FileSourceScanExec 执行器深度耦合,直接影响 task 划分粒度和内存压力。
立即学习“Python免费学习笔记(深入)”;
- 默认值从 128MB 降到 64MB,导致小文件多的作业 task 数暴增,shuffle 压力上升;若集群 executor 内存不足,容易 OOM
- 调大前先确认:用
df.explain("formatted")查看InputPartitions数量和平均大小;若大量InputPartition小于 10MB,说明切太碎 - 安全调整范围:64MB ~ 256MB;超过 512MB 需同步调高
spark.sql.files.openCostInBytes,否则 Spark 会误判“打开文件代价高”,继续切小
最常被忽略的是:这个参数和 spark.sql.adaptive.coalescePartitions.enabled 是联动的——关了 AQE 的分区合并,又没调 maxPartitionBytes,就等于把小文件问题硬扛在 stage 里了。









