I am importing a CSV file (using spark-csv) into a DataFrame that has empty String values. When I apply a OneHotEncoder, the application crashes with the error requirement failed: Cannot have an empty string for name. Is there a way around this?
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = sqlContext.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, ""), //<- original example has "a" here
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val encoder = new OneHotEncoder()
.setInputCol("categoryIndex")
.setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.show()
This is annoying, because missing/empty values are a very common situation.
Thanks in advance, Nikhil
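A possible workaround, sketched rather than taken from an official fix (the "NA" placeholder value is an assumption): replace empty strings with a sentinel category before the StringIndexer runs, so the encoder never sees an empty name.

import org.apache.spark.sql.functions.{col, length, when}

// Map empty category strings to a sentinel value before indexing/encoding.
val dfClean = df.withColumn("category",
  when(length(col("category")) === 0, "NA").otherwise(col("category")))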
dataFrame.coalesce(1).write().save("path") sometimes writes only the _SUCCESS and ._SUCCESS.crc files, without the expected *.csv.gz, even for a non-empty input DataFrame.
File saving code:
private static void writeCsvToDirectory(Dataset<Row> dataFrame, Path directory) {
dataFrame.coalesce(1)
.write()
.format("csv")
.option("header", "true")
.option("delimiter", "\t")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.mode(SaveMode.Overwrite)
.save("file:///" + directory);
}
File retrieval code:
static Path getTemporaryCsvFile(Path directory) throws IOException {
String glob = "*.csv.gz";
try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
return stream.iterator().next();
} catch (NoSuchElementException e) {
throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob), e);
}
}
Example of the file retrieval error:
java.lang.RuntimeException: directory /tmp/temp5889805853850415940 does not contain a file with glob *.csv.gz. Directory listing:
/tmp/temp5889805853850415940/_SUCCESS,
/tmp/temp5889805853850415940/._SUCCESS.crc …

Spark version: spark-2.0.1-bin-hadoop2.7
Scala: 2.11.8
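One thing worth ruling out (an assumption about the environment, not something stated in the question): when the job runs with a non-local master, a file:/// output path is written on whichever executor node runs the task, so the directory seen by the driver may end up containing only the driver-written _SUCCESS marker. A minimal sketch that writes to a filesystem all nodes can see instead (the hdfs:///tmp/output path is hypothetical):

import org.apache.spark.sql.SaveMode

// Same write as above, but against a shared filesystem rather than file:///.
dataFrame.coalesce(1)
  .write
  .format("csv")
  .option("header", "true")
  .option("delimiter", "\t")
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
  .mode(SaveMode.Overwrite)
  .save("hdfs:///tmp/output")   // hypothetical shared path instead of file:///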
I am loading a raw CSV into a DataFrame. In the CSV, although the column supports a date format, the values are written as 20161025 rather than 2016-10-25. The parameter date_format is a list of the column names (strings) that need to be converted to the yyyy-mm-dd format.
In the code below, I first load the CSV with the date columns as StringType via the schema, then check whether date_format is non-empty, i.e. whether there are columns that need to be converted from String to Date, and then cast each such column using unix_timestamp and to_date. However, in csv_df.show() the returned rows are all null.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{to_date, unix_timestamp}
import org.apache.spark.sql.types.StructType

def read_csv(csv_source:String, delimiter:String, is_first_line_header:Boolean,
schema:StructType, date_format:List[String]): DataFrame = {
println("|||| Reading CSV Input ||||")
var csv_df = sqlContext.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("header", is_first_line_header)
.option("delimiter", delimiter)
.load(csv_source)
println("|||| Successfully read CSV. Number of rows -> " + csv_df.count() + " ||||")
if(date_format.length > 0) {
for (i <- 0 until date_format.length) {
csv_df = csv_df.select(to_date(unix_timestamp(
csv_df(date_format(i)), "yyyy-MM-dd").cast("timestamp"))) …
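A likely cause, inferred from the snippet rather than confirmed: unix_timestamp is given the pattern "yyyy-MM-dd" while the raw values look like 20161025, so parsing fails and yields null; the select also keeps only the converted column. A minimal sketch that parses the raw yyyyMMdd format and replaces each date column in place:

import org.apache.spark.sql.functions.{col, to_date, unix_timestamp}

// Fold over the date columns, overwriting each one with its parsed Date value
// while keeping all other columns of the DataFrame.
val converted = date_format.foldLeft(csv_df) { (df, name) =>
  df.withColumn(name, to_date(unix_timestamp(col(name), "yyyyMMdd").cast("timestamp")))
}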
The code below does not add the default double quotes. I also tried adding # and single quotes with the quote option, without success. I also used quoteMode with the ALL and NON_NUMERIC options; the output still did not change.
s2d.coalesce(64).write
.format("com.databricks.spark.csv")
.option("header", "false")
.save(fname)
Are there any other options I could try? I am using the spark-csv package (2.11) rather than Spark 2.1's built-in CSV support.
The output it produces:
d4c354ef,2017-03-14 16:31:33,2017-03-14 16:31:46,104617772177,340618697
The output I am looking for:
"d4c354ef","2017-03-14 16:31:33","2017-03-14 16:31:46",104617772177,340618697
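A sketch of one alternative, assuming a switch to Spark 2.x's built-in CSV writer is acceptable: its quoteAll option forces quotes around every field (including the numeric ones, so the result is slightly stricter than the desired output above):

s2d.coalesce(64).write
  .format("csv")                 // built-in Spark 2.x CSV writer
  .option("header", "false")
  .option("quoteAll", "true")    // quote every field on output
  .save(fname)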
I am using Spark 2.2.0.
I am reading a CSV file as follows:
val dataFrame = spark.read.option("inferSchema", "true")
.option("header", true)
.option("dateFormat", "yyyyMMdd")
.csv(pathToCSVFile)
There is only one date column in the file, and its value is 20171001 for every record.
The problem is that Spark infers the type of this column as integer, not date. When I remove the "inferSchema" option, the column's type is string.
There are no null values in this file, nor any malformed lines.
What is the cause of, and the solution to, this problem?
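A hedged explanation of what is probably happening: CSV schema inference in Spark 2.x only tries numeric, boolean and timestamp types, so 20171001 is picked up as an integer before dateFormat is ever consulted; dateFormat is applied when a column is explicitly declared as DateType. A minimal sketch with an explicit schema (the column name order_date is a placeholder):

import org.apache.spark.sql.types.{DateType, StructType}

// Placeholder schema; replace order_date with the real column name.
val schema = new StructType().add("order_date", DateType)

val dataFrame = spark.read
  .option("header", true)
  .option("dateFormat", "yyyyMMdd")
  .schema(schema)
  .csv(pathToCSVFile)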
I am just trying to read a text file into a pyspark RDD, and I noticed differences between sqlContext.read.load and sqlContext.read.text.
s3_single_file_inpath='s3a://bucket-name/file_name'
indata = sqlContext.read.load(s3_single_file_inpath, format='com.databricks.spark.csv', header='true', inferSchema='false',sep=',')
indata = sqlContext.read.text(s3_single_file_inpath)
The sqlContext.read.load command above fails with
Py4JJavaError: An error occurred while calling o227.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.csv. Please find packages at http://spark-packages.org
But the second one succeeds?
Now, I am confused by this, because all the resources I see online say to use sqlContext.read.load, including this one: https://spark.apache.org/docs/1.6.1/sql-programming-guide.html.
It is not clear to me when to use which of these. Is there a clear difference between them?
I get the error message
java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.

  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:251)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:115)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:115)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
  at org.apache.spark.sql.streaming.DataStreamReader.csv(DataStreamReader.scala:404)
  at io.sekai.core.streaming.KafkaDataGenerator.readFromCSVFile(KafkaDataGenerator.scala:38)

when I load the CSV file like this:
spark2
  .readStream
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  //.schema(schema)
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)
  …
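The usual fix, sketched under the assumption that the commented-out schema variable simply was not defined: streaming file sources do not infer a schema by default, so an explicit StructType matching the CSV header has to be passed via .schema(...). The field names below are placeholders.

import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Placeholder schema; replace the fields with the real CSV columns.
val schema = new StructType()
  .add("id", IntegerType)
  .add("value", StringType)

val stream = spark2
  .readStream
  .format("csv")
  .schema(schema)                        // required for a streaming file source
  .option("header", "true")
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)

Alternatively, schema inference for file streams can be enabled with spark.sql.streaming.schemaInference=true, though an explicit schema is generally the more predictable option.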
For a custom Estimator's transformSchema method I need to be able to compare the schema of the input DataFrame against a schema defined in a case class. Usually this can be done by generating a Spark StructType/schema from the case class, as outlined below. However, the wrong nullability is used:

The actual schema of the df inferred by spark.read.csv().as[MyClass] may look like this:
root
|-- CUSTOMER_ID: integer (nullable = false)
The case class:
case class MySchema(CUSTOMER_ID: Int)
For the comparison I use:
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]
if (!rawSchema.equals(rawDf.schema))
Unfortunately, this always yields false, because the new schema manually derived from the case class has nullable set to true (since a java.Integer can in fact be null):
root
|-- CUSTOMER_ID: integer (nullable = true)
How can I specify nullable = false when creating the schema?
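One workaround sketch, assuming the comparison only needs to check column names and data types rather than exact nullability: normalize both schemas to a fixed nullability before comparing them. The normalize helper below is hypothetical.

import org.apache.spark.sql.types.{StructField, StructType}

// Rebuild a schema with every field forced to nullable = true so that only
// field names and data types take part in the comparison.
def normalize(schema: StructType): StructType =
  StructType(schema.map(f => StructField(f.name, f.dataType, nullable = true)))

if (normalize(rawSchema) != normalize(rawDf.schema)) {
  // the schemas genuinely differ in column names or types
}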
I am trying to add a UUID column to my dataset.
getDataset(Transaction.class)).withColumn("uniqueId", functions.lit(UUID.randomUUID().toString())).show(false);
But the result is that every row has the same UUID. How can I make it unique for each row?
+-----------------------------------+
|uniqueId                           |
+-----------------------------------+
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
+-----------------------------------+
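A sketch of what is most likely happening, written in Scala for consistency with the other snippets: functions.lit(UUID.randomUUID().toString()) evaluates the UUID once on the driver and embeds that single literal into the plan, so every row gets the same value. Generating it per row needs a per-row expression, for example a UDF (the DataFrame name df below is a placeholder):

import java.util.UUID
import org.apache.spark.sql.functions.udf

// Non-deterministic UDF so Spark evaluates it for every row instead of folding
// it into a constant (asNondeterministic is available from Spark 2.3 onwards).
val uuidUdf = udf(() => UUID.randomUUID().toString).asNondeterministic()

val withId = df.withColumn("uniqueId", uuidUdf())
// On Spark 2.3+ the built-in SQL expression can be used instead: expr("uuid()")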