How to convert Dataset<Row> to a Dataset of JSON messages to write to Kafka?

use*_*400 2 java apache-kafka apache-spark apache-spark-sql spark-structured-streaming

I use Spark 2.1.1.

I have the following Dataset<Row> ds1:

 name   | ratio | count  // column names
"hello" |  1.56 | 34 

(ds1.isStreaming gives true)

and I am trying to generate Dataset<String> ds2. In other words, when I write to a Kafka sink, I want to write something like this:

{"name": "hello", "ratio": 1.56, "count": 34}

I have tried something like df2.toJSON().writeStream().foreach(new KafkaSink()).start(), but then it gives the following error:

Queries with streaming sources must be executed with writeStream.start()

There are to_json and json_tuple, but I am not sure how to leverage them here.


I tried the following using the json_tuple() function:

 Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());

and I get the following error:

cannot resolve 'result' given input columns: [name, ratio, count];;

Jac*_*ski 5

tl;dr Use the struct function followed by to_json (toJSON is broken for streaming datasets due to SPARK-17029, which got fixed just 20 days ago).


Quoting the scaladoc of struct:

struct(colName: String, colNames: String*): Column Creates a new struct column that composes multiple input columns.

Given that you use the Java API, you also have 4 different variants of the struct function:

public static Column struct(Column... cols) Creates a new struct column.

Your case would be covered with the to_json function:

public static Column to_json(Column e) Converts a column containing a StructType into a JSON string with the specified schema.

Here is the Scala code (converting it to Java is left as your home exercise):

val ds1 = Seq(("hello", 1.56, 34)).toDF("name", "ratio", "count")
val recordCol = to_json(struct("name", "ratio", "count")) as "record"
ds1.select(recordCol).show(truncate = false)
+----------------------------------------+
|record                                  |
+----------------------------------------+
|{"name":"hello","ratio":1.56,"count":34}|
+----------------------------------------+
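The Scala snippet above translates to Java roughly as follows. This is a sketch, not a definitive port: the local-mode SparkSession and the one-row batch stand-in for your streaming Dataset<Row> are my own additions so the example is self-contained.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ToJsonExample {

  // Turns (name, ratio, count) rows into a single JSON string column,
  // mirroring the Scala to_json(struct(...)) shown above.
  public static Dataset<String> toJsonRecords(Dataset<Row> ds1) {
    return ds1
        .select(to_json(struct(col("name"), col("ratio"), col("count"))).as("record"))
        .as(Encoders.STRING());
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")                // local mode, just for this sketch
        .appName("to_json-example")
        .getOrCreate();

    // Batch stand-in for the streaming Dataset<Row> from the question
    Dataset<Row> ds1 =
        spark.sql("SELECT 'hello' AS name, 1.56 AS ratio, 34 AS `count`");

    toJsonRecords(ds1).show(false);        // shows the "record" JSON column
    spark.stop();
  }
}
```

The same toJsonRecords method applies unchanged to a streaming Dataset<Row>, since select/to_json are supported on streaming queries.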

I also gave your solution a try (with Spark 2.3.0-SNAPSHOT built today) and it seemed to work perfectly fine.

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string")
fromKafka.
  toJSON. // <-- JSON conversion
  writeStream.
  format("console"). // using console sink
  start

format("kafka") was added in SPARK-19719 and is not available in 2.1.0.
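Since format("kafka") is not available in your 2.1.1, the ForeachWriter approach from the question can still be combined with to_json. Below is a hedged Java sketch: the KafkaSink stub is my own stand-in that only prints records (a real implementation would hold a KafkaProducer and send each record, as hinted in the comment), and the writeAsJson method assumes ds1 is your streaming Dataset<Row> with name/ratio/count columns.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

public class JsonToKafka {

  // Stand-in for the question's KafkaSink: a ForeachWriter<String> that
  // only prints records; a real sink would wrap a KafkaProducer instead.
  public static class KafkaSink extends ForeachWriter<String> {
    @Override public boolean open(long partitionId, long version) { return true; }
    @Override public void process(String record) {
      // producer.send(new ProducerRecord<>("topic1", record));  // real sink
      System.out.println(record);
    }
    @Override public void close(Throwable errorOrNull) { }
  }

  // Serializes each row to a JSON string, then writes the stream
  // through the ForeachWriter-based sink.
  public static void writeAsJson(Dataset<Row> ds1) throws Exception {
    Dataset<String> ds2 = ds1
        .select(to_json(struct(col("name"), col("ratio"), col("count"))).as("record"))
        .as(Encoders.STRING());

    ds2.writeStream()
       .foreach(new KafkaSink())
       .start()
       .awaitTermination();
  }
}
```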