我需要将RDD分成两部分:
满足条件的1部分; 另一部分没有.我可以filter在原始RDD上做两次但看起来效率低下.有没有办法可以做我想要的事情?我在API和文献中都找不到任何东西.
所以假设我有一个3000行的rdd.2000个第一行是1类,最后1000行是class2.RDD分区为100个分区.
打电话的时候 RDD.randomSplit(0.8,0.2)
该功能是否也会改变rdd?我们的分裂只是连续20%的rdd样品?或者它是随机选择20%的分区?
理想情况下,生成的拆分与原始RDD具有相同的类分布.(即2:1)
谢谢
使用Scala,我如何将dataFrame拆分为具有相同列值的多个dataFrame(无论是数组还是集合).例如,我想拆分以下DataFrame:
ID Rate State
1 24 AL
2 35 MN
3 46 FL
4 34 AL
5 78 MN
6 99 FL
Run Code Online (Sandbox Code Playgroud)
至:
数据集1
ID Rate State
1 24 AL
4 34 AL
Run Code Online (Sandbox Code Playgroud)
数据集2
ID Rate State
2 35 MN
5 78 MN
Run Code Online (Sandbox Code Playgroud)
数据集3
ID Rate State
3 46 FL
6 99 FL
Run Code Online (Sandbox Code Playgroud) 例如,如果我运行相同的RDD数字,其中一个流过滤偶数并平均它们,其他过滤器为奇数并对它们求和.如果我在同一个RDD上将其写为两个管道,这将创建两个执行,这将扫描RDD两次,这在IO方面可能很昂贵.
如何将此IO简化为只读取一次数据而不将逻辑重写为一个管道?当然,只要开发人员继续独立地处理每个管道(在实际情况下,这些管道从单独的模块加载),一个采用两个管道并将它们合并为一个的框架是可以的.
关键是不要使用cache()来实现这一点
是否有任何Spark函数允许根据某些creteria将集合拆分为多个RDD?这样的功能可以避免过度的迭代.例如:
def main(args: Array[String]) {
val logFile = "file.txt"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
}
Run Code Online (Sandbox Code Playgroud)
在这个例子中,我必须迭代'logData`两次只是为了在两个单独的文件中写入结果:
val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
Run Code Online (Sandbox Code Playgroud)
有这样的事情会很好:
val resultMap = logData.map(line => if line.contains("a") ("a", line) else if line.contains("b") ("b", line) else (" - ", line)
resultMap.writeByKey("a", "linesA.txt")
resultMap.writeByKey("b", "linesB.txt")
Run Code Online (Sandbox Code Playgroud)
这样的事吗?
我正在尝试将存储在S3中的数据作为JSON-per-line文本文件转换为结构化的柱状格式,如ORC或Parquet on S3.
源文件包含多个方案的数据(例如,HTTP请求,HTTP响应......),需要将其解析为正确类型的不同Spark数据帧.
示例模式:
val Request = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("requestMethod", StringType),
StructField("scheme", StringType),
StructField("host", StringType),
StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
StructField("path", StringType),
StructField("sessionId", StringType),
StructField("userAgent", StringType)
))
val Response = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("contentType", StringType),
StructField("contentLength", IntegerType),
StructField("statusCode", StringType),
StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
StructField("responseDuration", DoubleType),
StructField("sessionId", StringType)
))
Run Code Online (Sandbox Code Playgroud)
我得到了那个部分工作正常,但是试图尽可能有效地将数据写回S3似乎是一个问题atm.
我尝试了3种方法:
在第一种情况下,JVM内存不足,而在第二种情况下,机器的磁盘空间不足.
第三个我还没有经过彻底的测试,但这似乎并不能有效地利用处理能力(因为只有一个集群节点(这个特定分区所在的节点)实际上会将数据写回S3) .
相关代码:
val allSchemes = Schemes.all().keys.toArray
if (false) {
import com.realo.warehouse.multiplex.implicits._
val input = readRawFromS3(inputPrefix) // returns …Run Code Online (Sandbox Code Playgroud) 我在Spark RDD中有一个数据,我想将其分为两个部分,比例为0.7。例如,如果RDD如下所示:
[1,2,3,4,5,6,7,8,9,10]
Run Code Online (Sandbox Code Playgroud)
我想将其分为rdd1:
[1,2,3,4,5,6,7]
Run Code Online (Sandbox Code Playgroud)
和rdd2:
[8,9,10]
Run Code Online (Sandbox Code Playgroud)
比例为0.7。在rdd1和rdd2应该是随机的,每次。我这样尝试:
seed = random.randint(0,10000)
rdd1 = data.sample(False,scale,seed)
rdd2 = data.subtract(rdd1)
Run Code Online (Sandbox Code Playgroud)
有时可以,但是当我的数据包含时,dict我遇到了一些问题。例如,数据如下:
[{1:2},{3:1},{5:4,2;6}]
Run Code Online (Sandbox Code Playgroud)
我懂了
TypeError:无法散列的类型:'dict'
我需要类似于randomSplit函数的东西:
val Array(df1, df2) = myDataFrame.randomSplit(Array(0.6, 0.4))
Run Code Online (Sandbox Code Playgroud)
但是,我需要根据布尔条件拆分myDataFrame.是否存在以下任何内容?
val Array(df1, df2) = myDataFrame.booleanSplit(col("myColumn") > 100)
Run Code Online (Sandbox Code Playgroud)
我不想做两个单独的.filter调用.
是否可以使用单个过滤器操作将 DF 分为两部分。例如
假设 df 有以下记录
UID Col
1 a
2 b
3 c
Run Code Online (Sandbox Code Playgroud)
如果我做
df1 = df.filter(UID <=> 2)
Run Code Online (Sandbox Code Playgroud)
我可以在单个操作中将过滤和未过滤的记录保存在不同的 RDD 中吗?
df1 can have records where uid = 2
df2 can have records with uid 1 and 3
Run Code Online (Sandbox Code Playgroud) 假设我们有两个非常大的数据帧 - A 和 B。现在,我明白如果我对两个 RDD 使用相同的哈希分区器然后进行连接,键将位于同一位置,并且连接可能会更快,同时减少混洗(唯一会发生的改组将是分区程序在 A 和 B 上发生变化时)。
我想尝试一些不同的东西 - 我想像这样尝试广播连接 -> 假设 B 比 A 小,所以我们选择 B 进行广播,但 B 仍然是一个非常大的数据帧。所以,我们想要做的是从 B 中制作多个数据帧,然后将每个数据帧作为广播发送到 A 上。
有没有人试过这个?要将一个数据帧拆分为多个数据帧,我只看到了 randomSplit 方法,但这看起来并不是一个很好的选择。
还有其他更好的方法来完成这项任务吗?
谢谢!