设想
我在 2018 年 1 月 1 日创建了一个 eventhub。
我的 eventhub 保留期设置为 1 天。
我启用了“捕获”,保留每 5 分钟或 300 mb 的默认捕获参数,将特征存储在名为“ customerevents ”的容器中
到今天,即 2018 年 10 月 22 日,我向我的 eventhub 发送并处理了 1000,000 个客户事件。
我为另一个对历史数据感兴趣的部门创建了一个新服务,现在需要重播我自 2018 年 1 月 1 日以来收到的所有1000,000 条消息。
我的customerevents存储容器中有很多很多“文件夹”,用于年/月/日和每 5 分钟,每个.avro文件中包含我捕获的事件。
我如何为我的新服务“重播”所有这些事件?
任何建议表示赞赏。
事件中心不允许您存储超过 7(最多 30)天的消息。具有这些限制的 Azure 建议的 PaaS 事件溯源架构是什么?如果是事件中心 + 快照,如果我们需要以某种方式重建该状态会发生什么?另外,事件中心是对 KSQL/Spark Azure 流分析的回答吗?
AVRO序列化的事件将发送到azure事件中心。这些事件使用天蓝色事件中心捕获功能持久存储。捕获的数据以及事件中心元数据均以Apache Avro格式编写。捕获的avro文件中包含的原始事件应使用(py)Spark进行分析。
如何使用(py)Spark反序列化包含在AVRO文件的字段/列中的AVRO序列化事件?(注释:事件的Avro模式无法被阅读器应用程序识别,但是它作为Avro标头包含在消息中)
背景是用于IoT场景的分析平台。消息由在kafka上运行的IoT平台提供。为了更灵活地更改模式,战略决策是坚持使用avro格式。为了启用Azure流分析(ASA)的使用,每条消息均指定avro架构(否则ASA无法反序列化该消息)。
下面列出了由事件中心捕获功能生成的avro文件的架构:
{
"type":"record",
"name":"EventData",
"namespace":"Microsoft.ServiceBus.Messaging",
"fields":[
{"name":"SequenceNumber","type":"long"},
{"name":"Offset","type":"string"},
{"name":"EnqueuedTimeUtc","type":"string"},
{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Body","type":["null","bytes"]}
]
}
Run Code Online (Sandbox Code Playgroud)
(请注意,实际消息以字节为单位存储在主体字段中)
为了说明起见,我将具有以下avro模式的事件发送到事件中心:
{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.test.avro",
"fields" : [
{"name" : "username","type" : "string"},
{"name" : "tweet","type" : "string"},
{"name" : "timestamp","type" : "long"}
],
}
Run Code Online (Sandbox Code Playgroud)
{
"username": "stackoverflow",
"tweet": "please help deserialize me",
"timestamp": 1366150681
}
Run Code Online (Sandbox Code Playgroud)
(编码为字符串/请注意,其中包含avro模式)
Objavro.schema?{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}
Run Code Online (Sandbox Code Playgroud)
因此,最后,此有效负载将作为字节存储在捕获avro文件的“正文”字段中。
。
。
为了易于使用,测试和调试,我目前使用pyspark jupyter笔记本。
Spark会话的配置:
%%configure
{
"conf": …
Run Code Online (Sandbox Code Playgroud) 任何人都可以弄清楚 Azure 事件中心中的消费者组是什么。它有什么用?我浏览了很多网站,但我无法得到明确的答案。
我正在评估 Azure 事件中心与 Kafka 作为服务代理的使用。我希望我能够并排创建两个本地应用程序,一个使用 Kafka 使用消息,另一个使用 Azure 事件中心。我已经设置了一个 docker 容器,它是一个 Kafka 实例,我正在使用我的 Azure 帐户设置 Azure 事件中心(据我所知,没有其他方法可以为 Azure 创建本地/开发实例事件中心)。
有没有人有关于这两者的任何信息,在比较它们的功能时可能有用?
这与 Azure 事件中心有关,我正在尝试使用 POSTMAN 的 POST api 调用将数据发送到我的事件中心。
我遵循的步骤:
创建事件中心、生成 SAS 发送令牌、创建消费者组
现在在邮递员中,我正在努力格式化正确的标题:
我发送的请求:
POST: https://testeventhu.servicebus.windows.net/myhub
Run Code Online (Sandbox Code Playgroud)
2个标题:
Content-Type : application/atom+xml;type=entry;charset=utf-8
Authorization: SharedAccessSignature sig=kjheh/f6SqR8dIW2nRpGUCHuhdshss2KoCKo7Q6ozmY=&se=1571140739&skn=saspolicy&sr=https://testeventhu.servicebus.windows.net/myhub
Run Code Online (Sandbox Code Playgroud)
我收到错误 401 MalformedToken: Failed to parse simple web token
我在这里做错了什么?使用的参考来自 https://docs.microsoft.com/en-us/rest/api/eventhub/Send-event?redirectedfrom=MSDN
提前致谢
下面是从微软网站复制的示例代码。我做了更换事件枢纽<namespace>
,<eventhub>
,<AccessKeyName>
,并<primary key value>
与所需的值。
import sys
import logging
import datetime
import time
import os
from azure.eventhub import EventHubClient, Sender, EventData
logger = logging.getLogger("azure")
# Address can be in either of these formats:
# "amqps://<URL-encoded-SAS-policy>:<URL-encoded-SAS-key>@<namespace>.servicebus.windows.net/eventhub"
# "amqps://<namespace>.servicebus.windows.net/<eventhub>"
# SAS policy and key are not required if they are encoded in the URL
ADDRESS = "amqps://<namespace>.servicebus.windows.net/<eventhub>"
USER = "<AccessKeyName>"
KEY = "<primary key value>"
try:
if not ADDRESS:
raise ValueError("No EventHubs URL supplied.") …
Run Code Online (Sandbox Code Playgroud) 我正在尝试从 Azure Data Lake Gen1 读取 avro 数据,这些数据是从 Azure EventHubs 生成的,在 Azure Databricks 中使用 pyspark 启用了 Azure Event Hubs Capture:
inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)
Run Code Online (Sandbox Code Playgroud)
以下语句失败
rawData.count()
Run Code Online (Sandbox Code Playgroud)
和
org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file
Run Code Online (Sandbox Code Playgroud)
EventHub-Capture 是否正在写入非 Avro 数据?是否有使用 Spark 读取 EventHub 捕获数据的最佳实践?
azure azure-eventhub pyspark azure-eventhub-capture azure-databricks
azure ×5
avro ×2
pyspark ×2
apache-kafka ×1
apache-spark ×1
c# ×1
integration ×1
post ×1
python ×1