0

0

PySpark DataFrame多列多函数聚合与行式结果呈现

霞舞

霞舞

发布时间:2025-10-22 11:29:00

|

499人浏览过

|

来源于php中文网

原创

PySpark DataFrame多列多函数聚合与行式结果呈现

本教程详细介绍了如何在pyspark dataframe中对多个列应用多个聚合函数(如`min`和`max`),并将结果以行式结构呈现。通过分步演示,我们展示了如何利用`select`进行初步聚合,并结合`unionbyname`技巧将聚合结果重塑为易于分析的行式格式,适用于需要定制化聚合报告的场景。

在PySpark数据处理中,我们经常需要对DataFrame的多个列执行聚合操作,例如计算每个列的最小值和最大值。虽然PySpark的agg函数能够方便地进行多列多函数聚合,但其默认输出是将所有聚合结果并列在一行中。然而,在某些分析场景下,我们可能需要将不同聚合函数的结果以行(row-wise)的形式展示,即每一行代表一个聚合函数(如最小值、最大值),而列则对应原始DataFrame的列。本教程将详细介绍如何实现这种定制化的行式聚合输出。

1. 问题背景与常见误区

假设我们有一个PySpark DataFrame,并希望计算其中所有数值列的最小值和最大值。一个常见的初步尝试可能是使用列表推导式结合agg函数:

from pyspark.sql import functions as F

# 假设 df 是一个 PySpark DataFrame
# exprs = [F.min(c).alias(c), F.max(c).alias(c) for c in df.columns]
# df2 = df.agg(*exprs)

这种方法虽然可以计算出所有列的最小值和最大值,但其结果会是一个单行DataFrame,其中包含类似 min_col1, max_col1, min_col2, max_col2 等列。这与我们期望的“第一行是所有列的最小值,第二行是所有列的最大值”的行式输出格式不符。

2. 实现行式聚合输出的策略

为了实现行式聚合输出,我们需要采取一种分两步走的策略:

  1. 初步聚合所有函数的结果到单行DataFrame: 首先,我们将所有需要的聚合函数(例如,每个列的min和max)应用到DataFrame,生成一个包含所有聚合结果的单行DataFrame。
  2. 重塑DataFrame为行式输出: 接着,我们将这个单行DataFrame拆分成多个逻辑行,每行代表一个聚合函数的结果,并通过unionByName将它们合并起来。

3. 详细实现步骤

让我们通过一个具体的例子来演示这个过程。

3.1 准备示例数据

首先,创建一个示例PySpark DataFrame:

import operator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 初始化 SparkSession
spark = SparkSession.builder.appName("PySparkMultiAggTutorial").getOrCreate()

_data = [
    (4, 123, 18, 29),
    (8, 5, 26, 187),
    (2, 97, 18, 29),
]
_schema = ['col_1', 'col2', 'col3', 'col_4']
df = spark.createDataFrame(_data, _schema)

print("原始DataFrame:")
df.show()
# +-----+----+----+-----+
# |col_1|col2|col3|col_4|
# +-----+----+----+-----+
# |    4| 123|  18|   29|
# |    8|   5|  26|  187|
# |    2|  97|  18|   29|
# +-----+----+----+-----+

3.2 第一步:初步聚合所有函数的结果

我们首先为每个列生成min和max的聚合表达式,并使用df.select()来执行这些聚合。这里使用select而不是agg是因为select可以接受多个表达式作为参数,并直接创建新的列。

文心快码
文心快码

文心快码(Comate)是百度推出的一款AI辅助编程工具

下载
# 为每个列生成 min 和 max 聚合表达式
min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]
max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]

# 将所有聚合表达式合并,并使用 select 得到一个单行 DataFrame
# 注意:这里也可以使用 df.agg(*min_vals, *max_vals),效果类似
df_aggregated_single_row = df.select(min_vals + max_vals)

print("初步聚合后的单行DataFrame:")
df_aggregated_single_row.show()
# +-------+------+-------+--------+-------+-------+-------+--------+
# |min_col_1|min_col2|min_col3|min_col_4|max_col_1|max_col2|max_col3|max_col_4|
# +-------+------+-------+--------+-------+-------+-------+--------+
# |      2|     5|     18|      29|      8|    123|     26|     187|
# +-------+------+-------+--------+-------+-------+-------+--------+

注意事项: 如果df_aggregated_single_row后续会被多次使用,为了优化性能,建议对其进行cache()操作:df_aggregated_single_row.cache()。

3.3 第二步:重塑DataFrame为行式输出

现在我们有了包含所有聚合结果的单行DataFrame (df_aggregated_single_row)。接下来,我们需要将其重塑为期望的行式输出。这涉及到为每种聚合类型(如min和max)创建单独的DataFrame,并添加一个标识聚合类型的列,然后通过unionByName合并它们。

# 1. 创建 min 结果的 DataFrame
#    - 添加 'agg_type' 列标识为 'min'
#    - 重命名聚合列回原始列名
min_cols = operator.add(
    [F.lit('min').alias('agg_type')],  # 添加聚合类型标识列
    [F.col(f'min_{c}').alias(c) for c in df.columns] # 选择并重命名 min_xxx 列
)
min_df = df_aggregated_single_row.select(min_cols)

