您好,我正在学习 dagster,我需要启动日程安排方面的帮助,我可以在 dagit 中添加和启动日程安排,但我想自动启动日程安排,而不是从 dagit 打开每个日程安排。
#这是我的代码
@solid()
def test(context):
context.log.info("test")
@pipeline()
def testPipeline():
test()
@schedule(
cron_schedule="* * * * *", pipeline_name="testPipeline", execution_timezone="Asia/Kolkata"
)
def scheduleTest():
return {}
@repository()
def testRepo():
return [testPipeline, scheduleTest]
Run Code Online (Sandbox Code Playgroud) 我有一个由两个固体组成的 Dagster 管道(下面是可重现的示例)。第一个 ( return_some_list
) 输出一些对象的列表。第二个实体 ( print_num
) 接受第一个列表(不是完整列表)中的元素,并对该元素进行一些处理。
我该如何为第一个实体返回的列表中的每个元素调用第二个实体?还请解释任何最佳实践。
不确定这是否是最好的方法(让我知道),但我想print_num
为第一个实体输出的每个元素生成一个不同的实体实例。这将帮助我将来并行化实体并更好地处理长/计算密集型实体。
from dagster import execute_pipeline, pipeline, solid
@solid
def return_some_list(context):
return [1,2,3,4,5]
@solid
def print_num(context, some_num: int):
print(some_num)
return some_num
@pipeline
def some_pipeline():
output_list = return_some_list()
for some_num in output_list:
print_num(some_num)
if __name__ == "__main__":
result = execute_pipeline(some_pipeline)
Run Code Online (Sandbox Code Playgroud) 假设我使用以下实体创建了一个 Dagster 管道:
我想对 10 个不同的表并行执行此操作。每个表都需要不同的 SQL 查询。最好的方法是什么?
假设我在 Dagster 中有两个实体连接在管道上。第一个实体可以执行某些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为了实现此结果,当数据满足无效条件时,我会引发错误,以便管道停止并跳过其余的实体。
提出错误来解决我的用例似乎很棘手,有没有一种方法可以让我跳过管道其余部分的执行而不诉诸异常?
from dagster import solid, pipeline
@solid
def solid_1(context, x: int):
y = x + 1
if y%2 == 0:
raise "No even number is further processed"
return y
@solid
def solid_2(context, y:int):
return y**2
@pipeline
def toy_pipeline():
solid_2(solid_1())
Run Code Online (Sandbox Code Playgroud)
在这个非常人为的示例中,只有当第一个实体的输出为奇数时才应执行实体 2。
在我的实际用例中,第一个实体轮询数据库,有时找不到要处理的数据。在这种情况下,不将执行标记为失败而是标记为成功是有意义的。可以检查每个下游实体中的数据是否满足条件,但这很快就会增加样板文件。当接收数据的实体找不到要处理的数据时,最好有一种方法可以跳过所有下游实体的执行。
我正在使用 dagster 0.11.3(撰写本文时最新版本)
我创建了一个 Dagster 管道(另存为 pipeline.py),如下所示:
@solid
def return_a(context):
return 12.34
@pipeline(
mode_defs=[
ModeDefinition(
executor_defs=[dask_executor] # Note: dask only!
)
]
)
def the_pipeline():
return_a()
Run Code Online (Sandbox Code Playgroud)
我将 DAGSTER_HOME 环境变量设置为包含名为 dagster.yaml 的文件的目录,该文件是一个空文件。这应该没问题,因为根据这些文档,默认值是合理的: https: //docs.dagster.io/deployment/dagster-instance。
我有一个现有的 Dask 集群在“scheduler:8786”上运行。基于这些文档:https://docs.dagster.io/deployment/custom-infra/dask,我创建了一个名为 config.yaml 的运行配置,如下所示:
execution:
dask:
config:
cluster:
existing:
address: "scheduler:8786"
Run Code Online (Sandbox Code Playgroud)
我已经成功地将这个运行配置与 Dagster 一起使用,如下所示:
$ dagster pipeline execute -f pipeline.py -c config.yaml
Run Code Online (Sandbox Code Playgroud)
(我检查了 Dask 日志并确保它确实在我的 Dask 集群上运行)
我的问题是: 如何让 Dagit 使用这个 Dask 集群?我发现的唯一似乎相关的是: https: //docs.dagster.io/_apidocs/execution#executors
...但它甚至没有提到 Dask 作为一个选项(它有 dagster.in_process_executor 和 dagster.multiprocess_executor,它们似乎与 dask 完全无关)。 …
考虑这个例子 - 您需要从源数据库加载 table1,进行一些通用转换(例如转换带时间戳列的时区)并将结果数据写入 Snowflake。这是一个简单的操作,可以使用 3 个 dagster 操作来实现。
现在,假设您需要对 100 个表执行相同的操作。你会如何用 dagster 来做呢?您真的需要创建 100 个工作/图表吗?或者您可以创建一项将被执行 100 次的作业吗?您可以限制同时运行的这些作业的数量吗?
I've started using Dagster in our ML pipeline, and am running into some basic issues that I'm wondering if I'm missing something trivial here or if this is just how it is...
Say I have a simple ML pipepline:
Load raw data --> Process data into table --> Split train / test --> train model --> evaluate model.
Run Code Online (Sandbox Code Playgroud)
A linear model is straight forward in Dagster. But what if I want to add a little loop, say for cross-validation purposes: …