相关疑难解决方法(0)

如何在 spark 3.0 结构化流中使用 kafka.group.id 和检查点以继续从重启后停止的 Kafka 读取?

基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html。应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。

如何为 Spark 结构化流指定 kafka 消费者的组 ID?

如何通过 Spark Structured Streaming 确保 kafka 数据摄取不会丢失数据?

但是,我在 spark 3.0 中尝试了如下设置。

package com.example

/**
 * @author ${user.name}
 */
import scala.math.random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration

3
推荐指数
1
解决办法
1159
查看次数

Spark 结构化流 未授权访问组

我正在尝试通过 Spark 结构化流从 Kafka 读取数据。但是,在Spark 2.4.0中,您无法为流设置group id(请参阅如何在Structured Streaming中为kafka数据源中的消费者组设置group.id?)。

然而,由于没有设置,spark 只是生成组 Id,而我陷入了 GroupAuthorizationException:

19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-120f-4a6d-b20e-634eb77ac7b8, runId = 63aa4cce-ad72-47f2-80f6-e87947b69685] terminated with error
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2
Run Code Online (Sandbox Code Playgroud)

请问有什么想法可以绕过这个吗?有趣的是,我可以通过 kafka-console-consumer.sh 读取这些数据,我可以在 .properties 文件中传递组 ID。

抛出异常的代码:

val df = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .option("kafka.group.id", "idThatShouldBeUsed")
  .option("kafka.bootstrap.servers", "server")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/location)
  .option("kafka.ssl.truststore.password", "pass")
  .option("kafka.sasl.jaas.config", """jaasToUse""")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("startingOffsets", "earliest")
  .start().awaitTermination()
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-streaming spark-structured-streaming

2
推荐指数
1
解决办法
4696
查看次数