gpk*_*k27 12 mysql python-3.x airflow apache-airflow
我需要
1. run a select query on MYSQL DB and fetch the records.
2. Records are processed by python script.
Run Code Online (Sandbox Code Playgroud)
我不确定我应该怎么做.xcom是走这条路的吗?此外,MYSQLOperator只执行查询,不获取记录.我可以使用内置传输操作符吗?我如何在这里使用MYSQL钩子?
你可能想要使用一个PythonOperator,它使用钩子来获取数据,应用转换并将(现在得分的)行送回其他地方.
有人可以解释如何进行相同的操作.
请参阅 - http://markmail.org/message/x6nfeo6zhjfeakfe
def do_work():
mysqlserver = MySqlHook(connection_id)
sql = "SELECT * from table where col > 100 "
row_count = mysqlserver.get_records(sql, schema='testdb')
print row_count[0][0]
callMYSQLHook = PythonOperator(
task_id='fetch_from_testdb',
python_callable=mysqlHook,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
这是正确的方法吗?另外我们如何使用xcoms存储以下MySqlOperator的记录?
t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)
Run Code Online (Sandbox Code Playgroud)
在过去的 90 分钟里,我真的一直在为此苦苦挣扎,这里有一个适合新手的更具声明性的方法:
from airflow.hooks.mysql_hook import MySqlHook
def fetch_records():
request = "SELECT * FROM your_table"
mysql_hook = MySqlHook(mysql_conn_id = 'the_connection_name_sourced_from_the_ui', schema = 'specific_db')
connection = mysql_hook.get_conn()
cursor = connection.cursor()
cursor.execute(request)
sources = cursor.fetchall()
print(sources)
...your DAG() as dag: code
task = PythonOperator(
task_id = 'fetch_records',
python_callable = fetch_records
)
Run Code Online (Sandbox Code Playgroud)
这会将数据库查询的内容返回到日志。
我希望这对其他人有用。
当然,只需创建一个钩子或运算符并调用 get_records() 方法:https ://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/hooks/dbapi.html
| 归档时间: |
|
| 查看次数: |
7279 次 |
| 最近记录: |