Spark SQL删除空格

Man*_*esh 0 apache-spark spark-streaming apache-spark-sql spark-dataframe apache-spark-mllib

我有一个简单的Spark程序,它读取JSON文件并发出CSV文件.在JSON数据中,值包含前导和尾随空格,当我发出CSV时,前导和尾随空格都消失了.有没有办法可以保留空间.我尝试了很多选项,如ignoreTrailingWhiteSpace,ignoreLeadingWhiteSpace,但没有运气

input.json

{"key" : "k1", "value1": "Good String", "value2": "Good String"}
{"key" : "k1", "value1": "With Spaces      ", "value2": "With Spaces      "}
{"key" : "k1", "value1": "with tab\t", "value2": "with tab\t"}
Run Code Online (Sandbox Code Playgroud)

output.csv

_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces,With Spaces
,k1,with tab,with tab
Run Code Online (Sandbox Code Playgroud)

expected.csv

_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces      ,With Spaces      
,k1,with tab\t,with tab\t
Run Code Online (Sandbox Code Playgroud)

我的代码:

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession
            .builder()
            .appName(TestSpark.class.getName())
            .master("local[1]").getOrCreate();

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();
    System.out.println("Spark context established");

    List<StructField> kvFields = new ArrayList<>();
    kvFields.add(DataTypes.createStructField("_corrupt_record", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("key", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value1", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value2", DataTypes.StringType, true));
    StructType employeeSchema = DataTypes.createStructType(kvFields);

    Dataset<Row> dataset =
            sparkSession.read()
                    .option("inferSchema", false)
                    .format("json")
                    .schema(employeeSchema)
                    .load("D:\\dev\\workspace\\java\\simple-kafka\\key_value.json");

    dataset.createOrReplaceTempView("sourceView");
    sqlCtx.sql("select * from sourceView")
            .write()
            .option("header", true)
            .format("csv")
            .save("D:\\dev\\workspace\\java\\simple-kafka\\output\\" + UUID.randomUUID().toString());
    sparkSession.close();
}
Run Code Online (Sandbox Code Playgroud)

更新

添加了POM依赖项

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)

Rob*_*giu 8

CSV编写器默认修剪前导和尾随空格.你可以把它关掉

   sqlCtx.sql("select * from sourceView").write.
       option("header", true).
       option("ignoreLeadingWhiteSpace",false). // you need this
       option("ignoreTrailingWhiteSpace",false). // and this
       format("csv").save("/my/file/location")
Run Code Online (Sandbox Code Playgroud)

这对我有用.如果它对您不起作用,您可以发布您尝试过的内容吗?您使用的是哪种火花版本?如果我没记错的话,他们去年就推出了这个功能.