标签: spark-csv

Spark DataFrame在OneHotEncoder中处理空字符串

我正在将CSV文件(使用spark-csv)导入到DataFrame具有空String值的文件中.应用时OneHotEncoder,应用程序崩溃并出错requirement failed: Cannot have an empty string for name..有没有办法解决这个问题?

我可以在Spark ml页面上提供示例中重现错误:

val df = sqlContext.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, ""),         //<- original example has "a" here
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)

encoded.show()
Run Code Online (Sandbox Code Playgroud)

这很烦人,因为缺失/空值是一种非常普遍的情况.

提前谢谢,Nikhil

scala apache-spark spark-csv apache-spark-ml apache-spark-mllib

6
推荐指数
2
解决办法
6294
查看次数

Spark-csv 不会将 DataFrame 保存到文件时有解释吗?

dataFrame.coalesce(1).write().save("path")有时仅写入 _SUCCESS 和 ._SUCCESS.crc 文件,即使在非空输入上也没有预期的 *.csv.gzDataFrame

文件保存代码:

private static void writeCsvToDirectory(Dataset<Row> dataFrame, Path directory) {
    dataFrame.coalesce(1)
            .write()
            .format("csv")
            .option("header", "true")
            .option("delimiter", "\t")
            .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
            .mode(SaveMode.Overwrite)
            .save("file:///" + directory);
}

Run Code Online (Sandbox Code Playgroud)

文件获取代码:

static Path getTemporaryCsvFile(Path directory) throws IOException {
    String glob = "*.csv.gz";
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
        return stream.iterator().next();
    } catch (NoSuchElementException e) {
        throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob), e);
    }
}
Run Code Online (Sandbox Code Playgroud)

文件获取错误示例:

java.lang.RuntimeException: directory /tmp/temp5889805853850415940 does not contain a file with glob *.csv.gz. Directory listing:
    /tmp/temp5889805853850415940/_SUCCESS, 
    /tmp/temp5889805853850415940/._SUCCESS.crc …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-csv

6
推荐指数
1
解决办法
4159
查看次数

Scala:Spark SQL to_date(unix_timestamp)返回NULL

Spark Version: spark-2.0.1-bin-hadoop2.7 Scala: 2.11.8

我正在将原始csv加载到DataFrame中.在csv中,虽然该列支持日期格式,但它们写成20161025而不是2016-10-25.该参数date_format包括需要转换为yyyy-mm-dd格式的列名称字符串.

在下面的代码,我首先通过加载日期列的CSV作为StringType schema,然后我检查是否date_format是不空的,也就是说有需要被转换为列DateString,然后使用浇铸每一列unix_timestampto_date.但是,在中csv_df.show(),返回的行都是null.

