0

0

Delta Live Tables:高效地为所有列应用数据质量期望

DDD

DDD

发布时间:2025-12-02 11:31:01

|

579人浏览过

|

来源于php中文网

原创

delta live tables:高效地为所有列应用数据质量期望

本文探讨了如何在Delta Live Tables (DLT) 中高效地为表中的所有列动态应用数据质量期望,解决了传统方法中手动指定或低效循环的问题。通过利用`@dlt.expect_all_or_fail`装饰器,结合Python动态生成期望字典,用户可以轻松实现表级别的数据质量检查,确保数据完整性,并在不符合预期的行出现时及时中断管道,从而提升数据治理的自动化和可维护性。

引言:Delta Live Tables与数据质量期望

Delta Live Tables (DLT) 是Databricks提供的一个声明式框架,用于构建可靠、可维护且可测试的数据管道。其核心功能之一是内置的数据质量期望(Expectations),允许开发者在数据处理的不同阶段定义数据必须满足的条件。这些期望可以用于验证数据的完整性、一致性和准确性,并在数据不符合预期时采取相应的行动(如失败、丢弃或隔离)。

通常,我们在DLT中使用@dlt.expect或@dlt.expect_or_fail等装饰器为表的特定列定义数据质量规则,例如:

import dlt
from pyspark.sql.functions import expr