# 2. 创建 max 结果的 DataFrame
#    - 添加 'agg_type' 列标识为 'max'
#    - 重命名聚合列回原始列名
max_cols = operator.add(
    [F.lit('max').alias('agg_type')],  # 添加聚合类型标识列
    [F.col(f'max_{c}').alias(c) for c in df.columns] # 选择并重命名 max_xxx 列
)
max_df = df_aggregated_single_row.select(max_cols)

# 3. 使用 unionByName 合并 min_df 和 max_df
#    unionByName 要求两个 DataFrame 具有相同的列名和类型
result_df = min_df.unionByName(max_df)

print("\n最终行式聚合结果:")
result_df.show()
# +--------+-----+----+----+-----+
# |agg_type|col_1|col2|col3|col_4|
# +--------+-----+----+----+-----+
# |     min|    2|   5|  18|   29|
# |     max|    8| 123|  26|  187|
# +--------+-----+----+----+-----+

这里的operator.add用于连接两个列表,它与直接使用+的效果相同,例如 [F.lit('min').alias('agg_type')] + [F.col(f'min_{c}').alias(c) for c in df.columns]。

4. 总结与扩展

这种方法提供了一个灵活且强大的模式,用于在PySpark中实现复杂的行式聚合输出。

  • 核心思想: 将多函数聚合分解为两个阶段:首先进行所有聚合生成单行结果,然后通过选择、重命名和unionByName操作将单行结果重塑为多行。
  • 可扩展性: 这种模式可以轻松扩展到更多的聚合函数,例如平均值 (F.avg)、标准差 (F.stddev)、计数 (F.count) 等。只需为每个新的聚合函数重复“生成聚合表达式 -> 创建新的 DataFrame -> 与现有结果 unionByName”的步骤即可。
  • 性能考量: 对于大型DataFrame,df_aggregated_single_row.cache() 是一个重要的优化点,可以避免重复计算。
  • 通用性: 这种方法不仅限于min和max,任何可以表示为PySpark SQL函数的聚合都可以通过类似的方式处理。

通过掌握这种技巧,开发者可以更灵活地控制PySpark聚合结果的呈现方式,以满足各种数据分析和报告的需求。

相关专题

更多
数据分析工具有哪些
数据分析工具有哪些

数据分析工具有Excel、SQL、Python、R、Tableau、Power BI、SAS、SPSS和MATLAB等。详细介绍:1、Excel,具有强大的计算和数据处理功能;2、SQL,可以进行数据查询、过滤、排序、聚合等操作;3、Python,拥有丰富的数据分析库;4、R,拥有丰富的统计分析库和图形库;5、Tableau,提供了直观易用的用户界面等等。

683

2023.10.12

SQL中distinct的用法
SQL中distinct的用法

SQL中distinct的语法是“SELECT DISTINCT column1, column2,...,FROM table_name;”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

320

2023.10.27

SQL中months_between使用方法
SQL中months_between使用方法

在SQL中,MONTHS_BETWEEN 是一个常见的函数,用于计算两个日期之间的月份差。想了解更多SQL的相关内容,可以阅读本专题下面的文章。

347

2024.02.23

SQL出现5120错误解决方法
SQL出现5120错误解决方法

SQL Server错误5120是由于没有足够的权限来访问或操作指定的数据库或文件引起的。想了解更多sql错误的相关内容,可以阅读本专题下面的文章。

1095

2024.03.06

sql procedure语法错误解决方法
sql procedure语法错误解决方法

sql procedure语法错误解决办法:1、仔细检查错误消息;2、检查语法规则;3、检查括号和引号;4、检查变量和参数;5、检查关键字和函数;6、逐步调试;7、参考文档和示例。想了解更多语法错误的相关内容,可以阅读本专题下面的文章。

357

2024.03.06

oracle数据库运行sql方法
oracle数据库运行sql方法

运行sql步骤包括:打开sql plus工具并连接到数据库。在提示符下输入sql语句。按enter键运行该语句。查看结果,错误消息或退出sql plus。想了解更多oracle数据库的相关内容,可以阅读本专题下面的文章。

676

2024.04.07

sql中where的含义
sql中where的含义

sql中where子句用于从表中过滤数据,它基于指定条件选择特定的行。想了解更多where的相关内容,可以阅读本专题下面的文章。

575

2024.04.29

sql中删除表的语句是什么
sql中删除表的语句是什么

sql中用于删除表的语句是drop table。语法为drop table table_name;该语句将永久删除指定表的表和数据。想了解更多sql的相关内容,可以阅读本专题下面的文章。

417

2024.04.29

xml格式相关教程
xml格式相关教程

本专题整合了xml格式相关教程汇总,阅读专题下面的文章了解更多详细内容。

0

2026.01.19

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Java 教程
Java 教程

共578课时 | 47.9万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号