我使用spark-csv将数据加载到DataFrame中.我想做一个简单的查询并显示内容:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("my.csv")
df.registerTempTable("tasks")
results = sqlContext.sql("select col from tasks");
results.show()
Run Code Online (Sandbox Code Playgroud)
col似乎被截断了:
scala> results.show();
+--------------------+
| col|
+--------------------+
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:15:...|
|2015-11-06 07:15:...|
|2015-11-16 07:15:...|
|2015-11-16 07:21:...|
|2015-11-16 07:21:...|
|2015-11-16 07:21:...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)
如何显示列的完整内容?
我正在使用https://github.com/databricks/spark-csv,我正在尝试编写单个CSV,但不能,它正在创建一个文件夹.
需要一个Scala函数,它将获取路径和文件名等参数并写入该CSV文件.
我试图将csv文件读入数据帧.我知道我的数据帧的架构应该是什么,因为我知道我的csv文件.另外我使用spark csv包来读取文件.我试图指定如下的架构.
val pagecount = sqlContext.read.format("csv")
.option("delimiter"," ").option("quote","")
.option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
Run Code Online (Sandbox Code Playgroud)
但是当我检查我创建的数据框架的模式时,它似乎采用了自己的模式.我做错了吗?如何制作火花来接收我提到的架构?
> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
Run Code Online (Sandbox Code Playgroud) 如何确定数据框大小?
现在我估计数据帧的实际大小如下:
headers_size = key for key in df.first().asDict()
rows_size = df.map(lambda row: len(value for key, value in row.asDict()).sum()
total_size = headers_size + rows_size
Run Code Online (Sandbox Code Playgroud)
它太慢了,我正在寻找更好的方法.
我在HDFS上有一个很大的分布式文件,每次我使用带有spark-csv包的sqlContext时,它首先加载整个文件,这需要相当长的时间.
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")
Run Code Online (Sandbox Code Playgroud)
现在因为我只想做一些快速检查,所有我需要的是整个文件的少数/任意n行.
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)
Run Code Online (Sandbox Code Playgroud)
但所有这些都在文件加载完成后运行.我不能在读取文件本身时限制行数吗?我指的是spark-csv中n_rows等效的pandas,如:
pd_df = pandas.read_csv("file_path", nrows=20)
Run Code Online (Sandbox Code Playgroud)
或者可能是火花实际上没有加载文件,第一步,但在这种情况下,为什么我的文件加载步骤需要花费太多时间呢?
我想要
df.count()
Run Code Online (Sandbox Code Playgroud)
只给我n而不是所有的行,是否可能?
非常新的火花和蜂巢和大数据和scala等等.我正在尝试编写一个简单的函数,它接受一个sqlContext,从s3加载一个csv文件并返回一个DataFrame.问题是这个特定的csv使用^ A(即\ 001)字符作为分隔符,数据集很大,所以我不能只对它做一个"s /\001 /,/ g".此外,这些字段可能包含逗号或我可能用作分隔符的其他字符.
我知道我正在使用的spark-csv包有一个分隔符选项,但是我不知道如何设置它以便它将\ 001作为一个字符读取而不是像转义的0,0和1那样.也许我应该使用hiveContext或其他东西?
我有一个看起来像这样的CSV:
+-----------------+-----------------+-----------------+
| Column One | Column Two | Column Three |
+-----------------+-----------------+-----------------+
| This is a value | This is a value | This is a value |
+-----------------+-----------------+-----------------+
| This is a value | This is a value | This is a value |
+-----------------+-----------------+-----------------+
| This is a value | This is a value | This is a value |
+-----------------+-----------------+-----------------+
Run Code Online (Sandbox Code Playgroud)
在纯文本中,它实际上看起来像这样:
Column One,Column Two,Column Three
This is a value,This is a value,This is a value
This …Run Code Online (Sandbox Code Playgroud) 我正在将CSV文件(使用spark-csv)导入到DataFrame具有空String值的文件中.应用时OneHotEncoder,应用程序崩溃并出错requirement failed: Cannot have an empty string for name..有没有办法解决这个问题?
val df = sqlContext.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, ""), //<- original example has "a" here
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val encoder = new OneHotEncoder()
.setInputCol("categoryIndex")
.setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.show()
Run Code Online (Sandbox Code Playgroud)
这很烦人,因为缺失/空值是一种非常普遍的情况.
提前谢谢,Nikhil
scala apache-spark spark-csv apache-spark-ml apache-spark-mllib
我知道如何使用spark-csv(https://github.com/databricks/spark-csv)将csv文件读入spark ,但我已经将csv文件表示为字符串,并希望将此字符串直接转换为数据帧.这可能吗?
dataFrame.coalesce(1).write().save("path")有时仅写入 _SUCCESS 和 ._SUCCESS.crc 文件,即使在非空输入上也没有预期的 *.csv.gzDataFrame
文件保存代码:
private static void writeCsvToDirectory(Dataset<Row> dataFrame, Path directory) {
dataFrame.coalesce(1)
.write()
.format("csv")
.option("header", "true")
.option("delimiter", "\t")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.mode(SaveMode.Overwrite)
.save("file:///" + directory);
}
Run Code Online (Sandbox Code Playgroud)
文件获取代码:
static Path getTemporaryCsvFile(Path directory) throws IOException {
String glob = "*.csv.gz";
try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
return stream.iterator().next();
} catch (NoSuchElementException e) {
throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob), e);
}
}
Run Code Online (Sandbox Code Playgroud)
文件获取错误示例:
java.lang.RuntimeException: directory /tmp/temp5889805853850415940 does not contain a file with glob *.csv.gz. Directory listing:
/tmp/temp5889805853850415940/_SUCCESS,
/tmp/temp5889805853850415940/._SUCCESS.crc …Run Code Online (Sandbox Code Playgroud)