是否可以使用 Dagster 创建动态工作?

mis*_*kin 3 dagster

考虑这个例子 - 您需要从源数据库加载 table1,进行一些通用转换(例如转换带时间戳列的时区)并将结果数据写入 Snowflake。这是一个简单的操作,可以使用 3 个 dagster 操作来实现。

现在,假设您需要对 100 个表执行相同的操作。你会如何用 dagster 来做呢?您真的需要创建 100 个工作/图表吗?或者您可以创建一项将被执行 100 次的作业吗?您可以限制同时运行的这些作业的数量吗?

小智 6

您有两个主要选项可以执行此操作:

  1. 使用具有动态输出的单个作业:

通过此设置,所有 ETL 都将在单个作业中发生。您将有一个初始操作,该操作将为您想要执行此过程的每个表名生成一个 DynamicOutput,并将其输入到将在每个单独的 DynamicOutput 上运行的一组操作(可能组织成一个图形)。

根据您使用的执行程序,可以限制总体步骤并发性(例如,默认的multiprocess_executor支持此选项)。

  1. 创建一个可配置的作业(我认为这更有可能是你想要的)
    from dagster import job, op, graph
    import pandas as pd
    
    
    @op(config_schema={"table_name": str})
    def extract_table(context) -> pd.DataFrame:
        table_name = context.op_config["table_name"]
        # do some load...
        return pd.DataFrame()
    
    
    @op
    def transform_table(table: pd.DataFrame) -> pd.DataFrame:
        # do some transform...
        return table
    
    
    @op(config_schema={"table_name": str})
    def load_table(context, table: pd.DataFrame):
        table_name = context.op_config["table_name"]
        # load to snowflake...
    
    
    @job
    def configurable_etl():
        load_table(transform_table(extract_table()))
    
    # this is what the configuration would look like to extract from table
    # src_foo and load into table dest_foo
    configurable_etl.execute_in_process(
        run_config={
            "ops": {
                "extract_table": {"config": {"table_name": "src_foo"}},
                "load_table": {"config": {"table_name": "dest_foo"}},
            }
        }
    )
Run Code Online (Sandbox Code Playgroud)

在这里,您可以通过为相关操作提供配置架构来创建一个可以指向源表和目标表的作业。根据这些配置选项(当您通过运行配置创建运行时提供),您的作业将在不同的源/目标表上运行。

该示例显示使用 python API 显式运行此作业,但如果您从 Dagit 运行它,您还可以在此处输入此配置的 yaml 版本。如果您想简化配置模式(因为它非常嵌套,如图所示),您始终可以创建一个配置映射以使界面更好:)

从这里,您可以通过向作业提供唯一标记并使用 QueuedRunCoordinator限制该标记的最大并发运行数来限制运行并发性。