Gur*_*han 2 apache-kafka apache-spark
我正在尝试将Spark Dataframe流式传输到Kafka使用者。我做不到,请您告诉我。
我可以将数据从Kafka生产者选择到Spark,并且已经执行了一些操作。在处理完数据之后,我有兴趣将其流回Kafka(消费者)。
这是在流媒体中制作kafka的示例,但批处理版本几乎相同
从源流到kafka:
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
Run Code Online (Sandbox Code Playgroud)
将静态数据帧(不是从源流式传输)写入kafka
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
Run Code Online (Sandbox Code Playgroud)
请记住
看看基本文档:https : //spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
听起来您有一个静态数据框,而不是从源流式传输的。
| 归档时间: |
|
| 查看次数: |
3713 次 |
| 最近记录: |