我有几个python文件,我目前正在使用BashOperator执行.这使我可以灵活地轻松选择python虚拟环境.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
...}
dag = DAG('python_tasks', default_args=default_args, schedule_interval="23 4 * * *")
t1 = BashOperator(
task_id='task1',
bash_command='~/anaconda3/envs/myenv/bin/python
/python_files/python_task1.py',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
如何使用PythonOperator实现相同的目标呢?
from airflow.operators.bash_operator import PythonOperator
import python_files.python_task1
python_task = PythonOperator(
task_id='python_task',
python_callable=python_task1.main,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
我假设PythonOperator将使用系统python环境.我发现Airflow有PythonVirtualenvOperator,但这似乎可以通过使用指定的要求动态创建新的虚拟环境来实现.我更喜欢使用已经正确配置的现有服务器.如何使用指定的python路径运行PythonOperator?