相关疑难解决方法(0)

Apache Spark RDD过滤成两个RDD

我需要将RDD分成两部分:

满足条件的1部分; 另一部分没有.我可以filter在原始RDD上做两次但看起来效率低下.有没有办法可以做我想要的事情?我在API和文献中都找不到任何东西.

apache-spark rdd

18
推荐指数
2
解决办法
8097
查看次数

Sparks RDD.randomSplit如何实际拆分RDD

所以假设我有一个3000行的rdd.2000个第一行是1类,最后1000行是class2.RDD分区为100个分区.

打电话的时候 RDD.randomSplit(0.8,0.2)

该功能是否也会改变rdd?我们的分裂只是连续20%的rdd样品?或者它是随机选择20%的分区?

理想情况下,生成的拆分与原始RDD具有相同的类分布.(即2:1)

谢谢

apache-spark rdd

17
推荐指数
1
解决办法
9993
查看次数

如何将数据框拆分为具有相同列值的数据框?

使用Scala,我如何将dataFrame拆分为具有相同列值的多个dataFrame(无论是数组还是集合).例如,我想拆分以下DataFrame:

ID  Rate    State
1   24  AL
2   35  MN
3   46  FL
4   34  AL
5   78  MN
6   99  FL
Run Code Online (Sandbox Code Playgroud)

至:

数据集1

ID  Rate    State
1   24  AL  
4   34  AL
Run Code Online (Sandbox Code Playgroud)

数据集2

ID  Rate    State
2   35  MN
5   78  MN
Run Code Online (Sandbox Code Playgroud)

数据集3

ID  Rate    State
3   46  FL
6   99  FL
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql

16
推荐指数
1
解决办法
2万
查看次数

在同一个RDD上使用多个管道阻止更多IO

例如,如果我运行相同的RDD数字,其中一个流过滤偶数并平均它们,其他过滤器为奇数并对它们求和.如果我在同一个RDD上将其写为两个管道,这将创建两个执行,这将扫描RDD两次,这在IO方面可能很昂贵.

如何将此IO简化为只读取一次数据而不将逻辑重写为一个管道?当然,只要开发人员继续独立地处理每个管道(在实际情况下,这些管道从单独的模块加载),一个采用两个管道并将它们合并为一个的框架是可以的.

关键是不要使用cache()来实现这一点

apache-spark

10
推荐指数
1
解决办法
183
查看次数

Scala Spark:拆分成几个RDD?

是否有任何Spark函数允许根据某些creteria将集合拆分为多个RDD?这样的功能可以避免过度的迭代.例如:

def main(args: Array[String]) {
    val logFile = "file.txt" 
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
  }
Run Code Online (Sandbox Code Playgroud)

在这个例子中,我必须迭代'logData`两次只是为了在两个单独的文件中写入结果:

    val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
Run Code Online (Sandbox Code Playgroud)

有这样的事情会很好:

    val resultMap = logData.map(line => if line.contains("a") ("a", line) else if line.contains("b") ("b", line) else (" - ", line)
    resultMap.writeByKey("a", "linesA.txt") 
    resultMap.writeByKey("b", "linesB.txt")
Run Code Online (Sandbox Code Playgroud)

这样的事吗?

scala apache-spark

6
推荐指数
2
解决办法
5700
查看次数

将RDD解复用到多个ORC表上

我正在尝试将存储在S3中的数据作为JSON-per-line文本文件转换为结构化的柱状格式,如ORC或Parquet on S3.

源文件包含多个方案的数据(例如,HTTP请求,HTTP响应......),需要将其解析为正确类型的不同Spark数据帧.

示例模式:

  val Request = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("requestMethod", StringType),
    StructField("scheme", StringType),
    StructField("host", StringType),
    StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
    StructField("path", StringType),
    StructField("sessionId", StringType),
    StructField("userAgent", StringType)
  ))

  val Response = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("contentType", StringType),
    StructField("contentLength", IntegerType),
    StructField("statusCode", StringType),
    StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
    StructField("responseDuration", DoubleType),
    StructField("sessionId", StringType)
  ))
Run Code Online (Sandbox Code Playgroud)

我得到了那个部分工作正常,但是试图尽可能有效地将数据写回S3似乎是一个问题atm.

