例如:
我们有一个带有2列['A','B']的Pandas dataFrame foo。
我想做类似的功能
foo.set_index([0,1])
而不是
foo.set_index(['A', 'B'])
也尝试过foo.set_index([[0,.1]]),但出现此错误:
长度不匹配:预期轴有9个元素,新值有2个元素
我想将气流变量传递到 SQL 查询模板文件,如下所示(在 sql/test.sql 文件中):
select 'test', '{{ params.test_ds }}', '{{ test_dt }}' from test_table;
Run Code Online (Sandbox Code Playgroud)
我创建了一个继承自 PostgresOperator 的 Operator:
class EtlOperator(PostgresOperator):
template_fields = ('sql', 'test_dt', 'params')
template_ext = PostgresOperator.template_ext
@apply_defaults
def __init__(self, test_dt, params, *args, **kwargs):
super(EtlRunIdOperator, self).__init__(*args, **kwargs)
self.test_dt = test_dt
self.params = params
def execute(self, context):
super(EtlRunIdOperator, self).execute(context)
Run Code Online (Sandbox Code Playgroud)
我创建了这个任务:
test_task00 = EtlOperator(
task_id=f'test_task00',
postgres_conn_id='redshift',
sql='sql/test.sql',
params={
'test_ds': '{{ ds }}'
},
database='default',
test_dt='{{ execution_date }}',
provide_context=True, # tried without it too
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
然而,无论 params 或 test_dt 是 …