当我在 Airflow 1.10 版中使用KubernetesPodOperator运行docker 镜像时
一旦 pod 成功完成任务,airflow 会尝试通过 k8s 流客户端连接到 pod 来获取 xcom 值。
以下是我遇到的错误:
[2018-12-18 05:29:02,209] {{models.py:1760}} ERROR - (0)
Reason: Handshake status 403 Forbidden
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kubernetes/stream/ws_client.py", line 249, in websocket_call
client = WSClient(configuration, get_websocket_url(url), headers)
File "/usr/local/lib/python3.6/site-packages/kubernetes/stream/ws_client.py", line 72, in __init__
self.sock.connect(url, header=header)
File "/usr/local/lib/python3.6/site-packages/websocket/_core.py", line 223, in connect
self.handshake_response = handshake(self.sock, *addrs, **options)
File "/usr/local/lib/python3.6/site-packages/websocket/_handshake.py", line 79, in handshake
status, resp = _get_resp_headers(sock)
File "/usr/local/lib/python3.6/site-packages/websocket/_handshake.py", line 152, …
Run Code Online (Sandbox Code Playgroud) kubernetes airflow kubernetes-python-client apache-airflow-xcom
我们正在尝试运行一个包含2个任务的简单DAG,这些任务将通过xcom进行数据通信.
DAG文件:
from __future__ import print_function
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
'example_xcom',
schedule_interval="@once",
default_args=args)
value_1 = [1, 2, 3]
def push(**kwargs):
# pushes an XCom without a specific target
kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
def puller(**kwargs):
ti = kwargs['ti']
v1 = ti.xcom_pull(key=None, task_ids='push')
assert v1 == value_1
v1 = ti.xcom_pull(key=None, task_ids=['push'])
assert (v1) == (value_1)
push1 = PythonOperator(
task_id='push', dag=dag, …
Run Code Online (Sandbox Code Playgroud) 我试图在 subdag 创建时访问来自父 dag 的一些 xcom 数据,我正在寻找在互联网上实现这一目标,但我没有找到任何东西。
def test(task_id):
logging.info(f' execution of task {task_id}')
def load_subdag(parent_dag_id, child_dag_id, args):
dag_subdag = DAG(
dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
default_args=args,
schedule_interval="@daily",
)
with dag_subdag:
r = DummyOperator(task_id='random')
for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
t = PythonOperator(
task_id='load_subdag_{0}'.format(i),
default_args=args,
python_callable=print_context,
op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
dag=dag_subdag,
)
return dag_subdag
load_tasks = SubDagOperator(
task_id='load_tasks',
subdag=load_subdag(dag.dag_id,
'load_tasks', args),
default_args=args,
)
Run Code Online (Sandbox Code Playgroud)
我的代码出现此错误
1 | Traceback (most recent call last):
airflow_1 | File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1 | m = imp.load_source(mod_name, filepath) …
Run Code Online (Sandbox Code Playgroud) 我的 dag 需要大约 50 秒来解析,我只使用外部触发器来启动 dag 运行,没有时间表。我注意到气流想要大量填充 dagbag --> 在每个 trigger_dag 命令和在后台,它不断检查 dags 文件夹并在部署新的 .py 后似乎立即创建 .pyc 文件。
无论如何我可以部署我的集群并填充一次 dags!然后在接下来的 2 周内,在任何 trigger_dag 上立即开始 dagruns(现在需要 50 秒才能在开始之前填充 dagbag)。我不需要在 2 周内更新 dag 定义。
是否可以仅在发生特定事件(例如将文件放入特定 S3 存储桶的事件)时运行气流任务。类似于 AWS Lambda 事件的内容
有,S3KeySensor
但我不知道它是否符合我的要求(仅在事件发生时运行 Task)
这是使问题更清楚的示例:
我有一个传感器对象如下
sensor = S3KeySensor(
task_id='run_on_every_file_drop',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='my-sensor-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
使用上述传感器对象,传感器任务的气流行为如下:
my-sensor-bucket
切换 DAG 之前已经存在与 S3 存储桶中的通配符匹配的对象名称ON
,则运行该任务(由于过去的 s3 对象的存在,我不想运行该任务)my-sensor-bucket
)我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,还是不能像基于事件的管道(类似于 AWS Lambda 的东西)那样设置
amazon-s3 airflow airflow-scheduler apache-airflow-xcom airflow-operator
在 Airflow UI 中,“浏览器 > 日志”下可用的日志事件之一是事件“触发器”以及负责触发此事件的 DAG ID 和所有者/用户。这些信息是否可以通过编程轻松获取?
用例是,我有一个 DAG,它允许一部分用户手动触发执行。根据触发此 DAG 执行的用户,从此 DAG 执行代码的行为会有所不同。
先感谢您。
airflow airflow-scheduler apache-airflow-xcom airflow-operator
在运行使用
docker镜像运行 jar 的 DAG 时,给出了xcom_push=True,它会在单个 pod 中创建另一个容器以及docker镜像。
达格:
jar_task = KubernetesPodOperator(
namespace='test',
image="path to image",
image_pull_secrets="secret",
image_pull_policy="Always",
node_selectors={"d-type":"na-node-group"},
cmds=["sh","-c",..~running jar here~..],
secrets=[secret_file],
env_vars=environment_vars,
labels={"k8s-app": "airflow"},
name="airflow-pod",
config_file=k8s_config_file,
resources=pod.Resources(request_cpu=0.2,limit_cpu=0.5,request_memory='512Mi',limit_memory='1536Mi'),
in_cluster=False,
task_id="run_jar",
is_delete_operator_pod=True,
get_logs=True,
xcom_push=True,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
下面是JAR执行成功时的错误..
[2018-11-27 11:37:21,605] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,605] {{pod_launcher.py:166}} INFO - Running command... cat /airflow/xcom/return.json
[2018-11-27 11:37:21,605] {{logging_mixin.py:95}} INFO -
[2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,646] {{pod_launcher.py:173}} INFO - cat: can't open '/airflow/xcom/return.json': No such file or directory
[2018-11-27 …
Run Code Online (Sandbox Code Playgroud) 我正在创建一个 Airflow @daily DAG,它有一个get_daily_data
BigQueryGetDataOperator的上游任务,它根据 execution_date 和下游依赖任务(PythonOperator)获取数据,通过 xcom_pull 使用以上基于日期的数据。当我运行气流回填命令时,process_data_from_bq
我正在执行 xcom_pull的下游任务,它只获取最近的数据,而不是下游任务期望的同一执行日期的数据。Airfow 文档说如果我们传递如果 xcom_pull 为 task_ids 传递单个字符串,则返回来自该任务的最新 XCom 值
但是它没有说明如何获取 DAG 执行的同一实例的数据。
我经历了一个相同的问题如何在同一个 DAG 运行中从其他任务实例中提取 xcom 值(不是最近的一个)?然而,那里给出的一个解决方案是我已经在做的。但似乎它不是正确的答案。
DAG 定义:
dag = DAG(
'daily_motor',
default_args=default_args,
schedule_interval='@daily'
)
#This task creates data in a BigQuery table based on execution date
extract_daily_data = BigQueryOperator(
task_id='daily_data_extract',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
sql=policy_by_transaction_date_sql('{{ ds }}'),
destination_dataset_table='Test.daily_data_tmp',
dag=dag)
get_daily_data = BigQueryGetDataOperator(
task_id='get_daily_data',
dataset_id='Test',
table_id='daily_data_tmp',
max_results='10000',
dag=dag
)
#This is where I need to pull …
Run Code Online (Sandbox Code Playgroud) 我正在BaseSensorOperator
Airflow 中工作。我有一个用例,我希望BaseSensorOperator.poke(context)
函数将一些信息传递给poke
. 我尝试使用 Xcom 如下(通过无意义值的模拟案例):
def poke(self, context):
task_instance = context['task_instance']
old_value = task_instance.xcom_pull(key='passing_this_value')
if old_value:
logging.info(f'retrieved from Xcom {old_value}')
else:
logging.info('no value was retrieved')
new_value = datetime.now()
logging.info(f'sending this value to Xcom {new_value}')
task_instance.xcom_push(key='passing_this_value', value=new_value)
if new_value.minute % 10 == 0:
return True
else:
return False
Run Code Online (Sandbox Code Playgroud)
这在创建传感器任务时效果很好,method='poke'
但method='reschedule'
由于在重新调度时为该运行中的任务清除了 Xcom ,因此失败。
有办法解决吗?我可以使用,Variable
但这会极大地破坏可变空间。还有什么建议吗?
我的 DAG(我们称之为 DAG_A)使用trigger_dagrun
操作符启动另一个 DAG (DAG_B) 。DAG_B 的任务使用 XCOM,我想在完成后从 DAG_B 运行的任务之一(正是我开始的任务)中获取 XCOM 值。
使用 XCOM 不是硬性要求 - 基本上 Airflow 本身提供的任何(合理的)机制都可以工作。如果需要,我可以更改 DAG_B。
找不到此类案例的任何示例,因此感谢您的帮助。
计划 B 是让 DAG_B 将 XCOM 值与一些运行 ID 一起保存到一些持久性存储(如 DB 或文件)中,DAG_A 将从那里获取它。但如果有一些内置机制可用,我想避免这种复杂化。