Spark Dataframe到Kafka

Gur*_*han 2 apache-kafka apache-spark

我正在尝试将Spark Dataframe流式传输到Kafka使用者。我做不到,请您告诉我。

我可以将数据从Kafka生产者选择到Spark,并且已经执行了一些操作。在处理完数据之后,我有兴趣将其流回Kafka(消费者)。

Bri*_*ian 7

这是在流媒体中制作kafka的示例,但批处理版本几乎相同

从源流到kafka:

 val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .start()
Run Code Online (Sandbox Code Playgroud)

将静态数据帧(不是从源流式传输)写入kafka

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()
Run Code Online (Sandbox Code Playgroud)

请记住

  1. 每行将是一条消息。
  2. 该数据帧必须是流数据帧。如果您有静态数据框,则使用静态版本。

看看基本文档:https : //spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

听起来您有一个静态数据框,而不是从源流式传输的。