消息在Kafka + Spark Streaming中丢失

Vik*_*eek 5 apache-kafka apache-spark

我正面临与kafka火花流相关的问题,我的用例如下:

  • Spark流(DirectStream)应用程序从Kafka主题读取消息并对其进行处理.
  • 在处理过的消息的基础上,应用程序将处理后的消息写入不同的Kafka主题,例如,如果消息是协调的,则写入协调的主题其他未协调的主题.

现在,问题是在流式传输期间我们正在丢失一些消息,即所有传入的消息都不会写入协调或非协调的主题.例如,如果应用程序在一个批处理中收到30条消息,那么有时它会将所有消息写入输出主题(这是预期的行为),但有时它只写入27条消息(3条消息丢失,此数字可能会改变).

版本如下:

  • Spark 1.6.0
  • 卡夫卡0.9

Kafka主题配置如下:

  • 经纪人数:3
  • num复制因子:3
  • 分区数量:3

以下是我们用于kafka的属性:

      val props = new Properties() 
      props.put("metadata.broker.list", properties.getProperty("metadataBrokerList")) 
      props.put("auto.offset.reset", properties.getProperty("autoOffsetReset")) 
      props.put("group.id", properties.getProperty("group.id")) 
      props.put("serializer.class", "kafka.serializer.StringEncoder") 
      props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized")) 
      props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized")) 
      props.put("acks", "all"); 
      props.put("retries", "5"); 
      props.put("request.required.acks", "-1") 
Run Code Online (Sandbox Code Playgroud)

以下是我们将处理过的消息写入kafka的代码片段:val schemaRdd2 = finalHarmonizedDF.toJSON

      schemaRdd2.foreachPartition { partition => 
        val producerConfig = new ProducerConfig(props) 
        val producer = new Producer[String, String](producerConfig) 

        partition.foreach { row => 
          if (debug) println(row.mkString) 
          val keyedMessage = new KeyedMessage[String, String](props.getProperty("outTopicHarmonized"), 
            null, row.toString()) 
          producer.send(keyedMessage) 

        } 
        //hack, should be done with the flush 
        Thread.sleep(1000) 
        producer.close() 
      } 
Run Code Online (Sandbox Code Playgroud)

我们明确添加了sleep(1000)用于测试目的.但这也没有解决问题:(

任何建议将不胜感激.

Wil*_*ill 0

尝试将batchDuration参数(初始化时StreamingContext)调整为大于每个rdd的处理时间的数字。这解决了我的问题。