小编rap*_*auv的帖子

气流:当某些上游被短路跳过时运行任务

我有一个任务,我将调用final它具有多个上游连接。当ShortCircuitOperator此任务跳过上游之一时,也会跳过。我不希望final任务被跳过,因为它必须报告 DAG 成功。

为了避免它被跳过,我使用了trigger_rule='all_done',但它仍然被跳过。

如果我使用BranchPythonOperator而不是ShortCircuitOperator final任务不会被跳过。看起来分支工作流可能是一个解决方案,即使不是最优的,但现在final不会考虑上游任务的失败。

如何让它仅在上游成功或跳过时运行?

示例短路 DAG:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from datetime import datetime
from random import randint

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 8, 1)}

dag = DAG(
    'shortcircuit_test',
    default_args=default_args,
    schedule_interval='* * * * *',
    catchup=False)

def shortcircuit_fn():
    return randint(0, 1) == 1

task_1 = DummyOperator(dag=dag, task_id='task_1')
task_2 = DummyOperator(dag=dag, task_id='task_2')

work = …
Run Code Online (Sandbox Code Playgroud)

airflow

10
推荐指数
3
解决办法
1万
查看次数

go - golang 测试参数化

在 python 中我可以轻松做到

@pytest.mark.parametrize('input, expected', [(1, 2), [2, 3]])
def test_tutu(input, expected):
    assert input + 1 == expected
Run Code Online (Sandbox Code Playgroud)

我怎样才能在 golang 中做同样的事情?不用给自己写一个循环

func tutu(a int) int {
    return a + 1
}

func Test_tutu(t *testing.T) {
    tests := []struct {
        input    int
        expected int
    }{
        {input: 1, expected: 2},
        {input: 2, expected: 3},
    }

    for _, tt := range tests {
        t.Run("", func(t *testing.T) {
            assert.Equal(t, tutu(tt.input), tt.expected)
        })
    }
}
Run Code Online (Sandbox Code Playgroud)

那么 golang 中的 python 参数化相当于什么?

def parametrize(all_args_name: str, all_values: List[Any], …
Run Code Online (Sandbox Code Playgroud)

testing go

9
推荐指数
1
解决办法
5065
查看次数

在多列上应用窗口函数

我想执行窗口函数(具体来说是移动平均),但要在数据帧的所有列上执行。

我可以这样做

from pyspark.sql import SparkSession, functions as func

df = ...

df.select([func.avg(df[col]).over(windowSpec).alias(col) for col in df.columns])
Run Code Online (Sandbox Code Playgroud)

但恐怕这不是很有效。有没有更好的方法来做到这一点?

apache-spark apache-spark-sql

6
推荐指数
1
解决办法
2333
查看次数

等待第一个子进程完成

我有一个 ' 进程列表subprocess。我不和他们沟通,只是等待。

我想等待第一个过程完成(此解决方案有效):

import subprocess

a = subprocess.Popen(['...'])
b = subprocess.Popen(['...'])

# wait for the first process to finish
while True:
    over = False
    for child in {a, b}:
        try:
            rst = child.wait(timeout=5)
        except subprocess.TimeoutExpired:
            continue  # this subprocess is still running

        if rst is not None:  # subprocess is no more running
            over = True
            break  # If either subprocess exits, so do we.
    if over:
        break
Run Code Online (Sandbox Code Playgroud)

我不想使用 use os.wait(),因为它可能从另一个subprocess不属于我正在等待的列表中返回。

一个漂亮而优雅的解决方案可能是使用epollor …

python subprocess wait

6
推荐指数
1
解决办法
4776
查看次数