Dagster资产间数据传递与用户配置管理教程

碧海醫心
发布: 2025-11-29 13:05:00
原创
475人浏览过

Dagster资产间数据传递与用户配置管理教程

本教程旨在解决dagster中常见的资产间数据传递和用户自定义配置(config)使用问题。通过详细解析错误案例,展示如何正确地将上游资产的输出作为参数传递给下游资产,并有效利用config对象接收用户定义的运行时参数,从而构建健壮、可配置的dagster数据管道,避免`dagsterinvalidconfigerror`等配置与数据流错误。

在数据工程实践中,我们经常需要构建可配置的数据管道,允许用户在运行时输入参数,例如数据拉取的起始日期或特定的筛选条件。同时,数据管道中的各个步骤(在Dagster中通常表现为“资产”)之间需要高效、明确地传递数据。然而,在Dagster中,如果不正确地处理用户配置和资产间的数据流,可能会遇到诸如DagsterInvalidConfigError之类的错误。本教程将深入探讨如何正确地实现这些功能。

理解Dagster中的资产与配置

Dagster的核心理念之一是“软件定义资产”(Software-Defined Assets)。每个资产都代表数据系统中的一个逻辑实体,并且可以定义其如何被计算。资产之间的依赖关系和数据流是其关键特性。

用户自定义配置 (Config)

Dagster通过Config类提供了一种声明式的方式来定义资产在运行时所需的参数。这些参数可以在Dagster UI中由用户输入,或通过编程方式提供。

from dagster import Config

class FruitConfig(Config):
    fruit_select: str
登录后复制

上述代码定义了一个名为FruitConfig的配置对象,它包含一个字符串类型的参数fruit_select。当一个资产需要此配置时,它会在其函数签名中声明一个类型为FruitConfig的参数。

资产间的数据传递

在Dagster中,资产的输出是其下游资产的输入。这种传递不是通过在下游资产中“调用”上游资产函数来实现的,而是通过将上游资产的输出作为参数注入到下游资产的函数中。

常见错误模式与原因分析

考虑以下不正确的Dagster资产定义,它试图使用用户配置并传递数据:

# 错误示例:不正确的资产定义
import pandas as pd
# ... 其他导入 ...
from dagster import asset, Config

# ... generate_dataset 资产定义 (与正确示例相同,略) ...

class fruit_config(Config):
    fruit_select: str 

@asset(deps=[generate_dataset]) # deps在这里用于数据传递是错误的
def filter_data(config: fruit_config):
    # 错误:不应在此处调用上游资产来获取数据
    df = generate_dataset() 
    df2 = df[df['fruit'] == config.fruit_select]
    print(df2)
    return df2

@asset(deps=[filter_data]) # deps在这里用于数据传递是错误的
def filter_again():
    # 错误:不应在此处调用上游资产来获取数据
    df2 = filter_data() 
    df3 = df2[df2['units'] > 5]
    print(df3)
    return df3
登录后复制

上述代码存在以下主要问题:

  1. 错误的资产数据获取方式: 在filter_data资产中,通过直接调用generate_dataset()来获取上游数据是错误的。在Dagster的资产模型中,上游资产的输出会作为参数自动注入到下游资产中。直接调用会导致每次运行时都重新执行上游资产,并且无法正确建立数据流依赖。同样的问题也存在于filter_again资产中。
  2. deps参数的误用: @asset装饰器中的deps参数用于声明“非数据流”依赖,即一个资产的执行依赖于另一个资产的完成,但不需要其输出数据。如果需要传递数据,应通过函数参数显式声明。
  3. 潜在的配置解析问题: 当filter_data试图在内部调用generate_dataset()时,Dagster的运行时可能无法正确解析filter_data所需的配置,因为其输入签名与实际的数据流期望不符,从而引发DagsterInvalidConfigError。

正确实现:数据流与配置的结合

为了正确地实现用户配置和资产间的数据传递,我们需要遵循Dagster的推荐模式:

JTopCms建站系统
JTopCms建站系统

JTopCMS基于JavaEE自主研发,是用于管理站群内容的国产开源软件(CMS),能高效便捷地进行内容采编,审核,模板制作,用户交互以及文件等资源的维护。安全,稳定,易扩展,支持国产中间件及数据库,适合建设政府,教育以及企事业单位的站群系统。 系统特色 1. 基于 JAVA 标准自主研发,支持主流国产信创环境,国产数据库以及国产中间件。安全,稳定,经过多次政务与企事业单位项目长期检验,顺利通过

JTopCms建站系统 0
查看详情 JTopCms建站系统
  1. 将上游资产的输出作为参数传递给下游资产。
  2. 为资产函数的参数和返回值添加类型提示,增强可读性和运行时检查。
  3. 将Config对象作为参数传递给需要配置的资产。

以下是修正后的代码示例:

import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, Config, materialize # materialize用于本地测试

