如何在Kafka Direct Stream中使用Spark Structured Streaming?

Ser*_*eyB 11 scala apache-kafka apache-spark spark-structured-streaming

我遇到了使用Spark的Structured Streaming,它有一个连续消耗S3存储桶并将处理结果写入MySQL数据库的示例.

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
       .writeStream.format("jdbc")
       .start("jdbc:mysql//...")
Run Code Online (Sandbox Code Playgroud)

如何在Spark Kafka Streaming中使用它

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
Run Code Online (Sandbox Code Playgroud)

有没有办法结合这两个例子而不使用stream.foreachRDD(rdd => {})

Yuv*_*kov 12

有没有办法结合这两个例子而不使用 stream.foreachRDD(rdd => {})

还没.Spark 2.0.0没有Kafka sink支持结构化流.根据Tathagata Das,Spark Streaming的创建者之一,这个功能应该在Spark 2.1.0中出现. 以下是相关的JIRA问题.

编辑:(29/11/2018)

是的,可以使用Spark 2.2版开始.

stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()
Run Code Online (Sandbox Code Playgroud)

查看此SO帖子(使用Spark流媒体读取和写入Kafka主题)了解更多信息.

编辑:(2016年12月6日)

结构化流的Kafka 0.10集成现在在Spark 2.0.2中支持expiramentaly:

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
Run Code Online (Sandbox Code Playgroud)