我有一个任务,我将调用final它具有多个上游连接。当ShortCircuitOperator此任务跳过上游之一时,也会跳过。我不希望final任务被跳过,因为它必须报告 DAG 成功。
为了避免它被跳过,我使用了trigger_rule='all_done',但它仍然被跳过。
如果我使用BranchPythonOperator而不是ShortCircuitOperator final任务不会被跳过。看起来分支工作流可能是一个解决方案,即使不是最优的,但现在final不会考虑上游任务的失败。
如何让它仅在上游成功或跳过时运行?
示例短路 DAG:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from datetime import datetime
from random import randint
default_args = {
'owner': 'airflow',
'start_date': datetime(2018, 8, 1)}
dag = DAG(
'shortcircuit_test',
default_args=default_args,
schedule_interval='* * * * *',
catchup=False)
def shortcircuit_fn():
return randint(0, 1) == 1
task_1 = DummyOperator(dag=dag, task_id='task_1')
task_2 = DummyOperator(dag=dag, task_id='task_2')
work = …Run Code Online (Sandbox Code Playgroud) 在 python 中我可以轻松做到
@pytest.mark.parametrize('input, expected', [(1, 2), [2, 3]])
def test_tutu(input, expected):
assert input + 1 == expected
Run Code Online (Sandbox Code Playgroud)
我怎样才能在 golang 中做同样的事情?不用给自己写一个循环
func tutu(a int) int {
return a + 1
}
func Test_tutu(t *testing.T) {
tests := []struct {
input int
expected int
}{
{input: 1, expected: 2},
{input: 2, expected: 3},
}
for _, tt := range tests {
t.Run("", func(t *testing.T) {
assert.Equal(t, tutu(tt.input), tt.expected)
})
}
}
Run Code Online (Sandbox Code Playgroud)
那么 golang 中的 python 参数化相当于什么?
def parametrize(all_args_name: str, all_values: List[Any], …Run Code Online (Sandbox Code Playgroud) 我想执行窗口函数(具体来说是移动平均),但要在数据帧的所有列上执行。
我可以这样做
from pyspark.sql import SparkSession, functions as func
df = ...
df.select([func.avg(df[col]).over(windowSpec).alias(col) for col in df.columns])
Run Code Online (Sandbox Code Playgroud)
但恐怕这不是很有效。有没有更好的方法来做到这一点?
我有一个 ' 进程列表subprocess。我不和他们沟通,只是等待。
我想等待第一个过程完成(此解决方案有效):
import subprocess
a = subprocess.Popen(['...'])
b = subprocess.Popen(['...'])
# wait for the first process to finish
while True:
over = False
for child in {a, b}:
try:
rst = child.wait(timeout=5)
except subprocess.TimeoutExpired:
continue # this subprocess is still running
if rst is not None: # subprocess is no more running
over = True
break # If either subprocess exits, so do we.
if over:
break
Run Code Online (Sandbox Code Playgroud)
我不想使用 use os.wait(),因为它可能从另一个subprocess不属于我正在等待的列表中返回。
一个漂亮而优雅的解决方案可能是使用epollor …