djW*_*ann 3 scala apache-spark apache-spark-sql
我正在使用具有以下结构的Spark结构化流读取流:
col1
col2
col3
Run Code Online (Sandbox Code Playgroud)
经过一些转换后,我想将数据帧以json格式写入控制台。我正在尝试以下方法:
df.select(to_json($"*"))
.writeStream
.outputMode("append")
.format("console")
.start()
Run Code Online (Sandbox Code Playgroud)
但是我明白了 Invalid usage of '*' in expression 'structstojson';
有没有一种方法可以将所有行连接到同一列中以便使用to_json?
预期的输出是一个数据框,其中的一列在每一行上都有json数据:
{"col1":"val11","col2":"val12","col3":"val13"}
{"col1":"val21","col2":"val22","col3":"val23"}
Run Code Online (Sandbox Code Playgroud)
to_json 具有以下定义:
def to_json(e: org.apache.spark.sql.Column): org.apache.spark.sql.Column
def to_json(e: org.apache.spark.sql.Column,options: java.util.Map[String,String]): org.apache.spark.sql.Column
def to_json(e: org.apache.spark.sql.Column,options: Map[String,String]): org.apache.spark.sql.Column
Run Code Online (Sandbox Code Playgroud)
这是我们的数据框:
df.show
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| b| c|
| d| e| f|
+----+----+----+
Run Code Online (Sandbox Code Playgroud)
您需要创建一个struct,然后调用to_json它。就像是 :
df.select(to_json( struct( df.columns.map(col(_)):_* ) ) as "json").show(false)
+----------------------------------+
|json |
+----------------------------------+
|{"col1":"a","col2":"b","col3":"c"}|
|{"col1":"d","col2":"e","col3":"f"}|
+----------------------------------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1414 次 |
| 最近记录: |