使用XCom在类之间交换数据?

Aar*_*ron 4 python airflow apache-airflow

我有以下DAG,它使用专用于数据预处理例程的类来执行不同的方法:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    from table_builder import OnlineOfflinePreprocess
else:
    print('Define MARKETING_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime.now(),
  'max_active_runs': 1,
  'concurrency': 4
}

worker = OnlineOfflinePreprocess()

DAG = DAG(
  dag_id='marketing_data_preproc',
  default_args=default_args,
  start_date=datetime.today()
)

import_online_data = PythonOperator(
  task_id='import_online_data',
  python_callable=worker.import_online_data,
  dag=DAG)

import_offline_data = PythonOperator(
  task_id='import_offline_data',
  python_callable=worker.import_offline_data,
  dag=DAG)

merge_aurum_to_sherlock = PythonOperator(
  task_id='merge_aurum_to_sherlock',
  python_callable=worker.merge_aurum_to_sherlock,
  dag=DAG)

merge_sherlock_to_aurum = PythonOperator(
   task_id='merge_sherlock_to_aurum',
   python_callable=worker.merge_sherlock_to_aurum,
   dag=DAG)

upload_au_to_sh = PythonOperator(
  task_id='upload_au_to_sh',
  python_callable=worker.upload_table,
  op_args='aurum_to_sherlock',
  dag=DAG)

upload_sh_to_au = PythonOperator(
  task_id='upload_sh_to_au',
  python_callable=worker.upload_table,
  op_args='sherlock_to_aurum',
  dag=DAG)

import_online_data >> merge_aurum_to_sherlock
import_offline_data >> merge_aurum_to_sherlock

merge_aurum_to_sherlock >> merge_sherlock_to_aurum
merge_aurum_to_sherlock >> upload_au_to_sh
merge_sherlock_to_aurum >> upload_sh_to_au
Run Code Online (Sandbox Code Playgroud)

这会产生以下错误:

[2017-09-07 19:32:09,587] {base_task_runner.py:97} INFO - Subtask: AttributeError: 'OnlineOfflinePreprocess' object has no attribute 'online_info'
Run Code Online (Sandbox Code Playgroud)

考虑到气流如何工作,这实际上非常明显:来自调用的不同类方法的输出不会存储到在图形顶部初始化的全局类对象.

我可以解决这个问题XCom吗?总的来说,如何将OOP与Airflow的连贯性融合在一起的想法是什么?

gni*_*las 7

关于气流的OOP和气流状态的更多问题不是问题.

需要在任务之间传递的任何状态都需要持久存储.这是因为每个气流任务都是一个独立的过程(甚至可以在不同的机器上运行!),因此无法进行内存中的通信.

你是对的,你可以使用XCOM来传递这种状态(如果它很小,因为它存储在气流数据库中).如果它很大,您可能希望将其存储在其他地方,可能是文件系统或S3或HDFS或专用数据库.