pyflink(flink) 1.12.0 当表通过 to_append_stream 转换为数据流时出现错误(java api 是:toAppendStream)

SZ_*_*ong 6 apache-flink flink-streaming pyflink

非常感谢您的帮助!

\n

代码:

\n
from pyflink.common.typeinfo import RowTypeInfo, Types, BasicTypeInfo, TupleTypeInfo\nfrom pyflink.table import EnvironmentSettings, StreamTableEnvironment\n\n# stream \xe6\xa8\xa1\xe5\xbc\x8f\xe7\x9a\x84env\xe5\x88\x9b\xe5\xbb\xba\nenv_settings_stream = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()\nenv_stream = StreamTableEnvironment.create(environment_settings=env_settings_stream)\n\ntable1 = env_stream.from_elements([(1, 23.4, \'lili\'), (2, 33.4, \'er\'), (3, 45.6, \'yu\')], [\'id\', \'order_amt\', \'name\'])\ntable2 = env_stream.from_elements([(1, 43.4, \'xixi\'), (2, 53.4, \'rr\'), (3, 65.6, \'ww\')], [\'id2\', \'order_amt2\', \'name\'])\n\n# types: List[TypeInformation], field_names: List[str]\n# row_type_info = RowTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()], [\'id\', \'order_amt\', \'name\'])\nrow_type_info = TupleTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()])\n\n\nstream = env_stream.to_append_stream(table1, row_type_info)\n
Run Code Online (Sandbox Code Playgroud)\n

错误信息:

\n
Traceback (most recent call last):\n  File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco\n    return f(*a, **kw)\n  File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/py4j/protocol.py", line 332, in get_return_value\n    format(target_id, ".", name, value))\npy4j.protocol.Py4JError: An error occurred while calling o4.toAppendStream. Trace:\norg.apache.flink.api.python.shaded.py4j.Py4JException: Method toAppendStream([class org.apache.flink.table.api.internal.TableImpl, class org.apache.flink.api.java.typeutils.TupleTypeInfo]) does not exist\n    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)\n    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)\n    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)\n    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)\n    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)\n    at java.lang.Thread.run(Thread.java:748)\n
Run Code Online (Sandbox Code Playgroud)\n

环境:

\n
    \n
  1. apache-flink 1.12.0(python flink)
  2. \n
  3. py4j 0.10.8.1(当pip3安装apache-flink时,py4j将自动安装依赖)
  4. \n
  5. 蟒蛇3.7(蟒蛇)
  6. \n
  7. pycharm 2020.1.1版本
  8. \n
  9. 苹果操作系统 11.1
  10. \n
\n

调试信息图1:

\n

调试信息图1

\n

调试信息图2:

\n

调试信息图2

\n

复制步骤:

\n
    \n
  1. 相同环境,本地运行代码(本地模式)
  2. \n
  3. 代码行处的断点:“stream = env_stream.to_append_stream(table1, row_type_info)”
  4. \n
  5. debug运行,断点会被触发两次,第一次没有找到toAppendStream方法,第二次找到toAppendStream方法。但第一次提出例外。
  6. \n
\n