如何将csv文件转换为rdd

Ram*_*mya 49 scala apache-spark

我是新来的.我想对CSV记录中的特定数据执行一些操作.

我正在尝试读取CSV文件并将其转换为RDD.我的进一步操作基于CSV文件中提供的标题.

(来自评论)这是我的代码到目前为止:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() { 
    @Override public Iterable<String> call(String s) { 
    return Arrays.asList(EOL.split(s)); 
    } 
});
final String heading=lines.first().toString();
Run Code Online (Sandbox Code Playgroud)

我可以像这样得到标题值.我想将其映射到CSV文件中的每条记录.

final String[] header=heading.split(" "); 
Run Code Online (Sandbox Code Playgroud)

我可以像这样得到标题值.我想将其映射到CSV文件中的每条记录.

在java中,我CSVReader record.getColumnValue(Column header)用来获取特定的值.我需要在这里做类似的事情.

maa*_*asg 55

一种简单的方法是有一种方法来保留标题.

假设您有一个file.csv,如:

user, topic, hits
om,  scala, 120
daniel, spark, 80
3754978, spark, 1
Run Code Online (Sandbox Code Playgroud)

我们可以定义一个使用第一行解析版本的头类:

class SimpleCSVHeader(header:Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array:Array[String], key:String):String = array(index(key))
}
Run Code Online (Sandbox Code Playgroud)

我们可以使用该标头来进一步处理数据:

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user")
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...
Run Code Online (Sandbox Code Playgroud)

请注意,这header不仅仅是数组索引的助记符的简单映射.几乎所有这些都可以在数组中元素的序数位置完成,比如user = row(0)

PS:欢迎来到Scala :-)


Sam*_*man 16

您可以使用spark-csv库:https://github.com/databricks/spark-csv

这直接来自文档:

import org.apache.spark.sql.SQLContext

SQLContext sqlContext = new SQLContext(sc);

HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");

DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
Run Code Online (Sandbox Code Playgroud)

  • @javadba我会争辩说这是这里唯一的普遍答案.这里的所有其他尝试都假定您可以在逗号上天真地分割csv,并且仅在一些非常简单的情况下才是真的. (6认同)
  • @ zero323几次尝试后,spark-csv包失败了.我编写了自己的csv解析器.这就是为什么我不是这个答案的忠实粉丝.显然其他人也有更好的运气. (3认同)

sam*_*est 9

首先,我必须说,如果将标题放在单独的文件中,这要简单得多 - 这是大数据中的惯例.

无论如何,丹尼尔的答案非常好,但它效率低下且存在缺陷,所以我要发布自己的答案.低效率是您不需要检查每条记录以查看它是否是标题,您只需要检查每个分区的第一条记录.错误是通过使用.split(",")你可以得到一个异常抛出或当条目是空字符串时出现错误的列并发生在记录的开头或结尾 - 以纠正你需要使用.split(",", -1).所以这是完整的代码:

val header =
  scala.io.Source.fromInputStream(
    hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration)
    .open(new hadoop.fs.Path(path)))
  .getLines.head

val columnIndex = header.split(",").indexOf(columnName)

sc.textFile(path).mapPartitions(iterator => {
  val head = iterator.next()
  if (head == header) iterator else Iterator(head) ++ iterator
})
.map(_.split(",", -1)(columnIndex))
Run Code Online (Sandbox Code Playgroud)

最后一点,如果你只想钓掉某些专栏,可以考虑Parquet.或者,如果您有宽行,至少考虑实施一个延迟评估的拆分函数.


Aja*_*pta 5

我们可以使用新的DataFrameRDD来读取和写入CSV数据.DataFrameRDD优于NormalRDD的优点很少:

  1. DataFrameRDD比NormalRDD快一点,因为我们确定了模式,这有助于在运行时优化很多并为我们提供显着的性能提升.
  2. 即使列以CSV格式移动,它也会自动采用正确的列,因为我们不会对将数据读取为textFile时出现的列号进行硬编码,然后将其拆分然后使用列数来获取数据.
  3. 在几行代码中,您可以直接读取CSV文件.

您将需要拥有此库:在build.sbt中添加它

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"
Run Code Online (Sandbox Code Playgroud)

Spark Scala代码:

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
//format is for specifying the type of file you are reading
//header = true indicates that the first line is header in it
Run Code Online (Sandbox Code Playgroud)

通过从中获取一些列来转换为普通RDD

val rddData = df.map(x=>Row(x.getAs("colA")))
//Do other RDD operation on it
Run Code Online (Sandbox Code Playgroud)

将RDD保存为CSV格式:

val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")
Run Code Online (Sandbox Code Playgroud)

由于标头设置为true,我们将在所有输出文件中获取标头名称.