Spark Streaming - 读写Kafka主题

Cho*_*eat 33 scala apache-kafka apache-spark spark-streaming spark-streaming-kafka

我正在使用Spark Streaming处理两个Kafka队列之间的数据,但我似乎找不到从Spark写Kafka的好方法.我试过这个:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)
Run Code Online (Sandbox Code Playgroud)

并且它按预期工作,但是为每个消息实例化一个新的KafkaProducer在真实环境中显然是不可行的,我正在尝试解决它.

我想为每个进程保留一个实例的引用,并在需要发送消息时访问它.如何从Spark Streaming写入Kafka?

Mic*_*oll 29

是的,不幸的是Spark(1.x,2.x)并没有直截了当地如何以有效的方式写信给Kafka.

我建议采用以下方法:

  • KafkaProducer每个执行程序进程/ JVM 使用(并重用)一个实例.

以下是此方法的高级设置:

  1. 首先,你必须"包装"卡夫卡,KafkaProducer因为正如你所提到的,它不是可序列化的.包装它允许您将其"运送"给执行者.这里的关键思想是使用a lazy val来延迟实例化生成器直到它第一次使用,这实际上是一种解决方法,因此您不必担心KafkaProducer不可序列化.
  2. 您可以使用广播变量将包装的生成器"运送"到每个执行程序.
  3. 在实际的处理逻辑中,您可以通过广播变量访问包装的生产者,并使用它将处理结果写回Kafka.

下面的代码片段与Spark 2.0的Spark Streaming一起使用.

第1步:包装 KafkaProducer

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}
Run Code Online (Sandbox Code Playgroud)

第2步:使用广播变量为每个执行程序提供自己的包装KafkaProducer实例

import org.apache.kafka.clients.producer.ProducerConfig

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}
Run Code Online (Sandbox Code Playgroud)

第3步:从Spark Streaming写入Kafka,重新使用相同的包装KafkaProducer实例(对于每个执行程序)

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    metadata.foreach { metadata => metadata.get() }
  }
}
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助.


Mar*_*ier 19

我的第一个建议是尝试在foreachPartition中创建一个新实例,并测量它是否足够快以满足您的需求(在foreachPartition中实例化重型对象是官方文档所建议的).

另一种选择是使用对象池,如下例所示:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

然而,我发现使用检查点时很难实现.

另一个适合我的版本是如下文博客文章中所述的工厂,您只需检查它是否提供了足够的并行性以满足您的需求(请查看评论部分):

http://allegro.tech/2015/08/spark-kafka-integration.html

  • 如果我们使用固定数量的RDD,`foreachPartition`会很好,但是在Spark Streaming(我们有微批处理)中,RDD是永久创建的,分区也是如此.如何在Spark Streaming中规避这一点? (3认同)

maa*_*asg 8

有一个由Cloudera维护的Streaming Kafka Writer(实际上是从Spark JIRA [1]中分离出来的).它基本上为每个分区创建一个生产者,它分摊了在(希望很大的)元素集合上创建"重"对象所花费的时间.

可以在这里找到Writer:https://github.com/cloudera/spark-kafka-writer

  • 404找不到该项目,已删除?https://github.com/cloudera/spark-kafka-writer (2认同)

mrs*_*vas 8

使用Spark> = 2.2

无论读写操作都是可能的卡夫卡使用结构化流API

从Kafka主题构建流

// Subscribe to a topic and read messages from the earliest to latest offsets
val ds= spark
  .readStream // use `read` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
Run Code Online (Sandbox Code Playgroud)

阅读键和值,并对两者都应用架构,为简单起见,我们正在将它们都转换为String类型。

val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
Run Code Online (Sandbox Code Playgroud)

由于dsStruc有模式,它是接受所有的SQL样操作,例如filteraggselect...等就可以了。

将流写入Kafka主题

dsStruc
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()
Run Code Online (Sandbox Code Playgroud)

Kafka集成的更多配置,可读写

要添加到应用程序中的关键工件

 "org.apache.spark" % "spark-core_2.11" % 2.2.0,
 "org.apache.spark" % "spark-streaming_2.11" % 2.2.0,
 "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,
Run Code Online (Sandbox Code Playgroud)


gca*_*ari 7

我遇到了同样的问题并发现了这篇文章.

作者通过为每个执行者创建1个生成器来解决问题.他只发送一个"配方",而不是发送生产者本身,如何通过广播来在执行者中创建生产者.

    val kafkaSink = sparkContext.broadcast(KafkaSink(conf))
Run Code Online (Sandbox Code Playgroud)

他使用了一个懒洋洋地创建生产者的包装器:

    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new     ProducerRecord(topic, value))
    }


    object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }
Run Code Online (Sandbox Code Playgroud)

包装器是可序列化的,因为Kafka生成器在首次使用执行器之前初始化.驱动程序保留对包装器的引用,包装器使用每个执行程序的生成器发送消息:

    dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }
Run Code Online (Sandbox Code Playgroud)