小编Jul*_*ide的帖子

气流 - 处理DAG回调的正确方法

我有一个DAG然后无论何时成功或失败,我希望它触发一个发布到Slack的方法.

我的DAG args情况如下:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}
Run Code Online (Sandbox Code Playgroud)

DAG定义本身:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )
Run Code Online (Sandbox Code Playgroud)

但是,当我检查Slack时,每分钟有超过100条消息,好像正在评估每个调度程序心跳,并且对于每个日志,它确实运行了成功和失败方法,就好像它工作并且不适用于同一个任务实例(不是精细).

我应该如何正确使用on_failure_callbackon_success_callback处理dags状态并调用自定义方法?

python airflow airflow-scheduler

1
推荐指数
2
解决办法
2644
查看次数

气流-从dag上下文回调中解析任务ID

刚开始使用dag callbackon_failure_callbackon_success_callback)时,我认为完成后会触发successor fail状态dag(如dag中定义)。但是随后似乎每个实例都task instance没有实例化它dag run,因此,如果DAG具有N个任务,它将触发N次这些回调。

我正在尝试捕获任务ID,因此发送到松弛状态。在阅读另一个相关问题时,我想到了以下内容:

def success_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

def failure_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}
Run Code Online (Sandbox Code Playgroud)

但是它失败了,我应该如何解析上下文变量并因此获得任务ID?

airflow

0
推荐指数
1
解决办法
3965
查看次数

使用Ruby从map变量返回子串

我有一个来自地图的变量,我试图在方括号之间得到一个特定的部分

(例如, "dmfkdmfk [IWANTTHISPART] mlkm")

但它不像我那样工作.我正在尝试这里使用的相同方式.

原始代码:

query_values = activities.map do |activity|
  '(' +
  "#{activity['note']}"
  +')'

end
Run Code Online (Sandbox Code Playgroud)

我试过了:

query_values = activities.map do |activity|
  '(' +
  "#{activity['note'].[/#{"["}(.*?)#{"]"}/m, 1]}" 
  +')'

end
Run Code Online (Sandbox Code Playgroud)

错误日志:

syntax error, unexpected '[', expecting '('
      '(' + "#{activity['note'].[/#{"["}(.*?)#{"]"}/m, 1]},""'" +')'
                                 ^
quase.rb:40: syntax error, unexpected keyword_end, expecting tSTRING_DEND
Run Code Online (Sandbox Code Playgroud)

我该怎么办?非常感谢.

ruby orm ruby-on-rails

-1
推荐指数
1
解决办法
92
查看次数

标签 统计

airflow ×2

airflow-scheduler ×1

orm ×1

python ×1

ruby ×1

ruby-on-rails ×1