
本文详解如何在 PySpark 中对按日期排序的数据进行“滚动窗口式”动态分组——即以首行 item_date 为起点,将后续落在其 max_window(如 +365 天)内的记录归入同一组,并自动触发新分组;全程纯 SQL 函数实现,零 UDF、零 Pandas 转换。
本文详解如何在 pyspark 中对按日期排序的数据进行“滚动窗口式”动态分组——即以首行 `item_date` 为起点,将后续落在其 `max_window`(如 +365 天)内的记录归入同一组,并自动触发新分组;全程纯 sql 函数实现,零 udf、零 pandas 转换。
在大规模时序数据处理中,常需按“逻辑业务周期”而非固定时间粒度(如月/年)进行分组,例如:将首次事件发生后的 365 天内所有关联事件划为一个生命周期组,后续超出则开启新周期。这种依赖前序累积状态的分组逻辑无法通过常规 groupBy() 或窗口聚合直接表达,但借助 PySpark 3.4+ 引入的高阶函数 aggregate() 与累积差值建模,可完全在 Catalyst 优化器内高效完成。
核心思路是:
- 按 item_date 排序,确保时序一致性;
- 计算相邻日期差(天数),转化为相对增量序列;
- 构建前缀累积差值列表,模拟“从组首日到当前行”的累计跨度;
- 用 aggregate() 模拟状态机:维护 (current_span, group_id) 元组,若新增差值使 current_span ≤ 365,则延续当前组;否则重置 current_span = 0 并递增 group_id。
以下为完整可运行代码(适配 Spark 3.4+):
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
# 初始化 SparkSession(生产环境请配置相应 master)
spark = SparkSession.builder.appName("DateRangeGrouping").getOrCreate()
# 构造示例数据
sample_df = spark.createDataFrame([
('2020-01-01', '2021-01-01', 1),
('2020-02-01', '2021-02-01', 1),
('2021-01-15', '2022-01-15', 2),
('2022-01-15', '2023-01-15', 2),
('2022-02-01', '2023-02-01', 3),
('2022-03-01', '2023-03-01', 3),
('2023-03-01', '2024-03-01', 4),
], ['item_date', 'max_window', 'expected_grouping_index'])
# 步骤 1:转为 date 类型并排序
df = sample_df.withColumn("item_date", col("item_date").cast("date"))
window_spec = Window.orderBy("item_date")
# 步骤 2:计算与前一行的天数差(首行为 NULL)
df = df.withColumn("days_since_prev",
datediff(col("item_date"), lag(col("item_date"), 1).over(window_spec)))
# 步骤 3:收集从首行到当前行的所有差值(形成累积路径)
cumulative_window = window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("diff_list", collect_list("days_since_prev").over(cumulative_window))
# 步骤 4:使用 aggregate 模拟分组状态机
# 初始状态:(累计跨度, 当前组ID) = (0, 0)
initial_state = array(lit(0), lit(0))
df = df.withColumn(
"group_state",
aggregate(
"diff_list",
initial_state,
lambda acc, x: when(
(acc[0] + coalesce(x, lit(0))) <= 365, # 若累加后仍在窗口内
array(acc[0] + coalesce(x, lit(0)), acc[1]) # 延续当前组:更新跨度
).otherwise(
array(lit(0), acc[1] + 1) # 超出窗口:重置跨度,组ID+1
)
)
)
# 步骤 5:提取 group_id(即 state[1])
result_df = df.withColumn("group_id", col("group_state")[1])
# 最终结果:每组仅保留首行(按 item_date 升序,即 row_number() == 1)
final_df = result_df.withColumn("rn", row_number().over(Window.partitionBy("group_id").orderBy("item_date"))) \
.filter(col("rn") == 1) \
.drop("diff_list", "group_state", "rn", "days_since_prev")
result_df.select("item_date", "max_window", "expected_grouping_index", "group_id").show(truncate=False)关键注意事项:
✅ 性能保障:全程使用 Catalyst 内置函数,避免 UDF 的 JVM 序列化开销与不可优化性;collect_list 在窗口内可控(数据已排序且分组天然稀疏),实践中建议对超长序列加 limit 防止 OOM。
⚠️ 边界鲁棒性:lag() 产生的首行 days_since_prev 为 NULL,coalesce(x, lit(0)) 确保 aggregate 计算安全;若实际窗口非固定 365 天(如依赖 max_window 字段),可将
? 扩展应用:该模式可泛化至任意“状态驱动分组”,如会话超时分组(30 分钟无活动则新建 session)、库存批次合并(累计数量 ≤ 阈值则同批)等场景。
通过此方法,你不仅能精准复现示例中的 expected_grouping_index(对应 group_id + 1),更能获得可扩展、可维护、高性能的生产级分组能力——真正践行“用 Spark 的方式解决 Spark 的问题”。










