我使用Spark 1.0.1处理大量数据.每行包含一个ID号,一些包含重复的ID.我想在同一位置保存具有相同ID号的所有行,但我无法有效地执行此操作.我创建了(ID号,数据行)对的RDD [(String,String)]:
val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)}
Run Code Online (Sandbox Code Playgroud)
一种有效但不具备性能的方法是收集ID号,过滤每个ID的RDD,并使用与文本文件相同的ID保存值的RDD.
val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
val dataRows = mapRdd.filter(_._1 == id).values
dataRows.saveAsTextFile(id)
})
Run Code Online (Sandbox Code Playgroud)
我还尝试了groupByKey或reduceByKey,以便RDD中的每个元组包含一个唯一的ID号作为键,以及由该ID号的新行分隔的一组组合数据行.我想只使用foreach迭代RDD一次来保存数据,但是它不能将值作为RDD给出
groupedRdd.foreach({ tup =>
val data = sc.parallelize(List(tup._2)) //nested RDD does not work
data.saveAsTextFile(tup._1)
})
Run Code Online (Sandbox Code Playgroud)
基本上,我想通过ID号将RDD拆分为多个RDD,并将该ID号的值保存到它们自己的位置.
我正在尝试将大数据加载到HDFS,我有时会得到以下错误.任何想法为什么?
错误:
org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on /data/work/20110926-134514/_temporary/_attempt_201109110407_0167_r_000026_0/hbase/site=3815120/day=20110925/107-107-3815120-20110926-134514-r-00026 File does not exist. Holder DFSClient_attempt_201109110407_0167_r_000026_0 does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1557)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1548)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:1603)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:1591)
at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:675)
at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:557)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1434)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1430)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1428)
at org.apache.hadoop.ipc.Client.call(Client.java:1107)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:226)
at $Proxy1.complete(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at $Proxy1.complete(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:3566)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:3481)
at …Run Code Online (Sandbox Code Playgroud) 我想将数据导出为单独的文本文件; 我可以用这个黑客做到这一点:
for r in sqlContext.sql("SELECT DISTINCT FIPS FROM MY_DF").map(lambda r: r.FIPS).collect():
sqlContext.sql("SELECT * FROM MY_DF WHERE FIPS = '%s'" % r).rdd.saveAsTextFile('county_{}'.format(r))
Run Code Online (Sandbox Code Playgroud)
使用Spark 1.3.1/Python数据框架的正确方法是什么?我希望在一份工作中完成这项工作,而不是N(或N + 1)工作.
也许:
saveAsTextFileByKey()
我有一个包含键值对的rdd.只有3个键,我想将给定键的所有元素写入文本文件.目前我在3次传球中这样做,但我想知道我是否可以一次传球.
这是我到目前为止:
# I have an rdd (called my_rdd) such that a record is a key value pair, e.g.:
# ('data_set_1','value1,value2,value3,...,value100')
my_rdd.cache()
my_keys = ['data_set_1','data_set_2','data_set_3']
for key in my_keys:
my_rdd.filter(lambda l: l[0] == key).map(lambda l: l[1]).saveAsTextFile(my_path+'/'+key)
Run Code Online (Sandbox Code Playgroud)
这是有效的,但缓存它并迭代三次可能是一个漫长的过程.我想知道是否有任何方法可以同时写入所有三个文件?
如何在单个Map Reduce作业中使用Scalding(/ cascading)写入依赖于键的多个输出.我当然可以使用.filter所有可能的密钥,但这是一个可怕的黑客,它将启动许多工作.
是否有任何Spark函数允许根据某些creteria将集合拆分为多个RDD?这样的功能可以避免过度的迭代.例如:
def main(args: Array[String]) {
val logFile = "file.txt"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
}
Run Code Online (Sandbox Code Playgroud)
在这个例子中,我必须迭代'logData`两次只是为了在两个单独的文件中写入结果:
val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
Run Code Online (Sandbox Code Playgroud)
有这样的事情会很好:
val resultMap = logData.map(line => if line.contains("a") ("a", line) else if line.contains("b") ("b", line) else (" - ", line)
resultMap.writeByKey("a", "linesA.txt")
resultMap.writeByKey("b", "linesB.txt")
Run Code Online (Sandbox Code Playgroud)
这样的事吗?
我搜索了很长一段时间的解决方案,但没有得到任何正确的算法.
在scala中使用Spark RDD,如何将a RDD[(Key, Value)]转换为a Map[key, RDD[Value]],知道我不能使用collect或其他可能将数据加载到内存中的方法?
实际上,我的最终目标是Map[Key, RDD[Value]]按键循环并saveAsNewAPIHadoopFile为每个调用RDD[Value]
例如,如果我得到:
RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]
Run Code Online (Sandbox Code Playgroud)
我想要 :
Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]
Run Code Online (Sandbox Code Playgroud)
我想知道filter在每个键A,B,C上使用它是否花费不太多RDD[(Key, Value)],但是我不知道是否多次调用过滤器有不同的键会有效吗?(当然不是,但可能使用cache?)
谢谢
我正在尝试将存储在S3中的数据作为JSON-per-line文本文件转换为结构化的柱状格式,如ORC或Parquet on S3.
源文件包含多个方案的数据(例如,HTTP请求,HTTP响应......),需要将其解析为正确类型的不同Spark数据帧.
示例模式:
val Request = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("requestMethod", StringType),
StructField("scheme", StringType),
StructField("host", StringType),
StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
StructField("path", StringType),
StructField("sessionId", StringType),
StructField("userAgent", StringType)
))
val Response = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("contentType", StringType),
StructField("contentLength", IntegerType),
StructField("statusCode", StringType),
StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
StructField("responseDuration", DoubleType),
StructField("sessionId", StringType)
))
Run Code Online (Sandbox Code Playgroud)
我得到了那个部分工作正常,但是试图尽可能有效地将数据写回S3似乎是一个问题atm.
我尝试了3种方法:
在第一种情况下,JVM内存不足,而在第二种情况下,机器的磁盘空间不足.
第三个我还没有经过彻底的测试,但这似乎并不能有效地利用处理能力(因为只有一个集群节点(这个特定分区所在的节点)实际上会将数据写回S3) .
相关代码:
val allSchemes = Schemes.all().keys.toArray
if (false) {
import com.realo.warehouse.multiplex.implicits._
val input = readRawFromS3(inputPrefix) // returns …Run Code Online (Sandbox Code Playgroud) 嗨,有一个主题是使用MultipleTextOutputFormat在一个spark作业中将文本数据写入多个输出目录
我会问是否有类似的方法将avro数据写入多个目录
我想要的是将avro文件中的数据写入不同的目录(基于时间戳字段,时间戳中的同一天转到同一目录)