我正在使用MySqlHook与airflow_db建立连接,并且正在执行一些查询,但我需要在某处查看查询结果(比如说日志),我怎样才能看到?
这是示例代码
t1 = MySqlOperator(
task_id='basic_mysql',
mysql_conn_id='airflow_db',
sql="select * from xcom",
dag=dag)
Run Code Online (Sandbox Code Playgroud) 在使用 DataFlowPythonOperator 之前,我使用了气流的 BashOperator。它工作正常。我的光束管道需要一个特定的参数,这是我在 BashOperator 中使用的命令。
仅供参考 - 此光束管道用于将 CSV 文件转换为镶木地板。
python /home/airflow/gcs/pyFile.py --runner DataflowRunner --project my-project --jobname my-job--num-workers 3 --temp_location gs://path/Temp/ --staging_location gs://path/Staging/ --input gs://path/*.txt --odir gs://path/output --ofile current
Run Code Online (Sandbox Code Playgroud)
这些是我必须传递的必需参数,以使我的光束管道正常工作。
现在如何在DataFlowPythonOperator 中传递这些参数?
我试过了,但我不知道我应该在哪里提到所有参数。我试过这样的事情:
task1 = DataFlowPythonOperator(
task_id = 'my_task',
py_file = '/home/airflow/gcs/pyfile.py',
gcp_conn_id='google_cloud_default',
options={
"num-workers" : 3,
"input" : 'gs://path/*.txt',
"odir" : 'gs://path/',
"ofile" : 'current',
"jobname" : 'my-job'
},
dataflow_default_options={
"project": 'my-project',
"staging_location": 'gs://path/Staging/',
"temp_location": 'gs://path/Temp/',
},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
使用当前的脚本(虽然我不确定它的格式是否正确),这是我在日志中得到的:
[2020-03-06 05:08:48,070] {base_task_runner.py:115} INFO - Job …Run Code Online (Sandbox Code Playgroud) 我需要通过DAG 中的任何特定任务查找时间量。
例如:
SampleTask=DummyOperator(
task_id='SampleTask',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
现在我想知道上述任务所花费的时间是如何基于其 task_id 的,其他任务也是如此。
python directed-acyclic-graphs google-cloud-platform airflow