从 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中使用 Databricks 读取 avro 数据失败

Fie*_*ete 1 azure azure-eventhub pyspark azure-eventhub-capture azure-databricks

我正在尝试从 Azure Data Lake Gen1 读取 avro 数据,这些数据是从 Azure EventHubs 生成的,在 Azure Databricks 中使用 pyspark 启用了 Azure Event Hubs Capture:

inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)
Run Code Online (Sandbox Code Playgroud)

以下语句失败

rawData.count()
Run Code Online (Sandbox Code Playgroud)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file
Run Code Online (Sandbox Code Playgroud)

EventHub-Capture 是否正在写入非 Avro 数据?是否有使用 Spark 读取 EventHub 捕获数据的最佳实践?

Hau*_*low 8

实现冷摄取路径的一种模式是使用事件中心捕获。EventHubs 捕获按窗口参数定义的每个分区写入一个文件。数据以 avro 格式写入,可以使用 Apache Spark 进行分析。

那么使用此功能的最佳实践是什么?

1.不要过度分区

我经常看到人们使用默认配置,这最终经常导致许多小文件。如果要使用通过 EventHubs Capture with Spark 摄取的数据,请记住Azure Data Lake Store 中文件大小和Spark分区的最佳做法。文件大小应为 ~256 MB,分区在 10 到 50 GB 之间。所以最后配置取决于您正在使用的消息的数量和大小。在大多数情况下,您只需按摄取日期对数据进行分区就可以了。

2.勾选“不发出空文件选项”

您应该选中“不发出空文件选项”。如果你想用 Spark 消费数据,节省不必要的文件操作。

3. 在文件路径中使用数据源

使用流式架构,您的 EventHub 就像面向批处理的架构方法中的着陆区。因此,您将在原始数据层中摄取数据。好的做法是在目录路径中使用数据源而不是 EventHub 的名称。因此,例如,如果您从工厂中的机器人摄取遥测数据,这可能是目录路径/raw/robots/

存储命名需要使用所有属性,例如 {Namesapce}、{PartitionId}。因此,最终,具有明确定义的路径、每日分区和 Azure Data Lake Gen 2 中文件名剩余属性的使用的良好捕获文件格式定义可能如下所示:

 /raw/robots/ingest_date={Year}-{Month}-{Day}/{Hour}{Minute}{Second}-{Namespace}-{EventHub}-{PartitionId}
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

4. 考虑一个压缩工作

捕获的数据未压缩,在您的用例中也可能最终生成小文件(因为最小写入频率为 15 分钟)。因此,如有必要,请编写每天运行一次的压缩作业。就像是

df.repartition(5).write.format("avro").save(targetpath)
Run Code Online (Sandbox Code Playgroud)

会做这个工作。

那么现在读取捕获数据的最佳实践是什么?

5. 忽略读取数据的非 avro 文件

Azure EventHubs Capture 将临时数据写入 Azure Data Lake Gen1。最佳做法是仅使用 avro-extension 读取数据。您可以通过 spark 配置轻松实现此目的:

spark.conf.set("avro.mapred.ignore.inputs.without.extension", "true")
Run Code Online (Sandbox Code Playgroud)

6. 只读相关分区

考虑仅读取相关分区,例如过滤当前摄取日。

7. 使用共享元数据

读取捕获的数据与直接从 Azure EventHubs 读取数据的工作方式类似。所以你必须有一个模式。假设您还有直接使用 Spark Structured Streaming 读取数据的作业,一个好的模式是存储元数据并共享它。您可以将此元数据存储在 Data Lake Store json 文件中:

[{"MeasurementTS":"timestamp","Location":"string", "Temperature":"double"}]
Run Code Online (Sandbox Code Playgroud)

并使用这个简单的解析函数读取它:

# parse the metadata to get the schema
from collections import OrderedDict 
from pyspark.sql.types import *
import json

ds = dbutils.fs.head (metadata)                                                 # read metadata file

items = (json
  .JSONDecoder(object_pairs_hook=OrderedDict)
  .decode(ds)[0].items())

#Schema mapping 
mapping = {"string": StringType, "integer": IntegerType, "double" : DoubleType, "timestamp" : TimestampType, "boolean" : BooleanType}

schema = StructType([
    StructField(k, mapping.get(v.lower())(), True) for (k, v) in items])
Run Code Online (Sandbox Code Playgroud)

所以你可以重用你的架构:

from pyspark.sql.functions import *

parsedData = spark.read.format("avro").load(rawpath). \
  selectExpr("EnqueuedTimeUtc", "cast(Body as string) as json") \
 .select("EnqueuedTimeUtc", from_json("json", schema=Schema).alias("data")) \
 .select("EnqueuedTimeUtc", "data.*")
Run Code Online (Sandbox Code Playgroud)