小编Yar*_*diy的帖子

如何在气流中使用PythonVirtualenvOperator?

基本上我正在使用气流并开发了一项从外部源下载文件的任务。

t1 = PythonOperator(
        task_id='download',
        python_callable=download,
        provide_context=True,
        dag=dag)
Run Code Online (Sandbox Code Playgroud)

该气流正在虚拟环境(pipenv)中运行。

下载功能为:

def download(**kwargs):
   folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
   file_name = download_file(folder_id)
   return file_name
Run Code Online (Sandbox Code Playgroud)

所以基本上我使用 Xcons 将数据从一个任务传递到另一个任务...并且使用此配置不可能管理每个 DAG 的所有依赖项...

在文档中,我发现这个类名为“PythonVirtualenvOperator”,因此为了实现我编写的:

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        requirements=['requests'],
        python_version='3.8',
        provide_context=True,
        dag=dag
    )
Run Code Online (Sandbox Code Playgroud)

它给了我以下错误:

TypeError: can't pickle module objects
Run Code Online (Sandbox Code Playgroud)

download_file 函数是另一个文件中的 API 连接。

有什么建议我如何管理环境并在任务之间建立联系吗?

python python-3.x airflow

6
推荐指数
2
解决办法
2万
查看次数

XCOM 不适用于 PythonVirtualenvOperator 气流 1.10.6

我在气流项目中使用 PythonVirtualenvOperator,我需要将参数从一个任务传递到另一个任务,以测试我使用此示例的xcom ,它可以工作,但是当我将 pythonOperator 更改为 PythonVirtualenvOperator 时,它有一个问题。

代码:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True,

}

dag = DAG('BAtatas', schedule_interval="@once", default_args=args)

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    print(kwargs)
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs …
Run Code Online (Sandbox Code Playgroud)

python-3.x airflow

2
推荐指数
1
解决办法
3603
查看次数

标签 统计

airflow ×2

python-3.x ×2

python ×1