Ram*_*mya 49 scala apache-spark
我是新来的.我想对CSV记录中的特定数据执行一些操作.
我正在尝试读取CSV文件并将其转换为RDD.我的进一步操作基于CSV文件中提供的标题.
(来自评论)这是我的代码到目前为止:
final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String s) {
return Arrays.asList(EOL.split(s));
}
});
final String heading=lines.first().toString();
Run Code Online (Sandbox Code Playgroud)
我可以像这样得到标题值.我想将其映射到CSV文件中的每条记录.
final String[] header=heading.split(" ");
Run Code Online (Sandbox Code Playgroud)
我可以像这样得到标题值.我想将其映射到CSV文件中的每条记录.
在java中,我CSVReader record.getColumnValue(Column header)
用来获取特定的值.我需要在这里做类似的事情.
maa*_*asg 55
一种简单的方法是有一种方法来保留标题.
假设您有一个file.csv,如:
user, topic, hits
om, scala, 120
daniel, spark, 80
3754978, spark, 1
Run Code Online (Sandbox Code Playgroud)
我们可以定义一个使用第一行解析版本的头类:
class SimpleCSVHeader(header:Array[String]) extends Serializable {
val index = header.zipWithIndex.toMap
def apply(array:Array[String], key:String):String = array(index(key))
}
Run Code Online (Sandbox Code Playgroud)
我们可以使用该标头来进一步处理数据:
val csv = sc.textFile("file.csv") // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user")
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...
Run Code Online (Sandbox Code Playgroud)
请注意,这header
不仅仅是数组索引的助记符的简单映射.几乎所有这些都可以在数组中元素的序数位置完成,比如user = row(0)
PS:欢迎来到Scala :-)
Sam*_*man 16
您可以使用spark-csv库:https://github.com/databricks/spark-csv
这直接来自文档:
import org.apache.spark.sql.SQLContext
SQLContext sqlContext = new SQLContext(sc);
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");
DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
Run Code Online (Sandbox Code Playgroud)
首先,我必须说,如果将标题放在单独的文件中,这要简单得多 - 这是大数据中的惯例.
无论如何,丹尼尔的答案非常好,但它效率低下且存在缺陷,所以我要发布自己的答案.低效率是您不需要检查每条记录以查看它是否是标题,您只需要检查每个分区的第一条记录.错误是通过使用.split(",")
你可以得到一个异常抛出或当条目是空字符串时出现错误的列并发生在记录的开头或结尾 - 以纠正你需要使用.split(",", -1)
.所以这是完整的代码:
val header =
scala.io.Source.fromInputStream(
hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration)
.open(new hadoop.fs.Path(path)))
.getLines.head
val columnIndex = header.split(",").indexOf(columnName)
sc.textFile(path).mapPartitions(iterator => {
val head = iterator.next()
if (head == header) iterator else Iterator(head) ++ iterator
})
.map(_.split(",", -1)(columnIndex))
Run Code Online (Sandbox Code Playgroud)
最后一点,如果你只想钓掉某些专栏,可以考虑Parquet.或者,如果您有宽行,至少考虑实施一个延迟评估的拆分函数.
我们可以使用新的DataFrameRDD来读取和写入CSV数据.DataFrameRDD优于NormalRDD的优点很少:
您将需要拥有此库:在build.sbt中添加它
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"
Run Code Online (Sandbox Code Playgroud)
Spark Scala代码:
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
//format is for specifying the type of file you are reading
//header = true indicates that the first line is header in it
Run Code Online (Sandbox Code Playgroud)
通过从中获取一些列来转换为普通RDD
val rddData = df.map(x=>Row(x.getAs("colA")))
//Do other RDD operation on it
Run Code Online (Sandbox Code Playgroud)
将RDD保存为CSV格式:
val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")
Run Code Online (Sandbox Code Playgroud)
由于标头设置为true,我们将在所有输出文件中获取标头名称.
归档时间: |
|
查看次数: |
94296 次 |
最近记录: |