我在管道中有要执行的任务,但这些任务通常是异步的。我正在尝试使用 Airflow 运行管道,但它给了我错误。“类型错误:不能腌制协程对象”
由于函数是异步的,我想将它们包装在“asyncio.run”中,但仍然不起作用。
class Top:
async def process(self, input_data):
return [rawstr for rawstr in input_data]
class Bottom:
async def process(self, input_data):
return [len(x) for x in input_data]
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 7, 25),
'retries': 1,
'provide_context': True,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('sof_dag', default_args=default_args, schedule_interval=timedelta(days=1))
async def top_1(x, **kwargs):
return asyncio.run(Top().process(x))
async def bottom_1(**kwargs):
ti = kwargs['ti']
y = ti.xcom_pull(key=None, task_ids='Router_1')
return asyncio.run((Bottom().process(y)))
t1 = PythonOperator(
task_id='task_top_1',
python_callable=top_1,
op_args=[["wow! this is great", "this is …Run Code Online (Sandbox Code Playgroud)