如何使用Airflow获取和处理mysql记录?

gpk*_*k27 12 mysql python-3.x airflow apache-airflow

我需要

1. run a select query on MYSQL DB and fetch the records.              
2. Records are processed by python script.
Run Code Online (Sandbox Code Playgroud)

我不确定我应该怎么做.xcom是走这条路的吗?此外,MYSQLOperator只执行查询,不获取记录.我可以使用内置传输操作符吗?我如何在这里使用MYSQL钩子?

你可能想要使用一个PythonOperator,它使用钩子来获取数据,应用转换并将(现在得分的)行送回其他地方.

有人可以解释如何进行相同的操作.

请参阅 - http://markmail.org/message/x6nfeo6zhjfeakfe

def do_work():
    mysqlserver = MySqlHook(connection_id)
    sql = "SELECT * from table where col > 100 "
    row_count = mysqlserver.get_records(sql, schema='testdb')
    print row_count[0][0]

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb',
    python_callable=mysqlHook,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

这是正确的方法吗?另外我们如何使用xcoms存储以下MySqlOperator的记录?

t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)
Run Code Online (Sandbox Code Playgroud)

dim*_*ies 6

在过去的 90 分钟里,我真的一直在为此苦苦挣扎,这里有一个适合新手的更具声明性的方法:

from airflow.hooks.mysql_hook import MySqlHook

def fetch_records():
  request = "SELECT * FROM your_table"
  mysql_hook = MySqlHook(mysql_conn_id = 'the_connection_name_sourced_from_the_ui', schema = 'specific_db')
  connection = mysql_hook.get_conn()
  cursor = connection.cursor()
  cursor.execute(request)
  sources = cursor.fetchall()
  print(sources)

...your DAG() as dag: code

task = PythonOperator(
  task_id = 'fetch_records',
  python_callable = fetch_records
)
Run Code Online (Sandbox Code Playgroud)

这会将数据库查询的内容返回到日志。

我希望这对其他人有用。


Bre*_*the 0

当然,只需创建一个钩子或运算符并调用 get_records() 方法:https ://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/hooks/dbapi.html

  • 该链接仍然损坏(2021 年 1 月)。 (4认同)