Apache airflow:使用sqlite时不能使用超过1个线程.将max_threads设置为1

Har*_*tia 8 sqlite postgresql multithreading apache-airflow

我已经设置了postgresql数据库的气流,我正在创建多个dags

def subdag(parent_dag_name, child_dag_name,currentDate,batchId,category,subCategory,yearMonth,utilityType,REALTIME_HOME, args):
dag_subdag = DAG(
    dag_id='%s.%s' % (parent_dag_name, child_dag_name),
    default_args=args,
    schedule_interval="@once",
)
# get site list to run bs reports
site_list = getSiteListforProcessing(category,subCategory,utilityType,yearMonth);
print (site_list)
def update_status(siteId,**kwargs):
    createdDate=getCurrentTimestamp();
    print ('N',siteId,batchId,yearMonth,utilityType,'N')
    updateJobStatusLog('N',siteId,batchId,yearMonth,utilityType,'P')

def error_status(siteId,**kwargs):
    createdDate=getCurrentTimestamp();
    print ('N',siteId,batchId,yearMonth,utilityType,'N')

BS_template = """
echo "{{ params.date }}"
java -cp xx.jar com.xGenerator {{params.siteId}} {{params.utilityType}} {{params.date}}
"""

for index,siteid in enumerate(site_list):
    t1 = BashOperator(
            task_id='%s-task-%s' % (child_dag_name, index + 1),
            bash_command=BS_template,
            params={'date':  currentDate, 'realtime_home': REALTIME_HOME,'siteId': siteid, "utilityType":utilityType},
            default_args=args,
            dag=dag_subdag)
    t2 = PythonOperator(
            task_id='%s-updatetask-%s' % (child_dag_name, index + 1),
            dag=dag_subdag,
            python_callable=update_status,
            op_kwargs={'siteId':siteid})

    t2.set_upstream(t1)
return dag_subdag
Run Code Online (Sandbox Code Playgroud)

它创建动态任务,但是在所有动态任务中,它始终会失败,并将错误记录为:"使用sqlite时不能使用多个线程.将max_threads设置为1"例如:如果创建4个任务3次运行,并且如果创建2个任务1次运行.