
本教程旨在解决dagster中常见的资产间数据传递和用户自定义配置(config)使用问题。通过详细解析错误案例,展示如何正确地将上游资产的输出作为参数传递给下游资产,并有效利用config对象接收用户定义的运行时参数,从而构建健壮、可配置的dagster数据管道,避免`dagsterinvalidconfigerror`等配置与数据流错误。
在数据工程实践中,我们经常需要构建可配置的数据管道,允许用户在运行时输入参数,例如数据拉取的起始日期或特定的筛选条件。同时,数据管道中的各个步骤(在Dagster中通常表现为“资产”)之间需要高效、明确地传递数据。然而,在Dagster中,如果不正确地处理用户配置和资产间的数据流,可能会遇到诸如DagsterInvalidConfigError之类的错误。本教程将深入探讨如何正确地实现这些功能。
Dagster的核心理念之一是“软件定义资产”(Software-Defined Assets)。每个资产都代表数据系统中的一个逻辑实体,并且可以定义其如何被计算。资产之间的依赖关系和数据流是其关键特性。
Dagster通过Config类提供了一种声明式的方式来定义资产在运行时所需的参数。这些参数可以在Dagster UI中由用户输入,或通过编程方式提供。
from dagster import Config
class FruitConfig(Config):
fruit_select: str上述代码定义了一个名为FruitConfig的配置对象,它包含一个字符串类型的参数fruit_select。当一个资产需要此配置时,它会在其函数签名中声明一个类型为FruitConfig的参数。
在Dagster中,资产的输出是其下游资产的输入。这种传递不是通过在下游资产中“调用”上游资产函数来实现的,而是通过将上游资产的输出作为参数注入到下游资产的函数中。
考虑以下不正确的Dagster资产定义,它试图使用用户配置并传递数据:
# 错误示例:不正确的资产定义
import pandas as pd
# ... 其他导入 ...
from dagster import asset, Config
# ... generate_dataset 资产定义 (与正确示例相同,略) ...
class fruit_config(Config):
fruit_select: str
@asset(deps=[generate_dataset]) # deps在这里用于数据传递是错误的
def filter_data(config: fruit_config):
# 错误:不应在此处调用上游资产来获取数据
df = generate_dataset()
df2 = df[df['fruit'] == config.fruit_select]
print(df2)
return df2
@asset(deps=[filter_data]) # deps在这里用于数据传递是错误的
def filter_again():
# 错误:不应在此处调用上游资产来获取数据
df2 = filter_data()
df3 = df2[df2['units'] > 5]
print(df3)
return df3上述代码存在以下主要问题:
为了正确地实现用户配置和资产间的数据传递,我们需要遵循Dagster的推荐模式:
JTopCMS基于JavaEE自主研发,是用于管理站群内容的国产开源软件(CMS),能高效便捷地进行内容采编,审核,模板制作,用户交互以及文件等资源的维护。安全,稳定,易扩展,支持国产中间件及数据库,适合建设政府,教育以及企事业单位的站群系统。 系统特色 1. 基于 JAVA 标准自主研发,支持主流国产信创环境,国产数据库以及国产中间件。安全,稳定,经过多次政务与企事业单位项目长期检验,顺利通过
0
以下是修正后的代码示例:
import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, Config, materialize # materialize用于本地测试
# 1. 定义生成原始数据集的资产
@asset
def generate_dataset() -> pd.DataFrame:
"""
生成一个包含水果、单位和日期的随机数据集。
"""
def random_dates(start_date, end_date, n=10):
date_range = end_date - start_date
return [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]
random.seed(42) # 保证可复现性
num_rows = 100
fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']
df = pd.DataFrame({
'fruit': [random.choice(fruits) for _ in range(num_rows)],
'units': [random.randint(1, 10) for _ in range(num_rows)],
'date': random_dates(datetime(2022, 1, 1), datetime(2022, 12, 31), num_rows)
})
print("Generated Dataset:")
print(df.head())
return df
# 2. 定义用户配置类
class FruitConfig(Config):
"""
用户自定义配置,用于选择要筛选的水果。
"""
fruit_select: str
# 3. 定义筛选数据的资产,接收上游资产输出和用户配置
@asset
def filter_data(generate_dataset: pd.DataFrame, config: FruitConfig) -> pd.DataFrame:
"""
根据用户配置的fruit_select筛选数据集。
Args:
generate_dataset (pd.DataFrame): 上游资产generate_dataset的输出。
config (FruitConfig): 用户提供的配置对象。
Returns:
pd.DataFrame: 筛选后的数据集。
"""
# 直接使用传入的generate_dataset参数
df_filtered = generate_dataset[generate_dataset['fruit'] == config.fruit_select]
print(f"\nFiltered Data for '{config.fruit_select}':")
print(df_filtered.head())
return df_filtered
# 4. 定义再次筛选的资产,接收上游资产输出
@asset
def filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
"""
对上游filter_data资产的输出进行二次筛选(单位大于5)。
Args:
filter_data (pd.DataFrame): 上游资产filter_data的输出。
Returns:
pd.DataFrame: 再次筛选后的数据集。
"""
# 直接使用传入的filter_data参数
df_final = filter_data[filter_data['units'] > 5]
print("\nFinal Filtered Data (units > 5):")
print(df_final.head())
return df_final
# 示例:如何在本地运行包含配置的资产
if __name__ == "__main__":
# 使用materialize函数在本地运行资产
# 传递Config的方式是嵌套在'ops'字典中,对应资产名和'config'键
result = materialize(
[generate_dataset, filter_data, filter_again],
run_config={
"ops": {
"filter_data": {
"config": {
"fruit_select": "Banana" # 用户在此处定义参数
}
}
}
}
)
assert result.success
print("\nPipeline executed successfully!")资产函数签名:
移除deps参数:
本地运行与配置:
通过本教程,我们学习了在Dagster中构建可配置数据管道的关键原则:
遵循这些最佳实践,可以有效地避免常见的配置和数据流错误,构建出更加健壮、可维护的Dagster数据管道。
以上就是Dagster资产间数据传递与用户配置管理教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号