标签: azure-eventhub-capture

Azure eventhub 捕获重放

设想

我在 2018 年 1 月 1 日创建了一个 eventhub。

我的 eventhub 保留期设置为 1 天。

我启用了“捕获”,保留每 5 分钟或 300 mb 的默认捕获参数,将特征存储在名为“ customerevents ”的容器中

到今天,即 2018 年 10 月 22 日,我向我的 eventhub 发送并处理了 1000,000 个客户事件。

我为另一个对历史数据感兴趣的部门创建了一个新服务,现在需要重播我自 2018 年 1 月 1 日以来收到的所有1000,000 条消息。

我的customerevents存储容器中有很多很多“文件夹”,用于年/月/日和每 5 分钟,每个.avro文件中包含我捕获的事件。

我如何为我的新服务“重播”所有这些事件?

任何建议表示赞赏。

c# azure-storage avro azure-eventhub azure-eventhub-capture

7
推荐指数
0
解决办法
457
查看次数

事件中心是否旨在用于事件溯源/仅追加日志架构

事件中心不允许您存储超过 7(最多 30)天的消息。具有这些限制的 Azure 建议的 PaaS 事件溯源架构是什么?如果是事件中心 + 快照,如果我们需要以某种方式重建该状态会发生什么?另外,事件中心是对 KSQL/Spark Azure 流分析的回答吗?

transaction-log azure-eventhub azure-eventhub-capture

6
推荐指数
2
解决办法
2035
查看次数

PySpark:反序列化eventhub捕获avro文件中包含的Avro序列化消息

初始情况

AVRO序列化的事件将发送到azure事件中心。这些事件使用天蓝色事件中心捕获功能持久存储。捕获的数据以及事件中心元数据均以Apache Avro格式编写。捕获的avro文件中包含的原始事件应使用(py)Spark进行分析。


如何使用(py)Spark反序列化包含在AVRO文件的字段/列中的AVRO序列化事件?(注释:事件的Avro模式无法被阅读器应用程序识别,但是它作为Avro标头包含在消息中)


背景

背景是用于IoT场景的分析平台。消息由在kafka上运行的IoT平台提供。为了更灵活地更改模式,战略决策是坚持使用avro格式。为了启用Azure流分析(ASA)的使用,每条消息均指定avro架构(否则ASA无法反序列化该消息)。

捕获文件平均模式

下面列出了由事件中心捕获功能生成的avro文件的架构:

{
    "type":"record",
    "name":"EventData",
    "namespace":"Microsoft.ServiceBus.Messaging",
    "fields":[
                 {"name":"SequenceNumber","type":"long"},
                 {"name":"Offset","type":"string"},
                 {"name":"EnqueuedTimeUtc","type":"string"},
                 {"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Body","type":["null","bytes"]}
             ]
}
Run Code Online (Sandbox Code Playgroud)

(请注意,实际消息以字节为单位存储在主体字段中)

示例事件Avro模式

为了说明起见,我将具有以下avro模式的事件发送到事件中心:

{
    "type" : "record",
    "name" : "twitter_schema",
    "namespace" : "com.test.avro",
    "fields" : [ 
                {"name" : "username","type" : "string"}, 
                {"name" : "tweet","type" : "string"},
                {"name" : "timestamp","type" : "long"}
    ],
}
Run Code Online (Sandbox Code Playgroud)

示例事件

{
    "username": "stackoverflow",
    "tweet": "please help deserialize me",
    "timestamp": 1366150681
}
Run Code Online (Sandbox Code Playgroud)

示例Avro消息有效负载

(编码为字符串/请注意,其中包含avro模式)

Objavro.schema?{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}
Run Code Online (Sandbox Code Playgroud)

因此,最后,此有效负载将作为字节存储在捕获avro文件的“正文”字段中。



我目前的做法

为了易于使用,测试和调试,我目前使用pyspark jupyter笔记本。

Spark会话的配置:

