我的流有一个名为'category'的列,我在另一个商店中为每个'category'提供了额外的静态元数据,每隔几天就会更新一次.这种查找的正确方法是什么?Kafka流有两种选择
在Kafka Streams之外加载静态数据,仅用于KStreams#map()添加元数据.这是可能的,因为Kafka Streams只是一个图书馆.
将元数据加载到Kafka主题,将其加载到a KTable和do KStreams#leftJoin(),这似乎更自然,并将分区等留给Kafka Streams.但是,这要求我们保持KTable加载所有值.请注意,我们必须加载整个查找数据,而不仅仅是加载更改.
上述哪一种是查找元数据的正确方法?
是否可以始终强制在重新启动时从头开始只读取一个流,这样就可以加载所有元数据KTable.
还有其他方式使用商店吗?
AVRO序列化的事件将发送到azure事件中心。这些事件使用天蓝色事件中心捕获功能持久存储。捕获的数据以及事件中心元数据均以Apache Avro格式编写。捕获的avro文件中包含的原始事件应使用(py)Spark进行分析。
如何使用(py)Spark反序列化包含在AVRO文件的字段/列中的AVRO序列化事件?(注释:事件的Avro模式无法被阅读器应用程序识别,但是它作为Avro标头包含在消息中)
背景是用于IoT场景的分析平台。消息由在kafka上运行的IoT平台提供。为了更灵活地更改模式,战略决策是坚持使用avro格式。为了启用Azure流分析(ASA)的使用,每条消息均指定avro架构(否则ASA无法反序列化该消息)。
下面列出了由事件中心捕获功能生成的avro文件的架构:
{
"type":"record",
"name":"EventData",
"namespace":"Microsoft.ServiceBus.Messaging",
"fields":[
{"name":"SequenceNumber","type":"long"},
{"name":"Offset","type":"string"},
{"name":"EnqueuedTimeUtc","type":"string"},
{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Body","type":["null","bytes"]}
]
}
Run Code Online (Sandbox Code Playgroud)
(请注意,实际消息以字节为单位存储在主体字段中)
为了说明起见,我将具有以下avro模式的事件发送到事件中心:
{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.test.avro",
"fields" : [
{"name" : "username","type" : "string"},
{"name" : "tweet","type" : "string"},
{"name" : "timestamp","type" : "long"}
],
}
Run Code Online (Sandbox Code Playgroud)
{
"username": "stackoverflow",
"tweet": "please help deserialize me",
"timestamp": 1366150681
}
Run Code Online (Sandbox Code Playgroud)
(编码为字符串/请注意,其中包含avro模式)
Objavro.schema?{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}
Run Code Online (Sandbox Code Playgroud)
因此,最后,此有效负载将作为字节存储在捕获avro文件的“正文”字段中。
。
。
为了易于使用,测试和调试,我目前使用pyspark jupyter笔记本。
Spark会话的配置:
%%configure
{
"conf": …Run Code Online (Sandbox Code Playgroud)