相关疑难解决方法(0)

如何在气流中使用PythonVirtualenvOperator?

基本上我正在使用气流并开发了一项从外部源下载文件的任务。

t1 = PythonOperator(
        task_id='download',
        python_callable=download,
        provide_context=True,
        dag=dag)
Run Code Online (Sandbox Code Playgroud)

该气流正在虚拟环境(pipenv)中运行。

下载功能为:

def download(**kwargs):
   folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
   file_name = download_file(folder_id)
   return file_name
Run Code Online (Sandbox Code Playgroud)

所以基本上我使用 Xcons 将数据从一个任务传递到另一个任务...并且使用此配置不可能管理每个 DAG 的所有依赖项...

在文档中,我发现这个类名为“PythonVirtualenvOperator”,因此为了实现我编写的:

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        requirements=['requests'],
        python_version='3.8',
        provide_context=True,
        dag=dag
    )
Run Code Online (Sandbox Code Playgroud)

它给了我以下错误:

TypeError: can't pickle module objects
Run Code Online (Sandbox Code Playgroud)

download_file 函数是另一个文件中的 API 连接。

有什么建议我如何管理环境并在任务之间建立联系吗?

python python-3.x airflow

6
推荐指数
2
解决办法
2万
查看次数

标签 统计

airflow ×1

python ×1

python-3.x ×1