def read_csv(csv_source:String, delimiter:String, is_first_line_header:Boolean, 
    schema:StructType, date_format:List[String]): DataFrame = {
    println("|||| Reading CSV Input ||||")

    var csv_df = sqlContext.read
        .format("com.databricks.spark.csv")
        .schema(schema)
        .option("header", is_first_line_header)
        .option("delimiter", delimiter)
        .load(csv_source)
    println("|||| Successfully read CSV. Number of rows -> " + csv_df.count() + " ||||")
    if(date_format.length > 0) {
        for (i <- 0 until date_format.length) {
            csv_df = csv_df.select(to_date(unix_timestamp(
                csv_df(date_format(i)), "yyyy-­MM-­dd").cast("timestamp"))) …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe spark-csv

5
推荐指数
1
解决办法
2万
查看次数

如何使用引用的所有字段保存 CSV?

下面的代码没有添加默认的双引号。我还尝试使用选项添加 # 和单引号quote,但没有成功。我还使用了quoteModewithALLNON_NUMERICoptions,输出仍然没有变化。

s2d.coalesce(64).write
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .save(fname)
Run Code Online (Sandbox Code Playgroud)

我可以尝试其他任何选择吗?我使用 spark-csv 2.11 而不是 spark 2.1。

它产生的输出:

d4c354ef,2017-03-14 16:31:33,2017-03-14 16:31:46,104617772177,340618697
Run Code Online (Sandbox Code Playgroud)

我正在寻找的输出:

“d4c354ef”,”2017-03-14 16:31:33”,”2017-03-14 16:31:46”,104617772177,340618697  
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-csv

5
推荐指数
1
解决办法
1万
查看次数

如何强制CSV的inferSchema将整数视为日期(使用“ dateFormat”选项)?

我使用Spark 2.2.0

我正在读取csv文件,如下所示:

val dataFrame = spark.read.option("inferSchema", "true")
                          .option("header", true)
                          .option("dateFormat", "yyyyMMdd")
                          .csv(pathToCSVFile)
Run Code Online (Sandbox Code Playgroud)

该文件中只有一个日期列,并且所有记录的值都等于20171001该特定列的值。

问题是spark推断出此列的类型integer不是date。当我删除该"inferSchema"选项时,该列的类型为string

null此文件中没有值,也没有格式错误的行。

此问题的原因/解决方案是什么?

dataframe apache-spark apache-spark-sql spark-csv

5
推荐指数
1
解决办法
4210
查看次数

为什么 sqlContext.read.load 和 sqlContext.read.text 有区别?

我只是想将文本文件读入 pyspark RDD,我注意到sqlContext.read.loadsqlContext.read.text.

s3_single_file_inpath='s3a://bucket-name/file_name'

indata = sqlContext.read.load(s3_single_file_inpath, format='com.databricks.spark.csv', header='true', inferSchema='false',sep=',')
indata = sqlContext.read.text(s3_single_file_inpath)
Run Code Online (Sandbox Code Playgroud)

sqlContext.read.load上面的命令失败

Py4JJavaError: An error occurred while calling o227.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.csv. Please find packages at http://spark-packages.org
Run Code Online (Sandbox Code Playgroud)

但是第二个成功了吗?

现在,我对此感到困惑,因为我在网上看到的所有资源都说要使用,sqlContext.read.load包括这个:https : //spark.apache.org/docs/1.6.1/sql-programming-guide.html

我不清楚何时使用这些中的哪一个。这些有明显的区别吗?

apache-spark apache-spark-sql pyspark spark-csv

5
推荐指数
1
解决办法
7933
查看次数

inferSchema=true 不适用于读取 Spark 结构化流的 csv 文件

我收到错误消息

\n
java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.\n\n    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:251)\n    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:115)\n    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:115)\n    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)\n    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)\n    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)\n    at org.apache.spark.sql.streaming.DataStreamReader.csv(DataStreamReader.scala:404)\n    at io.sekai.core.streaming.KafkaDataGenerator.readFromCSVFile(KafkaDataGenerator.scala:38)\n
Run Code Online (Sandbox Code Playgroud)\n

当我加载 csv 文件时

\n
spark2\n  .readStream\n  .format("csv")\n  .option("inferSchema", "true")\n  .option("header", "true")\n  //.schema(schema)\n  .option("delimiter", ",")\n  .option("maxFilesPerTrigger", 1)\n  .csv(path)\n …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-csv spark-structured-streaming

5
推荐指数
1
解决办法
4713
查看次数

Spark - CSV - 写入选项 - 报价

希望每个人都做得很好。

在查看问题的Spark csv 数据源选项时,我对可用的各种报价相关选项之间的差异感到非常困惑。

Spark-csv-报价选项

  1. 它们之间有什么详细的区别吗?
  2. 是否有一个选项会覆盖另一个选项,或者它们都一起工作?

使用链接问题中提到的示例来理解差异,但仍然有点困惑。感谢您的所有帮助。

csv apache-spark spark-csv databricks

5
推荐指数
1
解决办法
5123
查看次数

来自案例类的Spark模式具有正确的可空性

对于自定义Estimator的transformSchema方法,我需要能够将输入数据帧的模式与案例类中定义的模式进行比较.通常,这可以像从案例类生成Spark StructType/Schema一样执行,如下所述.但是,使用了错误的可空性:

推断出的df的真实模式spark.read.csv().as[MyClass]可能如下所示:

root
 |-- CUSTOMER_ID: integer (nullable = false)
Run Code Online (Sandbox Code Playgroud)

案例类:

case class MySchema(CUSTOMER_ID: Int)
Run Code Online (Sandbox Code Playgroud)

比较我使用:

val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]
  if (!rawSchema.equals(rawDf.schema))
Run Code Online (Sandbox Code Playgroud)

不幸的是,这总是产生false,因为从case类手动推断的新模式设置为可为空true(因为ja java.Integer实际上可能为null)

root
 |-- CUSTOMER_ID: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

如何nullable = false在创建架构时指定?

apache-spark apache-spark-sql spark-csv apache-spark-ml apache-spark-dataset

4
推荐指数
1
解决办法
4110
查看次数

将UUID添加到Spark数据集

我正在尝试将UUID列添加到我的数据集中。

getDataset(Transaction.class)).withColumn("uniqueId", functions.lit(UUID.randomUUID().toString())).show(false);
Run Code Online (Sandbox Code Playgroud)

但是结果是所有行都具有相同的UUID。我如何使其独特?

+-----------------------------------+
uniqueId                            |
+----------------+-------+-----------
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
----------+----------------+--------+
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-csv apache-spark-dataset

3
推荐指数
1
解决办法
4025
查看次数