Che*_* Wu 10 python pipeline machine-learning airflow
我正在学习如何使用气流来构建机器学习管道。
但没有找到一种方法将从 1 个任务生成的 pandas 数据帧传递到另一个任务中...似乎需要将数据转换为 JSON 格式或在每个任务中将数据保存在数据库中?
最后,我必须将所有内容放入 1 个任务中...是否有办法在气流任务之间传递数据帧?
这是我的代码:
from datetime import datetime
import pandas as pd
import numpy as np
import os
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score
from airflow.decorators import dag, task
from airflow.operators.python_operator import PythonOperator
@dag(dag_id='super_mini_pipeline', schedule_interval=None,
start_date=datetime(2021, 11, 5), catchup=False, tags=['ml_pipeline'])
def baseline_pipeline():
def all_in_one(label):
path_to_csv = os.path.join('~/airflow/data','leaf.csv')
df = pd.read_csv(path_to_csv)
y = df[label]
X = df.drop(label, axis=1)
folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10)
lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10)
metrics_lst = []
for train_idx, val_idx in folds.split(X, y):
X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]
lgbm.fit(X_train, y_train)
y_pred = lgbm.predict(X_val)
cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
metrics_lst.append(cv_balanced_accuracy)
avg_performance = np.mean(metrics_lst)
print(f"Avg Performance: {avg_performance}")
all_in_one_task = PythonOperator(task_id='all_in_one_task', python_callable=all_in_one, op_kwargs={'label':'species'})
all_in_one_task
# dag invocation
pipeline_dag = baseline_pipeline()
Run Code Online (Sandbox Code Playgroud)
小智 14
尽管 Airflow 用于许多 ETL 任务,但它并不是此类操作的正确选择,它适用于工作流而不是数据流。但是有很多方法可以做到这一点,而无需在任务之间传递整个数据帧。
您可以使用 xcom.push 和 xcom.pull 传递有关数据的信息:
A。将第一个任务的结果保存在某处(json、csv 等)
b. 传递到 xcom.push 有关已保存文件的信息。例如文件名、路径。
C。使用 xcom.pull 从其他任务中读取此文件名并执行所需的操作。
或者:
以上所有内容都使用一些数据库表:
A。在task_1中,您可以从某个数据帧中的table_1下载数据,对其进行处理并保存在另一个table_2中(df.to_sql())。
b. 使用 xcom.push 传递表的名称。
C。使用 xcom.pull 从另一个任务获取 table_2 并使用 df.read_sql() 读取它。
有关如何使用 xcom 的信息可以从气流示例中获得。示例: https: //github.com/apache/airflow/blob/main/airflow/example_dags/tutorial_etl_dag.py
恕我直言,还有很多其他更好的方法,我刚刚写下了我尝试过的方法。
完全同意@Talgat 的观点,Airflow 并不是为此而构建的。它侧重于任务依赖性而不是数据依赖性。
也许您可以考虑像ZenML这样以数据为中心的管道解决方案来解决这个问题?它有一个指南,其中包含跨管道步骤传递 Pandas Dataframes 的示例。您还可以利用跨步骤的数据缓存和其他功能,使其更适合您正在做的事情。
最重要的是,ZenML 管道也可以作为 Airflow DAG 进行部署。因此,您可以让 ZenML 来处理它,而不是专注于自己编写工件逻辑的持久化。
免责声明:我是 ZenML 的核心贡献者之一,所以这无疑是有偏见的。仍然认为这可能对OP有帮助!
| 归档时间: |
|
| 查看次数: |
11439 次 |
| 最近记录: |