我尝试了3种方法:

  1. 来自silex项目的muxPartitions
  2. 缓存已解析的S3输入并多次循环它
  3. 使每个方案类型成为RDD的单独分区

在第一种情况下,JVM内存不足,而在第二种情况下,机器的磁盘空间不足.

第三个我还没有经过彻底的测试,但这似乎并不能有效地利用处理能力(因为只有一个集群节点(这个特定分区所在的节点)实际上会将数据写回S3) .

相关代码:

val allSchemes = Schemes.all().keys.toArray

if (false) {
  import com.realo.warehouse.multiplex.implicits._

  val input = readRawFromS3(inputPrefix) // returns …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

6
推荐指数
1
解决办法
313
查看次数

如何在火花中将rdd数据一分为二?

我在Spark RDD中有一个数据,我想将其分为两个部分,比例为0.7。例如,如果RDD如下所示:

[1,2,3,4,5,6,7,8,9,10]
Run Code Online (Sandbox Code Playgroud)

我想将其分为rdd1

 [1,2,3,4,5,6,7]
Run Code Online (Sandbox Code Playgroud)

rdd2

[8,9,10]
Run Code Online (Sandbox Code Playgroud)

比例为0.7。在rdd1rdd2应该是随机的,每次。我这样尝试:

seed = random.randint(0,10000)
rdd1 = data.sample(False,scale,seed)
rdd2 = data.subtract(rdd1)
Run Code Online (Sandbox Code Playgroud)

有时可以,但是当我的数据包含时,dict我遇到了一些问题。例如,数据如下:

[{1:2},{3:1},{5:4,2;6}]
Run Code Online (Sandbox Code Playgroud)

我懂了

TypeError:无法散列的类型:'dict'

python apache-spark rdd pyspark

5
推荐指数
1
解决办法
3491
查看次数

根据条件拆分Spark DataFrame

我需要类似于randomSplit函数的东西:

val Array(df1, df2) = myDataFrame.randomSplit(Array(0.6, 0.4))
Run Code Online (Sandbox Code Playgroud)

但是,我需要根据布尔条件拆分myDataFrame.是否存在以下任何内容?

val Array(df1, df2) = myDataFrame.booleanSplit(col("myColumn") > 100)
Run Code Online (Sandbox Code Playgroud)

我不想做两个单独的.filter调用.

scala dataframe apache-spark apache-spark-sql

5
推荐指数
2
解决办法
5432
查看次数

如何根据 Spark-scala 中的过滤器将数据集分为两部分

是否可以使用单个过滤器操作将 DF 分为两部分。例如

假设 df 有以下记录

UID    Col
 1       a
 2       b
 3       c
Run Code Online (Sandbox Code Playgroud)

如果我做

df1 = df.filter(UID <=> 2)
Run Code Online (Sandbox Code Playgroud)

我可以在单个操作中将过滤和未过滤的记录保存在不同的 RDD 中吗?

 df1 can have records where uid = 2
 df2 can have records with uid 1 and 3 
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

3
推荐指数
1
解决办法
4437
查看次数

如何拆分大数据帧并使用较小的部分在 Spark 中进行多个广播连接?

假设我们有两个非常大的数据帧 - A 和 B。现在,我明白如果我对两个 RDD 使用相同的哈希分区器然后进行连接,键将位于同一位置,并且连接可能会更快,同时减少混洗(唯一会发生的改组将是分区程序在 A 和 B 上发生变化时)。

我想尝试一些不同的东西 - 我想像这样尝试广播连接 -> 假设 B 比 A 小,所以我们选择 B 进行广播,但 B 仍然是一个非常大的数据帧。所以,我们想要做的是从 B 中制作多个数据帧,然后将每个数据帧作为广播发送到 A 上。

有没有人试过这个?要将一个数据帧拆分为多个数据帧,我只看到了 randomSplit 方法,但这看起来并不是一个很好的选择。

还有其他更好的方法来完成这项任务吗?

谢谢!

scala apache-spark

3
推荐指数
1
解决办法
1314
查看次数

标签 统计

apache-spark ×10

scala ×5

apache-spark-sql ×3

rdd ×3

dataframe ×2

pyspark ×1

python ×1