@dlt.table(
  comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
  return (
    dlt.read("clickstream_raw")
      .withColumn("click_count", expr("CAST(n AS INT)"))
      .withColumnRenamed("curr_title", "current_page_title")
      .withColumnRenamed("prev_title", "previous_page_title")
      .select("current_page_title", "click_count", "previous_page_title")
  )

然而,当需要对表中所有列应用相同的期望(例如,所有列都不能为NULL)时,手动为每一列编写@dlt.expect装饰器将变得繁琐且难以维护。

挑战:为所有列动态应用期望

在处理包含大量列的数据集时,如果需要对所有列应用相同的通用数据质量检查(例如,检查所有列是否为NULL),传统方法会面临以下问题:

  1. 手动重复: 为每列手动添加@dlt.expect装饰器,代码冗长且易出错。

  2. 低效循环: 尝试通过Python循环动态生成多个DLT表定义来模拟对每列的检查,如以下示例所示:

    # 这种方法非常低效且不推荐
    # for column in columns_list_order_table:     
    #     exec(f'''
    # @dlt.table(comment="null value validations for {column}")
    # @dlt.expect_or_drop("null values","is_null == false")
    # def null_validation_orders_for_column_{column}():
    #     df = dlt.read("bronze_orders")
    #     return df.withColumn("is_null", col("{column}").isNull())
    # ''')

    这种exec动态生成函数和表的方式不仅效率低下,而且难以调试和管理,严重违背了DLT声明式管道的初衷。

解决方案:使用 @dlt.expect_all_or_fail 动态应用期望

DLT提供了一个强大的装饰器@dlt.expect_all_or_fail(expectations),专门用于解决需要同时应用多个数据质量期望的场景。这个装饰器接受一个Python字典作为参数,字典的键是期望的描述,值是期望的约束条件。如果任何一行违反了字典中定义的任何一个期望,DLT管道将立即停止执行。

利用这个特性,我们可以通过Python代码动态生成包含所有列期望的字典,然后将其传递给@dlt.expect_all_or_fail。

Unscreen
Unscreen

AI智能视频背景移除工具

下载

步骤一:动态生成期望字典

首先,获取数据表的所有列名,然后遍历这些列名,为每列构造一个非NULL的期望。

# 假设这是您的列名列表
columns_list = ['state', 'store_id', 'product_category', 'SKU', 'price']

expectations = {}
for col_name in columns_list:
    # 为每列生成一个名为 "valid_<列名>" 的期望,并检查其是否为 NULL
    expectations[f"valid_{col_name}"] = f"{col_name} IS NOT NULL"

# 打印生成的期望字典,以便理解其结构
print(expectations)

上述代码将生成一个如下所示的字典:

{'valid_state': 'state IS NOT NULL', 
'valid_store_id': 'store_id IS NOT NULL',
'valid_product_category': 'product_category IS NOT NULL', 
'valid_SKU': 'SKU IS NOT NULL',
'valid_price': 'price IS NOT NULL'}

步骤二:将字典应用于 DLT 表

将生成的expectations字典传递给@dlt.expect_all_or_fail装饰器,并将其应用于您的DLT表定义。

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# 假设这是您的列名列表
columns_list = ['state', 'store_id', 'product_category', 'SKU', 'price']

# 动态生成期望字典
expectations = {}
for col_name in columns_list:
    expectations[f"valid_{col_name}"] = f"{col_name} IS NOT NULL"

# 定义表的Schema,这对于从CSV等文件读取数据时非常重要
# 实际应用中,您可能从数据源推断或定义更复杂的Schema
schema = StructType([
    StructField("state", StringType(), True),
    StructField("store_id", IntegerType(), True),
    StructField("product_category", StringType(), True),
    StructField("SKU", StringType(), True),
    StructField("price", DoubleType(), True)
])

@dlt.table(
  comment="原始点击流数据,已应用所有列的非空期望"
)
@dlt.expect_all_or_fail(expectations) # 将动态生成的期望字典应用于表
def clickstream_raw():
  # 假设从CSV文件读取数据,请根据实际情况修改路径和格式
  # 注意:如果CSV包含标题行,必须设置 .option("header", "true")
  return (
    spark.read
      .schema(schema) # 应用定义的Schema
      .option("header", "true") # 如果CSV有标题行,请设置为true
      .format("csv")
      .load("/path/to/your/csv/data.csv") # 替换为您的数据路径
  )

# 在实际的DLT管道中,您不需要手动创建SparkSession,DLT运行时会提供
# 此处仅为示例完整性,展示如何读取数据
# spark = SparkSession.builder.appName("DLTExpectationsExample").getOrCreate()

在这个例子中,clickstream_raw表在加载数据时,会检查columns_list中定义的每一列是否为NULL。如果任何一列的任何一行包含NULL值,整个DLT管道将立即失败,从而确保数据的严格质量。

注意事项与最佳实践

  1. 期望行为: @dlt.expect_all_or_fail的“或失败”行为意味着一旦有任何一行违反了任何一个期望,整个管道就会中断。如果您的需求是允许部分脏数据通过,但要记录或隔离它们,可以考虑使用@dlt.expect_all_or_drop(丢弃不符合期望的行)或@dlt.expect_all_or_quarantine(隔离不符合期望的行)。

  2. 动态列列表: 在实际生产环境中,columns_list可能不是硬编码的。您可以从数据源的Schema动态获取列名,例如:

    # 假设您已经读取了DataFrame 'df'
    # columns_list = df.columns
    # 或者从Spark Schema中提取
    # schema_df = spark.read.format("csv").load("/path/to/data.csv")
    # columns_list = [field.name for field in schema_df.schema.fields]
  3. 数据源配置: 当从文件(如CSV)读取数据时,确保正确配置数据源选项,例如header=true以正确解析列名。错误的配置可能导致列名不匹配,从而使期望失效或产生错误。

  4. 期望的复杂性: 虽然示例中展示的是简单的IS NOT NULL检查,但expectations字典中的约束条件可以是任何有效的Spark SQL表达式,允许您定义更复杂的业务规则。

  5. 可维护性: 这种动态生成期望的方式极大地提高了代码的可维护性。当表的列发生变化时,只需更新columns_list(或动态获取),而无需修改大量的@dlt.expect装饰器。

总结

通过巧妙地结合Python的动态编程能力和Delta Live Tables的@dlt.expect_all_or_fail装饰器,我们可以高效、声明式地为表中的所有列应用数据质量期望。这种方法不仅避免了手动重复和低效的循环,还提升了数据管道的鲁棒性、可维护性和自动化程度,是构建高质量DLT管道的关键实践。掌握这一技巧,将使您能够更有效地管理大规模数据的数据质量。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据分析工具有哪些
数据分析工具有哪些

数据分析工具有Excel、SQL、Python、R、Tableau、Power BI、SAS、SPSS和MATLAB等。详细介绍:1、Excel,具有强大的计算和数据处理功能;2、SQL,可以进行数据查询、过滤、排序、聚合等操作;3、Python,拥有丰富的数据分析库;4、R,拥有丰富的统计分析库和图形库;5、Tableau,提供了直观易用的用户界面等等。

1133

2023.10.12

SQL中distinct的用法
SQL中distinct的用法

SQL中distinct的语法是“SELECT DISTINCT column1, column2,...,FROM table_name;”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

340

2023.10.27

SQL中months_between使用方法
SQL中months_between使用方法

在SQL中,MONTHS_BETWEEN 是一个常见的函数,用于计算两个日期之间的月份差。想了解更多SQL的相关内容,可以阅读本专题下面的文章。

381

2024.02.23

SQL出现5120错误解决方法
SQL出现5120错误解决方法

SQL Server错误5120是由于没有足够的权限来访问或操作指定的数据库或文件引起的。想了解更多sql错误的相关内容,可以阅读本专题下面的文章。

2152

2024.03.06

sql procedure语法错误解决方法
sql procedure语法错误解决方法

sql procedure语法错误解决办法:1、仔细检查错误消息;2、检查语法规则;3、检查括号和引号;4、检查变量和参数;5、检查关键字和函数;6、逐步调试;7、参考文档和示例。想了解更多语法错误的相关内容,可以阅读本专题下面的文章。

380

2024.03.06

oracle数据库运行sql方法
oracle数据库运行sql方法

运行sql步骤包括:打开sql plus工具并连接到数据库。在提示符下输入sql语句。按enter键运行该语句。查看结果,错误消息或退出sql plus。想了解更多oracle数据库的相关内容,可以阅读本专题下面的文章。

1683

2024.04.07

sql中where的含义
sql中where的含义

sql中where子句用于从表中过滤数据,它基于指定条件选择特定的行。想了解更多where的相关内容,可以阅读本专题下面的文章。

585

2024.04.29

sql中删除表的语句是什么
sql中删除表的语句是什么

sql中用于删除表的语句是drop table。语法为drop table table_name;该语句将永久删除指定表的表和数据。想了解更多sql的相关内容,可以阅读本专题下面的文章。

440

2024.04.29

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

3

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 4.9万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号