有人可以举个例子来理解这个吗?
执行查询后,MySQLCursorBuffered游标从服务器获取整个结果集并缓冲行.对于使用缓冲游标执行的查询,诸如fetchone()之类的行获取方法从缓冲行集中返回行.对于非缓冲游标,在调用行读取方法之前,不会从服务器获取行.在这种情况下,您必须确保在执行同一连接上的任何其他语句之前获取结果集的所有行,否则将引发InternalError(未读结果)异常.
谢谢
我需要
1. run a select query on MYSQL DB and fetch the records.
2. Records are processed by python script.
Run Code Online (Sandbox Code Playgroud)
我不确定我应该怎么做.xcom是走这条路的吗?此外,MYSQLOperator只执行查询,不获取记录.我可以使用内置传输操作符吗?我如何在这里使用MYSQL钩子?
你可能想要使用一个PythonOperator,它使用钩子来获取数据,应用转换并将(现在得分的)行送回其他地方.
有人可以解释如何进行相同的操作.
请参阅 - http://markmail.org/message/x6nfeo6zhjfeakfe
def do_work():
mysqlserver = MySqlHook(connection_id)
sql = "SELECT * from table where col > 100 "
row_count = mysqlserver.get_records(sql, schema='testdb')
print row_count[0][0]
callMYSQLHook = PythonOperator(
task_id='fetch_from_testdb',
python_callable=mysqlHook,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
这是正确的方法吗?另外我们如何使用xcoms存储以下MySqlOperator的记录?
t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)
Run Code Online (Sandbox Code Playgroud) def mysql_operator_test():
DEFAULT_DATE = datetime(2017, 10, 9)
t = MySqlOperator(
task_id='basic_mysql',
sql="SELECT count(*) from table 1 where id>100;",
mysql_conn_id='mysql_default',
dag=dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False)
run_this = PythonOperator(
task_id='getRecoReq',
python_callable=mysql_operator_test,
# xcom_push=True,
dag=dag)
task2 = PythonOperator(
task_id= 'mysql_select',
provide_context=True,
python_callable = blah,
templates_dict = {'requests': "{{ ti.xcom_pull(task_ids='getReq') }}" },
dag=dag)
run_this.set_downstream(task2)
Run Code Online (Sandbox Code Playgroud)
我想使用xcoms捕获MySqlOperator返回的计数.有人可以指导一下吗?