如何在气流中使用PythonVirtualenvOperator?

Yar*_*diy 6 python python-3.x airflow

基本上我正在使用气流并开发了一项从外部源下载文件的任务。

t1 = PythonOperator(
        task_id='download',
        python_callable=download,
        provide_context=True,
        dag=dag)
Run Code Online (Sandbox Code Playgroud)

该气流正在虚拟环境(pipenv)中运行。

下载功能为:

def download(**kwargs):
   folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
   file_name = download_file(folder_id)
   return file_name
Run Code Online (Sandbox Code Playgroud)

所以基本上我使用 Xcons 将数据从一个任务传递到另一个任务...并且使用此配置不可能管理每个 DAG 的所有依赖项...

在文档中,我发现这个类名为“PythonVirtualenvOperator”,因此为了实现我编写的:

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        requirements=['requests'],
        python_version='3.8',
        provide_context=True,
        dag=dag
    )
Run Code Online (Sandbox Code Playgroud)

它给了我以下错误:

TypeError: can't pickle module objects
Run Code Online (Sandbox Code Playgroud)

download_file 函数是另一个文件中的 API 连接。

有什么建议我如何管理环境并在任务之间建立联系吗?

Mat*_*ill 5

问题是

提供_上下文=真,

Airflow 无法对上下文进行 pickle,因为其中包含所有不可序列化的内容。如果您只需要简单的东西,您可以使用模板并op_kwargs解决这个问题,例如execution_ts

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        provide_context=False,
        op_kwargs={
          execution_date_str: '{{ execution_date }}',
        },
        dag=dag)
Run Code Online (Sandbox Code Playgroud)

当然,您需要更新可调用的参数。我没有比这更深入,因为它适合我的用例。


brk*_*rki 2

根据PythonVirtualenvOperator的定义:

The function must be defined using def, and not be
part of a class. All imports must happen inside the function
and no variables outside of the scope may be referenced.
Run Code Online (Sandbox Code Playgroud)

我猜测在您的函数中调用的代码链中的某个位置download,有一个使用顶级导入从另一个文件导入的方法。也许将导入转移到您的download函数中就足够了?