0

0

PySpark DataFrame多列聚合与结果行式展示教程

碧海醫心

碧海醫心

发布时间:2025-10-22 12:08:26

|

582人浏览过

|

来源于php中文网

原创

PySpark DataFrame多列聚合与结果行式展示教程

本教程详细讲解如何在pyspark dataframe中对多个列应用多个聚合函数(如min和max),并将聚合结果以行式(而非默认的列式)结构进行展示。我们将通过分步操作,利用select、alias、f.lit和unionbyname等函数,将每个列的最小值和最大值分别作为独立行呈现,从而满足特定的数据分析和报告需求。

在PySpark中,对DataFrame的多个列执行聚合操作是常见的需求。通常,我们可以使用df.agg()配合F.min()、F.max()等函数来实现。然而,当期望的输出格式是将不同聚合函数的结果以行而非列的形式展示时,标准的df.agg()方法会生成一个单行多列的DataFrame,这与将“所有列的最小值”作为一行,“所有列的最大值”作为另一行的需求不符。本教程将介绍一种实现这种特定行式聚合结果的方法。

1. 准备示例数据

首先,我们创建一个示例PySpark DataFrame,以便演示后续的操作。

import operator
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# 初始化SparkSession
spark = SparkSession.builder.appName("PySparkMultiAggRowWise").getOrCreate()

_data = [
    (4, 123, 18, 29),
    (8, 5, 26, 187),
    (2, 97, 18, 29),
]
_schema = ['col_1', 'col2', 'col3', 'col_4']
df = spark.createDataFrame(_data, _schema)
df.show()

输出的DataFrame df 如下:

+-----+----+----+-----+
|col_1|col2|col3|col_4|
+-----+----+----+-----+
|    4| 123|  18|   29|
|    8|   5|  26|  187|
|    2|  97|  18|   29|
+-----+----+----+-----+

2. 执行列式聚合并合并结果

为了得到行式的聚合结果,我们首先分别计算每个列的最小值和最大值,并将它们收集到一个新的DataFrame中。

# 生成所有列的最小值表达式
min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]
# 生成所有列的最大值表达式
max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]

# 使用select执行聚合,结果将是一个单行DataFrame,包含所有min_和max_列
df_aggregated = df.select(min_vals + max_vals)
df_aggregated.cache() # 缓存结果,因为后续会多次使用
df_aggregated.show()

df_aggregated 的输出如下:

+-------+------+-------+-------+-------+------+-------+-------+
|min_col_1|min_col2|min_col3|min_col_4|max_col_1|max_col2|max_col3|max_col_4|
+-------+------+-------+-------+-------+------+-------+-------+
|      2|       5|      18|       29|        8|     123|     26|      187|
+-------+------+-------+-------+-------+------+-------+-------+

此时,我们得到了一个包含所有聚合结果的单行DataFrame,但其结构仍是列式的。

3. 重构为行式输出

为了将上述列式结果转换为行式,我们需要创建两个独立的DataFrame:一个用于最小值,一个用于最大值,然后将它们通过unionByName合并。

3.1 构造最小值DataFrame

我们从 df_aggregated 中选择所有 min_ 开头的列,并将它们重命名回原始列名。同时,添加一个名为 agg_type 的字面量列来标识这些行代表的是最小值。

Napkin AI
Napkin AI

Napkin AI 可以将您的文本转换为图表、流程图、信息图、思维导图视觉效果,以便快速有效地分享您的想法。

下载
min_cols = operator.add(
    [F.lit('min').alias('agg_type')], # 添加聚合类型标识列
    [F.col(f'min_{c}').alias(c) for c in df.columns] # 选择并重命名最小值列
)
min_df = df_aggregated.select(min_cols)
min_df.show()

min_df 的输出如下:

+--------+-----+----+----+-----+
|agg_type|col_1|col2|col3|col_4|
+--------+-----+----+----+-----+
|     min|    2|   5|  18|   29|
+--------+-----+----+----+-----+

3.2 构造最大值DataFrame

类似地,我们为最大值创建另一个DataFrame。

max_cols = operator.add(
    [F.lit('max').alias('agg_type')], # 添加聚合类型标识列
    [F.col(f'max_{c}').alias(c) for c in df.columns] # 选择并重命名最大值列
)
max_df = df_aggregated.select(max_cols)
max_df.show()

max_df 的输出如下:

+--------+-----+----+----+-----+
|agg_type|col_1|col2|col3|col_4|
+--------+-----+----+----+-----+
|     max|    8| 123|  26|  187|
+--------+-----+----+----+-----+

4. 合并最终结果

最后,使用 unionByName 将 min_df 和 max_df 合并。unionByName 会根据列名匹配来合并DataFrame,这确保了即使列顺序不同也能正确合并。

result = min_df.unionByName(max_df)
result.show()

最终 result DataFrame的输出如下,它以行式展示了每个列的最小值和最大值:

+--------+-----+----+----+-----+
|agg_type|col_1|col2|col3|col_4|
+--------+-----+----+----+-----+
|     min|    2|   5|  18|   29|
|     max|    8| 123|  26|  187|
+--------+-----+----+----+-----+

完整代码示例

import operator
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# 初始化SparkSession
spark = SparkSession.builder.appName("PySparkMultiAggRowWise").getOrCreate()

# 示例数据
_data = [
    (4, 123, 18, 29),
    (8, 5, 26, 187),
    (2, 97, 18, 29),
]
_schema = ['col_1', 'col2', 'col3', 'col_4']
df = spark.createDataFrame(_data, _schema)
print("原始DataFrame:")
df.show()

