Khu*_*mar 1 apache-spark rdd spark-dataframe
我有一个文件是 file1snappy.parquet。它有一个复杂的数据结构,比如地图,里面的数组。处理后我得到了最终结果。在将结果写入 csv 时,我收到一些错误消息
"Exception in thread "main" java.lang.UnsupportedOperationException: CSV data source does not support map<string,bigint> data type."
Run Code Online (Sandbox Code Playgroud)
我使用过的代码:
val conf=new SparkConf().setAppName("student-example").setMaster("local")
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
val datadf = sqlcontext.read.parquet("C:\\file1.snappy.parquet")
def sumaggr=udf((aggr: Map[String, collection.mutable.WrappedArray[Long]]) => if (aggr.keySet.contains("aggr")) aggr("aggr").sum else 0)
datadf.select(col("neid"),sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0).show(false)
datadf.write.format("com.databricks.spark.csv").option("header", "true").save("C:\\myfile.csv")
Run Code Online (Sandbox Code Playgroud)
我尝试转换 datadf.toString() 但我仍然面临同样的问题。如何将该结果写入 CSV。
SparkCSV源仅支持原子类型。您不能存储任何非原子列
我认为最好是为具有map<string,bigint>作为数据类型的列创建一个 JSON ,并将其保存在 csv 中,如下所示。
import spark.implicits._
import org.apache.spark.sql.functions._
datadf.withColumn("column_name_with_map_type", to_json(struct($"column_name_with_map_type"))).write.csv("outputpath")
Run Code Online (Sandbox Code Playgroud)
希望这可以帮助!