我正在尝试使用 Apache Flink 1.11 创建一个源表,我可以在其中访问 JSON 消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。
文档建议它应该是一种MAP类型,但是当我设置它时,出现以下错误
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
Run Code Online (Sandbox Code Playgroud)
这是我的 SQL
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
Run Code Online (Sandbox Code Playgroud)
我的 JSON 看起来像这样:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
Run Code Online (Sandbox Code Playgroud) 我正在使用 pyflink 1.17.1,并且收到此错误“RuntimeError:java.lang.UnsupportedOperationException:序列化程序已为该状态注册;不允许重新注册”。需要你的帮助。当我尝试使用 JDBC Sink 接收数据时,它不起作用,它显示此错误,但当我尝试简单地打印时,它会按预期工作
这是main.py:
db_host = config.get("POSTGRES_HOST")
db_port = config.get("POSTGRES_PORT")
db_name = config.get("POSTGRES_DB")
db_url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"
db_driver_name = "org.postgresql.Driver"
db_username = config.get("POSTGRES_USER")
db_password = config.get("POSTGRES_PASSWORD")
db_table = config.get("POSTGRES_TABLE")
db_connection = Connection(
username=db_username,
password=db_password,
host=db_host,
port=db_port,
database=db_name,
)
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(
f'file://{abspath("connector_jars/flink-sql-connector-kafka-1.17.1.jar")}',
f'file://{abspath("connector_jars/flink-connector-jdbc-3.1.1-1.17.jar")}',
f'file://{abspath("connector_jars/postgresql-42.6.0.jar")}',
)
env.set_parallelism(8)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
kafka_source_type_info = Types.ROW_NAMED(
[
"open_time",
"open_price",
"high_price",
"low_price",
"close_price",
"volume",
"close_time",
"base_asset_value",
"number_of_trades",
"taker_buy_base_asset_volume",
"taker_buy_quote_asset_volume",
"ignore_value",
"symbol",
"x",
],
[
Types.LONG(),
Types.DOUBLE(),
Types.DOUBLE(),
Types.DOUBLE(),
Types.DOUBLE(),
Types.DOUBLE(),
Types.LONG(),
Types.DOUBLE(),
Types.INT(), …Run Code Online (Sandbox Code Playgroud) 非常感谢您的帮助!
\n代码:
\nfrom pyflink.common.typeinfo import RowTypeInfo, Types, BasicTypeInfo, TupleTypeInfo\nfrom pyflink.table import EnvironmentSettings, StreamTableEnvironment\n\n# stream \xe6\xa8\xa1\xe5\xbc\x8f\xe7\x9a\x84env\xe5\x88\x9b\xe5\xbb\xba\nenv_settings_stream = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()\nenv_stream = StreamTableEnvironment.create(environment_settings=env_settings_stream)\n\ntable1 = env_stream.from_elements([(1, 23.4, \'lili\'), (2, 33.4, \'er\'), (3, 45.6, \'yu\')], [\'id\', \'order_amt\', \'name\'])\ntable2 = env_stream.from_elements([(1, 43.4, \'xixi\'), (2, 53.4, \'rr\'), (3, 65.6, \'ww\')], [\'id2\', \'order_amt2\', \'name\'])\n\n# types: List[TypeInformation], field_names: List[str]\n# row_type_info = RowTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()], [\'id\', \'order_amt\', \'name\'])\nrow_type_info = TupleTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()])\n\n\nstream = env_stream.to_append_stream(table1, row_type_info)\nRun Code Online (Sandbox Code Playgroud)\n错误信息:
\nTraceback (most recent call last):\n File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco\n …Run Code Online (Sandbox Code Playgroud) 目前,我有一个正在运行的 Flink Kubernetes 会话集群(Flink 版本 1.13.2),我还可以通过以下方式访问 Web UI ,我可以通过此命令从本地环境port-forward提交 WordCount jar 示例。./bin/flink run -m localhost:8081 examples/batch/WordCount.jar
但是,当我尝试通过命令提交 pyFlink 示例时,./bin/flink run -m localhost:8081 -py examples/python/table/batch/word_count.py作业冻结,日志显示正在等待结果。
我尝试了很多方法,包括创建 virtualenv、传递 pyClientExecutable 和 pyexec、同步本地和远程 python 版本,但是没有一个起作用。
我缺少什么?如何将 python 示例提交到远程会话集群?
注意:当我在作业管理器 pod 中提交 pyFlink word_count 示例时,它运行没有任何问题。
我想在 pyflink 中创建一个自定义的用户定义连接器/源。我在 Java / Scala 中看到了这样做的文档,但没有看到关于 Python 的文档。这可能吗?
我正在尝试从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每个记录都有一个Result对象,其中有一个名为 的对象数组data。我正在尝试value从第一个数组元素中提取字段,即data[0]。
{
'ID': 'some-id',
'Result': {
'data': [
{
'value': 65537,
...
...
}
]
}
}
Run Code Online (Sandbox Code Playgroud)
我正在使用 Table API 从 Kafka 主题读取数据并将提取的字段写入另一个主题。
源DDL如下:
source_ddl = """
CREATE TABLE InTable (
`ID` STRING,
`Timestamp` TIMESTAMP(3),
`Result` ROW(
`data` ROW(`value` BIGINT) ARRAY),
WATERMARK FOR `Timestamp` AS `Timestamp`
) WITH (
'connector' = 'kafka',
'topic' = 'in-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'my-group-id',
'scan.startup.mode' = 'earliest-offset', …Run Code Online (Sandbox Code Playgroud) 我正在尝试通过安装 pyflink pip3 install apache-flink。
我得到:
\n\n正在安装构建依赖项...错误错误:\nsubprocess-exited-with-error\n\xc3\x97 用于安装构建依赖项的 pip 子进程未成功运行。\xe2\x94\x82 退出代码:1 \xe2\x95\xb0\xe2\x94\x80> [4479 行输出]\n忽略 numpy:标记 'python_version == "3.6" 和 platform_system != "AIX"' don与您的环境不匹配\n忽略 numpy:标记 'python_version == "3.7" 和 platform_system != "AIX"' 与您的环境不匹配\n忽略 numpy:标记 'python_version == "3.6" 和 platform_system == "AIX" ' 与您的环境不匹配\n忽略 numpy: 标记 'python_version == "3.7" 和 platform_system == "AIX"' 与您的环境不匹配\n忽略 numpy: 标记 'python_version == "3.8" 和 platform_system == " AIX"' 与您的环境不匹配\n忽略 numpy: 标记 'python_version >= "3.9"' 与您的环境不匹配\n收集 setuptools
\n\xc3\x97 尝试安装包时遇到错误。\n\xe2\x95\xb0\xe2\x94\x80> numpy
\nRun Code Online (Sandbox Code Playgroud)note: …
我尝试从 PyFlink 和 Kafka 开始,但出现以下错误。
感谢您的支持 !
安装
python -m pip install apache-flink
pip install pyFlink
Run Code Online (Sandbox Code Playgroud)
代码
from pyFlink.datastream import StreamExecutionEnvironment
Run Code Online (Sandbox Code Playgroud)
错误
ModuleNotFoundError: No module named 'pyFlink'
Run Code Online (Sandbox Code Playgroud) pyflink ×8
apache-flink ×7
python ×3
flink-sql ×2
flink-batch ×1
java ×1
kubernetes ×1
numpy ×1
pip ×1
python-3.x ×1