0

0

PySpark DataFrame多列多函数聚合与结果重塑教程

心靈之曲

心靈之曲

发布时间:2025-10-23 08:02:15

|

219人浏览过

|

来源于php中文网

原创

PySpark DataFrame多列多函数聚合与结果重塑教程

本教程详细介绍了如何在pyspark中对dataframe的所有列同时应用多个聚合函数(如`min`和`max`),并以行式结构(每行代表一个聚合结果)展示。通过结合使用`select`进行初步聚合、`cache`优化性能以及`unionbyname`进行结果重塑,实现了灵活且高效的数据分析,避免了直接`agg`函数无法满足特定输出格式的问题。

在PySpark进行数据分析时,一个常见的需求是对DataFrame中的所有或指定列应用多个聚合函数,例如同时计算每列的最小值和最大值。虽然DataFrame.agg()方法能够轻松实现多列多函数的聚合,但其默认输出是将所有聚合结果展平为单行,这往往无法满足将不同聚合类型(如最小值和最大值)作为独立行呈现的需求。

例如,直接使用df.agg(*exprs)的方式,其中exprs = [min(c).alias(c), max(c).alias(c) for c in df.columns],会生成一个包含所有列的最小值和最大值,但这些值都将并列在同一行中,而不是我们期望的“一行是所有列的最小值,另一行是所有列的最大值”的结构。

为了实现这种行式输出的聚合结果,我们需要一种更为精细的策略,结合PySpark的select、cache和unionByName等操作。

解决方案:多阶段聚合与结果重塑

以下步骤将详细演示如何通过分阶段处理来达到目标输出格式:

歌者PPT
歌者PPT

歌者PPT,AI 写 PPT 永久免费

下载
  1. 初步聚合所有最小值和最大值: 首先,对DataFrame的所有列分别计算其最小值和最大值。这些聚合结果将暂时存储在一个新的DataFrame的单行中,其中每一列对应一个聚合值(例如,min_col1, max_col1, min_col2, max_col2等)。
  2. 缓存中间结果: 为了避免重复计算,对包含所有聚合值的中间DataFrame进行缓存。这在处理大型数据集时尤为重要。
  3. 重塑结果为行式结构: 将缓存的单行聚合结果拆分为多个DataFrame,每个DataFrame代表一种聚合类型(例如,一个DataFrame只包含所有列的最小值,另一个只包含所有列的最大值)。在拆分过程中,为每个DataFrame添加一个标识列(如agg_type),并统一列名,以便后续合并。
  4. 合并结果: 使用unionByName()方法将重塑后的DataFrame合并,最终得到我们期望的行式输出。

示例代码与详细解释

让我们通过一个具体的PySpark代码示例来演示上述过程:

import operator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 初始化Spark会话
spark = SparkSession.builder.appName("MultiFunctionAggregate").getOrCreate()

# 示例数据
_data = [
    (4, 123, 18, 29),
    (8, 5, 26, 187),
    (2, 97, 18, 29),
]
_schema = ['col_1', 'col2', 'col3', 'col_4']
df = spark.createDataFrame(_data, _schema)

print("原始DataFrame:")
df.show()
# +-----+----+----+-----+
# |col_1|col2|col3|col_4|
# +-----+----+----+-----+
# |    4| 123|  18|   29|
# |    8|   5|  26|  187|
# |    2|  97|  18|   29|
# +-----+----+----+-----+

# 1. 初步聚合所有最小值和最大值
# 构建min聚合表达式列表,并为结果列添加'min_'前缀
min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]
# 构建max聚合表达式列表,并为结果列添加'max_'前缀
max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]

# 使用select执行所有聚合,结果是一个单行DataFrame
df_agg_raw = df.select(min_vals + max_vals)

print("初步聚合结果 (单行):")
df_agg_raw.show()
# +-------+-------+-------+--------+-------+-------+-------+--------+
# |min_col_1|min_col2|min_col3|min_col_4|max_col_1|max_col2|max_col3|max_col_4|
# +-------+-------+-------+--------+-------+-------+-------+--------+
# |      2|       5|      18|       29|      8|     123|     26|     187|
# +-------+-------+-------+--------+-------+-------+-------+--------+

# 2. 缓存中间结果
# 缓存df_agg_raw以提高后续操作的性能
df_agg_raw.cache()

# 3. 重塑结果为行式结构
# 为最小值行构建选择表达式:添加'agg_type'列,并将min_前缀的列重命名回原始列名
min_cols = operator.add(
    [F.lit('min').alias('agg_type')], # 添加一个字面量列,标识聚合类型为'min'
    [F.col(f'min_{c}').alias(c) for c in df.columns] # 选取带有'min_'前缀的列,并将其别名改回原始列名
)
# 为最大值行构建选择表达式,原理同上
max_cols = operator.add(
    [F.lit('max').alias('agg_type')], # 添加一个字面量列,标识聚合类型为'max'
    [F.col(f'max_{c}').alias(c) for c in df.columns] # 选取带有'max_'前缀的列,并将其别名改回原始列名
)

# 从缓存的df_agg_raw中选择并重命名列,创建最小值DataFrame
min_df = df_agg_raw.select(min_cols)
# 从缓存的df_agg_raw中选择并重命名列,创建最大值DataFrame
max_df = df_agg_raw.select(max_cols)

print("重塑后的最小值DataFrame:")
min_df.show()
# +--------+-----+----+----+-----+
# |agg_type|col_1|col2|col3|col_4|
# +--------+-----+----+----+-----+
# |     min|    2|   5|  18|   29|
# +--------+-----+----+----+-----+