%%configure
{
    "conf": …
Run Code Online (Sandbox Code Playgroud)

avro apache-spark pyspark azure-eventhub-capture

5
推荐指数
1
解决办法
860
查看次数

azure 事件中心中的消费者组是什么?

任何人都可以弄清楚 Azure 事件中心中的消费者组是什么。它有什么用?我浏览了很多网站,但我无法得到明确的答案。

integration azure azure-eventhub azure-eventhub-capture

5
推荐指数
3
解决办法
6194
查看次数

Azure 事件中心与 Kafka 即服务代理

我正在评估 Azure 事件中心与 Kafka 作为服务代理的使用。我希望我能够并排创建两个本地应用程序,一个使用 Kafka 使用消息,另一个使用 Azure 事件中心。我已经设置了一个 docker 容器,它是一个 Kafka 实例,我正在使用我的 Azure 帐户设置 Azure 事件中心(据我所知,没有其他方法可以为 Azure 创建本地/开发实例事件中心)。

有没有人有关于这两者的任何信息,在比较它们的功能时可能有用?

azure apache-kafka azure-eventhub-capture

5
推荐指数
4
解决办法
6780
查看次数

Azure 事件中心上的 POST 数据失败

这与 Azure 事件中心有关,我正在尝试使用 POSTMAN 的 POST api 调用将数据发送到我的事件中心。

我遵循的步骤:

创建事件中心、生成 SAS 发送令牌、创建消费者组

现在在邮递员中,我正在努力格式化正确的标题:

我发送的请求:

POST:  https://testeventhu.servicebus.windows.net/myhub 
Run Code Online (Sandbox Code Playgroud)

2个标题:

Content-Type : application/atom+xml;type=entry;charset=utf-8

Authorization:  SharedAccessSignature sig=kjheh/f6SqR8dIW2nRpGUCHuhdshss2KoCKo7Q6ozmY=&se=1571140739&skn=saspolicy&sr=https://testeventhu.servicebus.windows.net/myhub
Run Code Online (Sandbox Code Playgroud)

我收到错误 401 MalformedToken: Failed to parse simple web token

我在这里做错了什么?使用的参考来自 https://docs.microsoft.com/en-us/rest/api/eventhub/Send-event?redirectedfrom=MSDN

提前致谢

post azure azure-eventhub azure-eventhub-capture

2
推荐指数
1
解决办法
1227
查看次数

使用 Python 将事件发送到 Azure 事件中心

下面是从微软网站复制的示例代码。我做了更换事件枢纽<namespace><eventhub><AccessKeyName>,并<primary key value>与所需的值。

import sys
import logging
import datetime
import time
import os

from azure.eventhub import EventHubClient, Sender, EventData

logger = logging.getLogger("azure")

# Address can be in either of these formats:
# "amqps://<URL-encoded-SAS-policy>:<URL-encoded-SAS-key>@<namespace>.servicebus.windows.net/eventhub"
# "amqps://<namespace>.servicebus.windows.net/<eventhub>"
# SAS policy and key are not required if they are encoded in the URL

ADDRESS = "amqps://<namespace>.servicebus.windows.net/<eventhub>"
USER = "<AccessKeyName>"
KEY = "<primary key value>"

try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.") …
Run Code Online (Sandbox Code Playgroud)

python azure azure-eventhub azure-eventhub-capture

2
推荐指数
1
解决办法
2353
查看次数

从 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中使用 Databricks 读取 avro 数据失败

我正在尝试从 Azure Data Lake Gen1 读取 avro 数据,这些数据是从 Azure EventHubs 生成的,在 Azure Databricks 中使用 pyspark 启用了 Azure Event Hubs Capture:

inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)
Run Code Online (Sandbox Code Playgroud)

以下语句失败

rawData.count()
Run Code Online (Sandbox Code Playgroud)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file
Run Code Online (Sandbox Code Playgroud)

EventHub-Capture 是否正在写入非 Avro 数据?是否有使用 Spark 读取 EventHub 捕获数据的最佳实践?

azure azure-eventhub pyspark azure-eventhub-capture azure-databricks

1
推荐指数
1
解决办法
1459
查看次数