一般来说,对于每个 Jira 票证示例:edw-123,我创建一个功能分支,feature/edw-123并且我可以在 Jira 票证中看到更新如下
我有 2 个 Jira 票证可以处理,例如edw-222 & edw-333,如何创建一个功能/分支,将两个票证合二为一,以便两个 Jira 票证都更新或至少具有提到的两个票证
需求:根据SQL的返回值分支下一个任务
我尝试使用branchsqloperator实现
错误:但出现 TypeError: 'XComArg' object is not iterable 错误
下面是SQL查询
SELECT
CASE
WHEN COUNT(*) > 2 THEN 0 --false
ELSE 1 --true
END
FROM
TABLE_A;
Run Code Online (Sandbox Code Playgroud)
达格:
@task(task_id="task_a")
def task_a():
return "job_success"
@task(task_id="task_b")
def task_b():
return "job_failure"
task_a = task_a()
task_b = task_b()
branch_sql = BranchSQLOperator(
task_id='branch_sql',
sql='query.sql',
follow_task_ids_if_true = task_a,
follow_task_ids_if_false = task_b,
conn_id = 'default_conn_id',
)
branch_sql >> [task_a,task_b]
Run Code Online (Sandbox Code Playgroud)
[![在此处输入图像描述][1]][1]
以下是日志:
[2022-07-20, 21:11:27 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-07-20, …Run Code Online (Sandbox Code Playgroud) 要求:要求是下载 s3 文件并使用逻辑编辑该文件并将文件上传回相同的 s3 位置
逻辑:编辑文件的最后一行,我需要删除尾随分隔符和双引号,这是我能够使用 python 函数实现的
问题:如何将 python 脚本或 python 函数传递给操作员S3FileTransformOperator并更新文件?如果不可行S3FileTransformOperator,我怎样才能使用 boto3 实现下载、编辑和上传文件的功能
Python函数:
# read the file into a list of lines
lines = open(mys3file, 'r').readlines()
lines[-1] = lines[-1].rstrip()
fields = [field.replace('"', '') for field in lines[-1].split('|') if field]
lines[-1] = '|'.join(fields)
# now write the modified list back out to the file
open(mys3file, 'w').writelines(lines)
Run Code Online (Sandbox Code Playgroud)
代码:
import logging
import datetime
from datetime import timedelta
import pprint
from airflow.models import DAG …Run Code Online (Sandbox Code Playgroud)