print("重塑后的最大值DataFrame:")
max_df.show()
# +--------+-----+----+----+-----+
# |agg_type|col_1|col2|col3|col_4|
# +--------+-----+----+----+-----+
# |     max|    8| 123|  26|  187|
# +--------+-----+----+----+-----+

# 4. 合并结果
# 使用unionByName合并两个DataFrame,确保按列名匹配
result = min_df.unionByName(max_df)

print("最终结果DataFrame:")
result.show()
# +--------+-----+----+----+-----+
# |agg_type|col_1|col2|col3|col_4|
# +--------+-----+----+----+-----+
# |     min|    2|   5|  18|   29|
# |     max|    8| 123|  26|  187|
# +--------+-----+----+----+-----+

# 停止Spark会话
spark.stop()

注意事项与总结

  1. 列名管理: 在聚合阶段,通过alias()为聚合结果列添加前缀(如min_,max_)是关键,这有助于在后续重塑阶段清晰地识别和操作这些列。
  2. operator.add 的使用: 示例中operator.add用于连接两个列表,它等同于简单的列表拼接操作(list1 + list2)。
  3. F.lit()的作用: F.lit('min')或F.lit('max')用于创建一个字面量列,其值在所有行中都相同。这对于标识不同聚合类型至关重要。
  4. F.col()与alias(): 在重塑阶段,F.col(f'min_{c}').alias(c)的作用是选取带有特定前缀的列,并将其重命名回原始的列名,以保持最终结果的列名一致性。
  5. cache()的重要性: df_agg_raw.cache()在执行min_df和max_df的select操作之前,将中间聚合结果持久化到内存中。这避免了每次创建min_df和max_df时都重新计算原始DataFrame的聚合,显著提升了性能。
  6. unionByName(): unionByName()是合并具有相同列名但可能顺序不同的DataFrame的理想选择。它会根据列名进行匹配,而不是列的物理位置,从而增加了代码的健壮性。
  7. 扩展性: 这种方法不仅限于min和max,您可以轻松扩展到其他聚合函数(如avg, sum, count等),只需相应地修改聚合表达式和重塑逻辑即可。

通过上述方法,我们能够灵活地控制PySpark聚合结果的输出格式,满足将不同聚合类型以行式结构呈现的特定分析需求,同时兼顾了性能优化。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
counta和count的区别
counta和count的区别

Count函数用于计算指定范围内数字的个数,而CountA函数用于计算指定范围内非空单元格的个数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

203

2023.11.20

PHP 高并发与性能优化
PHP 高并发与性能优化

本专题聚焦 PHP 在高并发场景下的性能优化与系统调优,内容涵盖 Nginx 与 PHP-FPM 优化、Opcode 缓存、Redis/Memcached 应用、异步任务队列、数据库优化、代码性能分析与瓶颈排查。通过实战案例(如高并发接口优化、缓存系统设计、秒杀活动实现),帮助学习者掌握 构建高性能PHP后端系统的核心能力。

114

2025.10.16

PHP 数据库操作与性能优化
PHP 数据库操作与性能优化

本专题聚焦于PHP在数据库开发中的核心应用,详细讲解PDO与MySQLi的使用方法、预处理语句、事务控制与安全防注入策略。同时深入分析SQL查询优化、索引设计、慢查询排查等性能提升手段。通过实战案例帮助开发者构建高效、安全、可扩展的PHP数据库应用系统。

99

2025.11.13

JavaScript 性能优化与前端调优
JavaScript 性能优化与前端调优

本专题系统讲解 JavaScript 性能优化的核心技术,涵盖页面加载优化、异步编程、内存管理、事件代理、代码分割、懒加载、浏览器缓存机制等。通过多个实际项目示例,帮助开发者掌握 如何通过前端调优提升网站性能,减少加载时间,提高用户体验与页面响应速度。

36

2025.12.30

JavaScript浏览器渲染机制与前端性能优化实践
JavaScript浏览器渲染机制与前端性能优化实践

本专题围绕 JavaScript 在浏览器中的执行与渲染机制展开,系统讲解 DOM 构建、CSSOM 解析、重排与重绘原理,以及关键渲染路径优化方法。内容涵盖事件循环机制、异步任务调度、资源加载优化、代码拆分与懒加载等性能优化策略。通过真实前端项目案例,帮助开发者理解浏览器底层工作原理,并掌握提升网页加载速度与交互体验的实用技巧。

102

2026.03.06

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

25

2026.03.13

Python异步编程与Asyncio高并发应用实践
Python异步编程与Asyncio高并发应用实践

本专题围绕 Python 异步编程模型展开,深入讲解 Asyncio 框架的核心原理与应用实践。内容包括事件循环机制、协程任务调度、异步 IO 处理以及并发任务管理策略。通过构建高并发网络请求与异步数据处理案例,帮助开发者掌握 Python 在高并发场景中的高效开发方法,并提升系统资源利用率与整体运行性能。

44

2026.03.12

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

174

2026.03.11

Go高并发任务调度与Goroutine池化实践
Go高并发任务调度与Goroutine池化实践

本专题围绕 Go 语言在高并发任务处理场景中的实践展开,系统讲解 Goroutine 调度模型、Channel 通信机制以及并发控制策略。内容包括任务队列设计、Goroutine 池化管理、资源限制控制以及并发任务的性能优化方法。通过实际案例演示,帮助开发者构建稳定高效的 Go 并发任务处理系统,提高系统在高负载环境下的处理能力与稳定性。

50

2026.03.10

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Java 教程
Java 教程

共578课时 | 81.6万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号