标签: rdd

Spark:如何按时间范围加入RDD

我有一个微妙的Spark问题,我无法绕过头.

我们有两个RDD(来自Cassandra).RDD1包含Actions和RDD2包含Historic数据.两者都有一个id可以匹配/加入.但问题是这两个表有一个N:N关系.Actions包含多个具有相同ID的行,因此也是如此Historic.以下是两个表的一些示例日期.

Actions 时间实际上是一个时间戳

id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125
Run Code Online (Sandbox Code Playgroud)

Historic set_at实际上是一个时间戳

id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75
Run Code Online (Sandbox Code Playgroud)

我们如何以某种方式加入这两个表,我们得到这样的结果

1   |  100  # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1 …
Run Code Online (Sandbox Code Playgroud)

cassandra apache-spark rdd

14
推荐指数
1
解决办法
2471
查看次数

为什么Spark RDD分区对HDFS有2GB的限制?

使用mllib RandomForest训练数据时出错.由于我的数据集很大,默认分区相对较小.所以抛出异常表示"Size超过Integer.MAX_VALUE",原始堆栈跟踪如下,

15/04/16 14:13:03 WARN scheduler.TaskSetManager:阶段6.0中的丢失任务19.0(TID 120,10.215.149.47):java.lang.IllegalArgumentException:大小超过
sun.nio.ch.FileChannelImpl处的Integer.MAX_VALUE.在org.apache上的org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)的org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)中映射(FileChannelImpl.java:828) .spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)atg.apache.spark.storage.BlockManager.get(BlockManager.scala) :618)org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:146)at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)

Integer.MAX_SIZE是2GB,似乎有些分区内存不足.所以我将我的rdd分区重新分配到1000,这样每个分区可以保存比以前少得多的数据.最后,问题解决了!

所以,我的问题是:为什么分区大小有2G限制?似乎没有为spark中的限制设置配置

scala apache-spark rdd

14
推荐指数
1
解决办法
8565
查看次数

在Spark API中,makeRDD函数和parallelize函数有什么区别?

在make spark app期间,我有一个问题.在Spark API中,makeRDD函数和parallelize函数有什么区别?

scala apache-spark rdd

14
推荐指数
1
解决办法
5935
查看次数

groupByKey是否比reduceByKey更受欢迎

我总是reduceByKey在需要在RDD中对数据进行分组时使用,因为它在对数据进行混洗之前执行地图侧减少,这通常意味着更少的数据被改组,因此我获得了更好的性能.即使地图侧缩减功能收集所有值并且实际上并没有减少数据量,我仍然使用reduceByKey,因为我假设性能reduceByKey永远不会差groupByKey.但是,我想知道这个假设是否正确,或者确实存在groupByKey应该首选的情况?

apache-spark rdd

14
推荐指数
2
解决办法
1万
查看次数

如何从RDD [PYSPARK]中删除重复值

我有下表作为RDD:

Key Value
1    y
1    y
1    y
1    n
1    n
2    y
2    n
2    n
Run Code Online (Sandbox Code Playgroud)

我想从中删除所有重复项Value.

输出应该是这样的:

Key Value
1    y
1    n
2    y
2    n
Run Code Online (Sandbox Code Playgroud)

在pyspark中工作时,输出应该是键值对列表,如下所示:

[(u'1',u'n'),(u'2',u'n')]
Run Code Online (Sandbox Code Playgroud)

我不知道如何在for这里应用循环.在普通的Python程序中,它会非常简单.

我想知道是否有pyspark相同的功能.

python apache-spark rdd

13
推荐指数
2
解决办法
2万
查看次数

如何反转RDD.takeOrdered()的排序?

在Spark中反转RDD的takeOrdered()方法的顺序的语法是什么?

对于奖励积分,Spark中RDD的自定义排序语法是什么?

apache-spark rdd

13
推荐指数
2
解决办法
2万
查看次数

将RDD转换为可迭代:PySpark?

我有一个RDD,我通过加载文本文件并预处理它来创建.我不想收集它并将其保存到磁盘或内存(整个数据),而是想将它传递给python中的一些其他函数,它们一个接一个地使用迭代的形式.

这怎么可能?

data =  sc.textFile('file.txt').map(lambda x: some_func(x))

an_iterable = data. ##  what should I do here to make it give me one element at a time?
def model1(an_iterable):
 for i in an_iterable:
  do_that(i)

model(an_iterable)
Run Code Online (Sandbox Code Playgroud)

python apache-spark rdd pyspark

13
推荐指数
1
解决办法
2万
查看次数

Apache spark处理case语句

我正在处理将SQL代码转换为PySpark代码并遇到一些SQL语句.我不知道如何处理pyspark中的案例陈述?我打算创建一个RDD然后使用rdd.map然后做一些逻辑检查.这是正确的方法吗?请帮忙!

基本上我需要遍历RDD或DF中的每一行,并根据我需要编辑其中一个列值的逻辑.

     case  
               when (e."a" Like 'a%' Or e."b" Like 'b%') 
                And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'

               when (e."a" Like 'b%' Or e."b" Like 'a%') 
                And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'

else

'CallitC'
Run Code Online (Sandbox Code Playgroud)

apache-spark rdd pyspark spark-dataframe pyspark-sql

13
推荐指数
2
解决办法
3万
查看次数

如何在Spark的RDD中获取元素位置?

我是Apache Spark的新手,我知道核心数据结构是RDD.现在我正在编写一些需要元素位置信息的应用程序.例如,在将ArrayList转换为(Java)RDD之后,对于RDD中的每个整数,我需要知道它的(全局)数组下标.有可能吗?

据我所知,RDD 有一个take(int)函数,所以我相信位置信息仍然保留在RDD中.

position apache-spark rdd

12
推荐指数
2
解决办法
2万
查看次数

Spark groupByKey另类

根据Databricks的最佳实践,groupByKey应该避免使用Spark ,因为Spark groupByKey处理的工作方式是首先将信息拖放到工作人员之间,然后进行处理.说明

所以,我的问题是,有哪些替代方案能够groupByKey以分布式和快速的方式返回以下内容?

// want this
{"key1": "1", "key1": "2", "key1": "3", "key2": "55", "key2": "66"}
// to become this
{"key1": ["1","2","3"], "key2": ["55","66"]}
Run Code Online (Sandbox Code Playgroud)

在我看来,可能aggregateByKey或者glom可以先在partition(map)中执行,然后将所有列表连接在一起(reduce).

python reduce apache-spark rdd pyspark

12
推荐指数
1
解决办法
4651
查看次数