我正在使用一个主要的dag(main_dag),其中包含许多子subdag,并且每个子subdag具有许多任务。我从subdagA taskA推送了一个xcom,但是我在subdagB taskB中提取了该xcom。由于xcom_pull()中的dag_id参数默认为self.dag_id,因此我无法提取必要的xcom。我想知道如何做到这一点和/或是否有更好的方法来设置这种情况,这样我就不必处理这个问题。
我目前在subdagB中所做的示例:
def subdagB(parent_dag, child_dag, start_date, schedule_interval):
subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval)
start = DummyOperator(
task_id='taskA',
dag=subdagB)
tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};'''
t1 = BashOperator(
task_id='taskB',
bash_command=tag_db_template,
xcom_push=True,
dag=subdagB)
end = DummyOperator(
task_id='taskC',
dag=subdagB)
t0.set_upstream(start)
t1.set_upstream(t0)
end.set_upstream(t1)
return subdagB
Run Code Online (Sandbox Code Playgroud)
预先感谢您的任何帮助!
我有一个来自使用 Pandas 的导入 csv 的数据框。这个数据框有 160 个变量,我只想保留 5、9、10、46、89。
我试试这个:
dataf2 = dataf[[5] + [9] + [10] + [46] + [89]]
Run Code Online (Sandbox Code Playgroud)
但我接受这个错误:
KeyError: '[ 5 9 10 46 89] not in index'
Run Code Online (Sandbox Code Playgroud)