时间序列数据是在 kafka 主题中生成的。我需要读取每条记录并用数据库中的一些数据进行修饰,并最终调用 REST API。收到响应后,输出到 kafka 主题。如何使用 kafka Streams API 高效且可扩展地做到这一点?
脚步 -
我认为,上述算法有两个瓶颈——
进行数据库调用会减慢速度。这可以通过缓存元数据并在存在错误或使用状态存储时加载元数据来避免。
同步进行 REST API 调用会减慢速度。
final KStream<String, String> records = builder.stream(InputTopic);
//This is bad
final KStream<String, String> output = records
.mapValues(value -> { //cache hit otherwise database call});
.mapValues(value -> { //prepare http request and convert the http resonse };
output.to(OutputTopic)
Run Code Online (Sandbox Code Playgroud)
如果数据库调用或 REST API 需要较长时间才能完成,上面的代码将对吞吐量产生依赖性和不利影响。具有相同键的记录不应乱序处理。预计吞吐量约为1m/分钟。当一条记录到达 REST API 时,可以并发进行数据库调用。
不知道如何编写可以在这种情况下扩展的拓扑。我是卡夫卡流的新手。
我将JSON转换为DataFrame并以“ Structure_value”列结尾,该列具有以下值作为字典/词典列表:
Structure_value
[{'Room': 6, 'Length': 7}, {'Room': 6, 'Length': 7}]
[{'Room': 6, 'Length': 22}]
[{'Room': 6, 'Length': 8}, {'Room': 6, 'Length': 9}]
Run Code Online (Sandbox Code Playgroud)
由于它是一个对象,所以我猜它最终会以这种格式出现。
我需要将其分为以下四列:
Structure_value_room_1
Structure_value_length_1
Structure_value_room_2
Structure_value_length_2
StackOverflow上的所有其他解决方案仅处理将简单JSON转换为DataFrame而不是嵌套结构的问题。
PS:我知道我可以通过显式命名字段来做一些事情,但是我需要一个通用的解决方案,以便将来可以处理这种格式的任何JSON
[编辑]:输出应如下所示:
Structure_value_room_1 Structure_value_length_1 Structure_value_room_2 \
0 6 7 6.0
1 6 22 NaN
2 6 8 6.0
Structure_value_length_2
0 7.0
1 NaN
2 9.0
Run Code Online (Sandbox Code Playgroud) apache-kafka ×1
dataframe ×1
json ×1
model ×1
pandas ×1
python ×1
python-3.x ×1
rest ×1
serving ×1