# 1. 生成所有列的最小值和最大值表达式
min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]
max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]

# 2. 执行列式聚合并缓存结果
df_aggregated = df.select(min_vals + max_vals)
df_aggregated.cache()
print("聚合后的单行DataFrame:")
df_aggregated.show()

# 3. 构造最小值DataFrame
min_cols = operator.add(
    [F.lit('min').alias('agg_type')],
    [F.col(f'min_{c}').alias(c) for c in df.columns]
)
min_df = df_aggregated.select(min_cols)
print("最小值DataFrame:")
min_df.show()

# 4. 构造最大值DataFrame
max_cols = operator.add(
    [F.lit('max').alias('agg_type')],
    [F.col(f'max_{c}').alias(c) for c in df.columns]
)
max_df = df_aggregated.select(max_cols)
print("最大值DataFrame:")
max_df.show()

# 5. 合并最终结果
result = min_df.unionByName(max_df)
print("最终行式聚合结果:")
result.show()

# 停止SparkSession
spark.stop()

注意事项与总结

  • df.agg() 与 df.select() 的选择: 如果你只需要一个包含所有聚合结果的单行DataFrame(例如,col1_min, col1_max, col2_min, col2_max...),那么直接使用df.agg()会更简洁。本教程的方法是针对需要将不同聚合类型作为独立行展示的特定场景。
  • cache() 的使用: 在 df_aggregated 上使用 cache() 是一个性能优化措施。由于 df_aggregated 会被 min_df 和 max_df 两次引用,缓存可以避免重复计算,提高效率。
  • 列重命名: 在构建 min_df 和 max_df 时,将 min_col_name 和 max_col_name 重命名回 col_name 是为了保持最终输出的列名一致性,方便后续处理。
  • F.lit() 的作用: F.lit() 函数用于创建一个字面量列,这对于添加如 agg_type 这样的标识符非常有用。
  • operator.add 的替代方案: 在生成 min_cols 和 max_cols 列表时,使用 operator.add 是为了将字面量列的表达式与聚合列的表达式列表连接起来。你也可以直接使用 [F.lit('min').alias('agg_type')] + [F.col(f'min_{c}').alias(c) for c in df.columns] 这样的列表拼接方式。

通过上述步骤,我们成功地将PySpark DataFrame的多个列聚合结果以所需的行式结构呈现,这对于需要按聚合类型进行行级别分析或报告的场景非常实用。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
mysql标识符无效错误怎么解决
mysql标识符无效错误怎么解决

mysql标识符无效错误的解决办法:1、检查标识符是否被其他表或数据库使用;2、检查标识符是否包含特殊字符;3、使用引号包裹标识符;4、使用反引号包裹标识符;5、检查MySQL的配置文件等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

211

2023.12.04

Python标识符有哪些
Python标识符有哪些

Python标识符有变量标识符、函数标识符、类标识符、模块标识符、下划线开头的标识符、双下划线开头、双下划线结尾的标识符、整型标识符、浮点型标识符等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

325

2024.02.23

java标识符合集
java标识符合集

本专题整合了java标识符相关内容,想了解更多详细内容,请阅读下面的文章。

293

2025.06.11

c++标识符介绍
c++标识符介绍

本专题整合了c++标识符相关内容,阅读专题下面的文章了解更多详细内容。

179

2025.08.07

PHP 高并发与性能优化
PHP 高并发与性能优化

本专题聚焦 PHP 在高并发场景下的性能优化与系统调优,内容涵盖 Nginx 与 PHP-FPM 优化、Opcode 缓存、Redis/Memcached 应用、异步任务队列、数据库优化、代码性能分析与瓶颈排查。通过实战案例(如高并发接口优化、缓存系统设计、秒杀活动实现),帮助学习者掌握 构建高性能PHP后端系统的核心能力。

114

2025.10.16

PHP 数据库操作与性能优化
PHP 数据库操作与性能优化

本专题聚焦于PHP在数据库开发中的核心应用,详细讲解PDO与MySQLi的使用方法、预处理语句、事务控制与安全防注入策略。同时深入分析SQL查询优化、索引设计、慢查询排查等性能提升手段。通过实战案例帮助开发者构建高效、安全、可扩展的PHP数据库应用系统。

99

2025.11.13

JavaScript 性能优化与前端调优
JavaScript 性能优化与前端调优

本专题系统讲解 JavaScript 性能优化的核心技术,涵盖页面加载优化、异步编程、内存管理、事件代理、代码分割、懒加载、浏览器缓存机制等。通过多个实际项目示例,帮助开发者掌握 如何通过前端调优提升网站性能,减少加载时间,提高用户体验与页面响应速度。

36

2025.12.30

JavaScript浏览器渲染机制与前端性能优化实践
JavaScript浏览器渲染机制与前端性能优化实践

本专题围绕 JavaScript 在浏览器中的执行与渲染机制展开,系统讲解 DOM 构建、CSSOM 解析、重排与重绘原理,以及关键渲染路径优化方法。内容涵盖事件循环机制、异步任务调度、资源加载优化、代码拆分与懒加载等性能优化策略。通过真实前端项目案例,帮助开发者理解浏览器底层工作原理,并掌握提升网页加载速度与交互体验的实用技巧。

105

2026.03.06

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

26

2026.03.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Java 教程
Java 教程

共578课时 | 82.1万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号