小编Vig*_*han的帖子

使用查找数据丰富KStream的理想方式

我的流有一个名为'category'的列,我在另一个商店中为每个'category'提供了额外的静态元数据,每隔几天就会更新一次.这种查找的正确方法是什么?Kafka流有两种选择

  1. 在Kafka Streams之外加载静态数据,仅用于KStreams#map()添加元数据.这是可能的,因为Kafka Streams只是一个图书馆.

  2. 将元数据加载到Kafka主题,将其加载到a KTable和do KStreams#leftJoin(),这似乎更自然,并将分区等留给Kafka Streams.但是,这要求我们保持KTable加载所有值.请注意,我们必须加载整个查找数据,而不仅仅是加载更改.

    • 例如,最初说只有一个类别'c1'.Kafka流应用程序优雅地停止,然后重新启动.重新启动后,添加了一个新类别"c2".我的假设是,table = KStreamBuilder().table('metadataTopic')只有值'c2',因为这是应用程序第二次启动以来唯一发生的变化.我希望它有'c1'和'c2'.
    • 如果它也有"c1",那么数据是否会从KTable中删除(可能是通过设置发送key = null消息?)?

上述哪一种是查找元数据的正确方法?

是否可以始终强制在重新启动时从头开始只读取一个流,这样就可以加载所有元数据KTable.

还有其他方式使用商店吗?

apache-kafka-streams

9
推荐指数
2
解决办法
3091
查看次数

PySpark:反序列化eventhub捕获avro文件中包含的Avro序列化消息

初始情况

AVRO序列化的事件将发送到azure事件中心。这些事件使用天蓝色事件中心捕获功能持久存储。捕获的数据以及事件中心元数据均以Apache Avro格式编写。捕获的avro文件中包含的原始事件应使用(py)Spark进行分析。


如何使用(py)Spark反序列化包含在AVRO文件的字段/列中的AVRO序列化事件?(注释:事件的Avro模式无法被阅读器应用程序识别,但是它作为Avro标头包含在消息中)


背景

背景是用于IoT场景的分析平台。消息由在kafka上运行的IoT平台提供。为了更灵活地更改模式,战略决策是坚持使用avro格式。为了启用Azure流分析(ASA)的使用,每条消息均指定avro架构(否则ASA无法反序列化该消息)。

捕获文件平均模式

下面列出了由事件中心捕获功能生成的avro文件的架构:

{
    "type":"record",
    "name":"EventData",
    "namespace":"Microsoft.ServiceBus.Messaging",
    "fields":[
                 {"name":"SequenceNumber","type":"long"},
                 {"name":"Offset","type":"string"},
                 {"name":"EnqueuedTimeUtc","type":"string"},
                 {"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Body","type":["null","bytes"]}
             ]
}
Run Code Online (Sandbox Code Playgroud)

(请注意,实际消息以字节为单位存储在主体字段中)

示例事件Avro模式

为了说明起见,我将具有以下avro模式的事件发送到事件中心:

{
    "type" : "record",
    "name" : "twitter_schema",
    "namespace" : "com.test.avro",
    "fields" : [ 
                {"name" : "username","type" : "string"}, 
                {"name" : "tweet","type" : "string"},
                {"name" : "timestamp","type" : "long"}
    ],
}
Run Code Online (Sandbox Code Playgroud)

示例事件

{
    "username": "stackoverflow",
    "tweet": "please help deserialize me",
    "timestamp": 1366150681
}
Run Code Online (Sandbox Code Playgroud)

示例Avro消息有效负载

(编码为字符串/请注意,其中包含avro模式)

Objavro.schema?{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}
Run Code Online (Sandbox Code Playgroud)

因此,最后,此有效负载将作为字节存储在捕获avro文件的“正文”字段中。



我目前的做法

为了易于使用,测试和调试,我目前使用pyspark jupyter笔记本。

Spark会话的配置:

%%configure
{
    "conf": …
Run Code Online (Sandbox Code Playgroud)

avro apache-spark pyspark azure-eventhub-capture

5
推荐指数
1
解决办法
860
查看次数