小编Ash*_*mar的帖子

如何在日志中查看MySqlHook结果

我正在使用MySqlHook与airflow_db建立连接,并且正在执行一些查询,但我需要在某处查看查询结果(比如说日志),我怎样才能看到?

这是示例代码

t1 = MySqlOperator(
    task_id='basic_mysql',
    mysql_conn_id='airflow_db',
    sql="select * from xcom",
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

python-3.x airflow

5
推荐指数
1
解决办法
9837
查看次数

如何将气流 DataFlowPythonOperator 用于梁管道?

在使用 DataFlowPythonOperator 之前,我使用了气流的 BashOperator。它工作正常。我的光束管道需要一个特定的参数,这是我在 BashOperator 中使用的命令。

仅供参考 - 此光束管道用于将 CSV 文件转换为镶木地板。

python /home/airflow/gcs/pyFile.py --runner DataflowRunner --project my-project --jobname my-job--num-workers 3 --temp_location gs://path/Temp/ --staging_location gs://path/Staging/ --input gs://path/*.txt --odir gs://path/output --ofile current
Run Code Online (Sandbox Code Playgroud)

这些是我必须传递的必需参数,以使我的光束管道正常工作。

现在如何在DataFlowPythonOperator 中传递这些参数?

我试过了,但我不知道我应该在哪里提到所有参数。我试过这样的事情:

    task1 = DataFlowPythonOperator(
    task_id = 'my_task',
    py_file = '/home/airflow/gcs/pyfile.py',
    gcp_conn_id='google_cloud_default',
    options={
        "num-workers" : 3,
        "input" : 'gs://path/*.txt',
        "odir" : 'gs://path/',
        "ofile" : 'current',
        "jobname" : 'my-job'
    },
    dataflow_default_options={
        "project": 'my-project',
        "staging_location": 'gs://path/Staging/',
        "temp_location": 'gs://path/Temp/',    
  },
  dag=dag
)
Run Code Online (Sandbox Code Playgroud)

使用当前的脚本(虽然我不确定它的格式是否正确),这是我在日志中得到的:

    [2020-03-06 05:08:48,070] {base_task_runner.py:115} INFO - Job …
Run Code Online (Sandbox Code Playgroud)

google-cloud-platform airflow google-cloud-composer

5
推荐指数
1
解决办法
1006
查看次数

有没有办法在 DAG 中找到任何随机任务所花费的时间?

我需要通过DAG 中的任何特定任务查找时间量例如:

SampleTask=DummyOperator(
    task_id='SampleTask',
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

现在我想知道上述任务所花费的时间是如何基于其 task_id 的,其他任务也是如此。

python directed-acyclic-graphs google-cloud-platform airflow

2
推荐指数
1
解决办法
1320
查看次数