# 1. 定义生成原始数据集的资产
@asset 
def generate_dataset() -> pd.DataFrame:
    """
    生成一个包含水果、单位和日期的随机数据集。
    """
    def random_dates(start_date, end_date, n=10):
        date_range = end_date - start_date
        return [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]

    random.seed(42) # 保证可复现性
    num_rows = 100
    fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']

    df = pd.DataFrame({
        'fruit': [random.choice(fruits) for _ in range(num_rows)],
        'units': [random.randint(1, 10) for _ in range(num_rows)],
        'date': random_dates(datetime(2022, 1, 1), datetime(2022, 12, 31), num_rows)
    })
    print("Generated Dataset:")
    print(df.head())
    return df

# 2. 定义用户配置类
class FruitConfig(Config):
    """
    用户自定义配置,用于选择要筛选的水果。
    """
    fruit_select: str 

# 3. 定义筛选数据的资产,接收上游资产输出和用户配置
@asset 
def filter_data(generate_dataset: pd.DataFrame, config: FruitConfig) -> pd.DataFrame:
    """
    根据用户配置的fruit_select筛选数据集。

    Args:
        generate_dataset (pd.DataFrame): 上游资产generate_dataset的输出。
        config (FruitConfig): 用户提供的配置对象。

    Returns:
        pd.DataFrame: 筛选后的数据集。
    """
    # 直接使用传入的generate_dataset参数
    df_filtered = generate_dataset[generate_dataset['fruit'] == config.fruit_select]
    print(f"\nFiltered Data for '{config.fruit_select}':")
    print(df_filtered.head())
    return df_filtered

# 4. 定义再次筛选的资产,接收上游资产输出
@asset
def filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
    """
    对上游filter_data资产的输出进行二次筛选(单位大于5)。

    Args:
        filter_data (pd.DataFrame): 上游资产filter_data的输出。

    Returns:
        pd.DataFrame: 再次筛选后的数据集。
    """
    # 直接使用传入的filter_data参数
    df_final = filter_data[filter_data['units'] > 5]
    print("\nFinal Filtered Data (units > 5):")
    print(df_final.head())
    return df_final

# 示例:如何在本地运行包含配置的资产
if __name__ == "__main__":
    # 使用materialize函数在本地运行资产
    # 传递Config的方式是嵌套在'ops'字典中,对应资产名和'config'键
    result = materialize(
        [generate_dataset, filter_data, filter_again],
        run_config={
            "ops": {
                "filter_data": {
                    "config": {
                        "fruit_select": "Banana" # 用户在此处定义参数
                    }
                }
            }
        }
    )
    assert result.success
    print("\nPipeline executed successfully!")
登录后复制

代码解析与最佳实践

  1. 资产函数签名:

    • filter_data(generate_dataset: pd.DataFrame, config: FruitConfig) -> pd.DataFrame:
      • generate_dataset: pd.DataFrame:明确表示filter_data资产依赖于名为generate_dataset的上游资产的输出,并且该输出的类型是pd.DataFrame。Dagster运行时会自动将generate_dataset资产的返回值注入到此参数中。
      • config: FruitConfig:声明此资产需要一个FruitConfig类型的配置对象。用户在Dagster UI或通过run_config提供的值将填充此对象。
      • -> pd.DataFrame:这是Python的类型提示,表明filter_data资产将返回一个pd.DataFrame对象。这对于Dagster理解资产的输出类型至关重要,也增强了代码的可读性。
    • filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
      • 同样地,filter_again资产接收filter_data资产的输出作为其输入参数。
  2. 移除deps参数:

    • 在正确实现中,@asset装饰器不再需要deps参数来表示数据流依赖。当一个资产函数的参数与另一个资产的名称匹配时,Dagster会自动识别并建立数据流依赖。
  3. 本地运行与配置:

    • 在if __name__ == "__main__":块中,展示了如何使用materialize函数在本地运行这些资产。
    • run_config字典用于提供运行时配置。对于资产级别的配置,它需要嵌套在"ops"键下,然后是资产名称,再是"config"键,最后是配置参数。例如,{"ops": {"filter_data": {"config": {"fruit_select": "Banana"}}}}。

总结

通过本教程,我们学习了在Dagster中构建可配置数据管道的关键原则:

  • 明确的资产输入/输出: 使用函数参数来接收上游资产的输出和用户配置。
  • 类型提示: 强烈建议为资产函数的参数和返回值添加类型提示,这不仅提高了代码的可读性,也帮助Dagster在运行时进行验证。
  • 正确使用Config: 将Config对象作为资产函数的参数,Dagster会自动处理配置的解析和注入。
  • 避免在下游资产中直接调用上游资产: 这种做法违背了Dagster的数据流模型,会导致错误和低效。

遵循这些最佳实践,可以有效地避免常见的配置和数据流错误,构建出更加健壮、可维护的Dagster数据管道。

以上就是Dagster资产间数据传递与用户配置管理教程的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号