基本上我正在使用气流并开发了一项从外部源下载文件的任务。
t1 = PythonOperator(
task_id='download',
python_callable=download,
provide_context=True,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
该气流正在虚拟环境(pipenv)中运行。
下载功能为:
def download(**kwargs):
folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
file_name = download_file(folder_id)
return file_name
Run Code Online (Sandbox Code Playgroud)
所以基本上我使用 Xcons 将数据从一个任务传递到另一个任务...并且使用此配置不可能管理每个 DAG 的所有依赖项...
在文档中,我发现这个类名为“PythonVirtualenvOperator”,因此为了实现我编写的:
t1 = PythonVirtualenvOperator(
task_id='download',
python_callable=download,
requirements=['requests'],
python_version='3.8',
provide_context=True,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
它给了我以下错误:
TypeError: can't pickle module objects
Run Code Online (Sandbox Code Playgroud)
download_file 函数是另一个文件中的 API 连接。
有什么建议我如何管理环境并在任务之间建立联系吗?