2 个具有相同消费者组 ID 的 Spark Stream 作业

ven*_*sam 2 apache-kafka apache-spark spark-streaming

我正在尝试对消费者群体进行实验

这是我的代码片段

public final class App {

private static final int INTERVAL = 5000;

public static void main(String[] args) throws Exception {

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "xxx:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("auto.commit.interval.ms","1000");
    kafkaParams.put("security.protocol","SASL_PLAINTEXT");
    kafkaParams.put("sasl.kerberos.service.name","kafka");
    kafkaParams.put("retries","3");
    kafkaParams.put(GROUP_ID_CONFIG,"mygroup");
    kafkaParams.put("request.timeout.ms","210000");
    kafkaParams.put("session.timeout.ms","180000");
    kafkaParams.put("heartbeat.interval.ms","3000");
    Collection<String> topics = Arrays.asList("venkat4");

    SparkConf conf = new SparkConf();
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));


    final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    ssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                    return new Tuple2<>(record.key(), record.value());
                }
            }).print();


    ssc.start();
    ssc.awaitTermination();


}
Run Code Online (Sandbox Code Playgroud)

}

当我同时运行两个 Spark 流作业时,它失败并出现错误

线程“main”中出现异常 java.lang.IllegalStateException:org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) 上的分区 venkat4-1 当前没有分配。 client.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315) 在 org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170) 在 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream。最新Offsets(DirectKafkaInputDStream.scala:197)在org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)在org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun $1$$anonfun$apply$7.apply(DStream.scala:341) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala :341) 在 scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 在org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 在 org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) ) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream .scala:333) 在 scala.Option.orElse(Option.scala:289)

根据此https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html创建具有同一组的 kafka 消费者的单独实例将创建一个重新平衡分区。我相信消费者不会容忍这种重新平衡。我应该如何解决这个问题

下面是使用的命令

SPARK_KAFKA_VERSION=0.10 Spark2-submit --num-executors 2 --master yarn --deploy-mode client --files jaas.conf#jaas.conf,hive.keytab#hive.keytab --driver-java-options "-Djava .security.auth.login.config=./jaas.conf" --class Streaming.App --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" -- conf Spark.streaming.kafka.consumer.cache.enabled=false 1-1.0-SNAPSHOT.jar

Rav*_*mar 5

根据此https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html创建具有同一组的 kafka 消费者的单独实例将创建一个重新平衡分区。我相信消费者不会容忍这种重新平衡。我应该如何解决这个问题

在此输入图像描述

现在所有分区仅由一个消费者使用。如果数据摄取率很高,消费者以摄取速度消费数据的速度可能会很慢。

在此输入图像描述

在同一个consumergroup中添加更多consumer来消费某个topic的数据,提高消费率。Spark Streaming 使用这种方法在 Kafka 分区和 Spark 分区之间实现 1:1 并行度。Spark 将在内部处理它。

如果消费者的数量多于主题分区的数量,那么它将处于空闲状态并且资源未得到充分利用。始终建议消费者应小于或等于分区数。

如果添加更多进程/线程,Kafka 将重新平衡。如果任何消费者或代理未能向 ZooKeeper 发送心跳,ZooKeeper 可以由 Kafka 集群重新配置。

每当任何代理发生故障或向现有主题添加新分区时,Kafka 都会重新平衡分区存储。这是 kafka 特定的如何平衡代理中分区之间的数据。

Spark 流在 Kafka 分区和 Spark 分区之间提供简单的 1:1 并行性。如果您没有使用 ConsumerStraties.Assign 提供任何分区详细信息,则从给定主题的所有分区进行消费。

Kafka将主题的分区分配给一组中的消费者,以便每个分区恰好由该组中的一个消费者消费。Kafka 保证消息只能被组中的单个消费者读取。

当您启动第二个 Spark Streaming 作业时,另一个消费者尝试使用同一消费者组 ID 中的同一分区。所以它会抛出错误。

val alertTopics = Array("testtopic")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> sparkJobConfig.kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> sparkJobConfig.kafkaConsumerGroup,
  "auto.offset.reset" -> "latest"
)

val streamContext = new StreamingContext(sparkContext, Seconds(sparkJobConfig.streamBatchInterval.toLong))

val streamData = KafkaUtils.createDirectStream(streamContext, PreferConsistent, Subscribe[String, String](alertTopics, kafkaParams))
Run Code Online (Sandbox Code Playgroud)

如果您想使用特定于分区的 Spark 作业,请使用以下代码。

val topicPartitionsList =  List(new TopicPartition("topic",1))

val alertReqStream1 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))
Run Code Online (Sandbox Code Playgroud)

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies

消费者可以使用samegroup.id加入群组。

val topicPartitionsList =  List(new TopicPartition("topic",3), new TopicPartition("topic",4))

    val alertReqStream2 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))
Run Code Online (Sandbox Code Playgroud)

添加两个消费者就是添加到同一个 groupid 中。

请阅读 Spark-Kafka 集成指南。 https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

希望这可以帮助。