Dhr*_*jee 7 scala apache-spark spark-streaming
尝试为 spark 数据流 writeStream 函数创建一个测试,如下所示:
SparkSession spark = SparkSession.builder().master("local").appName("spark
session").getOrCreate()
val lakeDF = spark.createDF(List(("hi")), List(("word", StringType, true)))
lakeDF.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", checkpointPath)
.start(dataPath)
Run Code Online (Sandbox Code Playgroud)
但我收到以下异常: org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;
我对火花流很陌生,请让我知道如何为我的测试套件创建流数据帧/将上述常规数据帧转换为流数据帧。
在 Spark 结构化流中,数据帧/数据集是使用 SparkSession 上的readStream在流中创建的。如果数据帧/数据集不是使用流创建的,则不允许使用writeStream进行存储。
因此,使用readStream创建数据帧/数据集并使用writeStream存储数据帧/数据集
val kafkaStream = sparkSession.readStream.format("kafka")
.option("kafka.bootstrap.servers", "kafka-broker-hostname:port")
.option("subscribe", "topicname")
.load()
Run Code Online (Sandbox Code Playgroud)
Sam*_*Sam -3
考虑以下示例。
第一个适用于非流式传输的 df。就像你的一样,例如由列表/序列组成的文件或 df。
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
Run Code Online (Sandbox Code Playgroud)
第二个适用于流式传输的数据帧。例如从 kafka 或其他一些流媒体源读取。
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
Run Code Online (Sandbox Code Playgroud)
要么使用流数据帧,要么像第一个示例一样更改代码。
| 归档时间: |
|
| 查看次数: |
6219 次 |
| 最近记录: |