Kar*_*ikJ 5 scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration
如果我的 Kafka 主题收到类似的记录
CHANNEL | VIEWERS | .....
ABC | 100 | .....
CBS | 200 | .....
Run Code Online (Sandbox Code Playgroud)
我有 Spark 结构化流代码来读取和处理 Kafka 记录,如下所示:
val spark = SparkSession
.builder
.appName("TestPartition")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val dataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers",
"1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
.option("subscribe", "partition_test")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING)")
// I will use a custom UDF to transform to a specific object
Run Code Online (Sandbox Code Playgroud)
目前,我使用 foreachwriter 处理记录如下:
val writer = new ForeachWriter[testRec] {
def open(partitionId: Long, version: Long): Boolean = {
true
}
def process(record: testRec) = {
handle(record)
}
def close(errorOrNull: Throwable): Unit = {
}
}
val query = dataFrame.writeStream
.format("console")
.foreach(writer)
.outputMode("append")
.start()
Run Code Online (Sandbox Code Playgroud)
代码工作得很好。但是,我想做的是按通道对传入数据进行分区,以便每个工作人员负责特定的通道,并且我在 handle() 块内执行与该通道相关的内存计算。那可能吗 ?如果是,我该怎么做?
代码按原样handle在记录级别应用该方法,并且与记录的分区无关。
我看到两个选项可以确保同一通道的所有消息都在同一执行器上处理:
如果您可以控制 KafkaProducer 将数据生成到主题“partition_test”中,则可以将 的值设置channel为 Kafka 消息的键。默认情况下,KafkaProducer 使用键来定义数据写入的分区。这将确保具有相同密钥的所有消息都将落在同一个 Kafka 主题分区中。由于使用 Kafka 主题的 Spark 结构化流作业将与 Kafka 分区匹配,因此您的结果dataFrame将具有与 Kafka 主题相同数量的分区,并且同一通道的所有消息都位于同一分区中。
正如评论中已经写的,您可以通过执行以下操作,dataFrame根据列的值简单地重新分区,其中是分区数。这样,具有相同通道的所有记录将落在同一分区中,因此在同一执行器上进行处理。channeldataFrame.repartition(n, col("columnName"))n
两个重要注意事项:
获得分区(Dataframes 或 Kafka 主题)的所有权需要额外的注意,因为您可能最终会遇到所谓的“数据倾斜”。当分区包含大量消息而不是仅包含少量消息时,就会发生数据倾斜。这将对您的整体表现产生负面影响。
只要您使用foreach输出接收器,当您在记录级别进行处理时,数据如何分区并不重要。如果您正在寻找更多控制,您可能更愿意使用foreachBatch接收器(Spark 2.4+ 中提供)。foreachBatch 输出接收器使您可以控制每个微批次的批数据帧,并且您可以使用foreachPartitions或执行基于分区的逻辑mapPartitions。
| 归档时间: |
|
| 查看次数: |
904 次 |
| 最近记录: |