例外:“writeStream”只能在流式数据集/数据帧上调用

Dhr*_*jee 7 scala apache-spark spark-streaming

尝试为 spark 数据流 writeStream 函数创建一个测试,如下所示:

SparkSession spark = SparkSession.builder().master("local").appName("spark 
session").getOrCreate()

val lakeDF = spark.createDF(List(("hi")), List(("word", StringType, true)))

lakeDF.writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", checkpointPath)
  .start(dataPath)
Run Code Online (Sandbox Code Playgroud)

但我收到以下异常: org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;

我对火花流很陌生,请让我知道如何为我的测试套件创建流数据帧/将上述常规数据帧转换为流数据帧。

Nag*_*aga 6

在 Spark 结构化流中,数据帧/数据集是使用 SparkSession 上的readStream在流中创建的。如果数据帧/数据集不是使用流创建的,则不允许使用writeStream进行存储。

因此,使用readStream创建数据帧/数据集并使用writeStream存储数据帧/数据集

 val kafkaStream = sparkSession.readStream.format("kafka")
.option("kafka.bootstrap.servers", "kafka-broker-hostname:port")
.option("subscribe", "topicname")
.load()
Run Code Online (Sandbox Code Playgroud)


Sam*_*Sam -3

考虑以下示例。

第一个适用于非流式传输的 df。就像你的一样,例如由列表/序列组成的文件或 df。

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()
Run Code Online (Sandbox Code Playgroud)

第二个适用于流式传输的数据帧。例如从 kafka 或其他一些流媒体源读取。

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()
Run Code Online (Sandbox Code Playgroud)

要么使用流数据帧,要么像第一个示例一样更改代码。

  • 有没有办法在不使用 Kafka/流框架的情况下创建具有一些硬编码值的流 df ? (4认同)