小编Noa*_*oah的帖子

Spark:使用单键RDD加入2元组密钥RDD的最佳策略是什么?

我有两个我想加入的RDD,它们看起来像这样:

val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
Run Code Online (Sandbox Code Playgroud)

碰巧的是,键值rdd1是唯一的,并且元组键值rdd2是唯一的.我想加入这两个数据集,以便获得以下rdd:

val rdd_joined:RDD[((T,W), (U,V))]
Run Code Online (Sandbox Code Playgroud)

实现这一目标的最有效方法是什么?以下是我想到的一些想法.

选项1:

val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
Run Code Online (Sandbox Code Playgroud)

选项2:

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
Run Code Online (Sandbox Code Playgroud)

选项1将收集掌握的所有数据,对吧?因此,如果rdd1很大(在我的情况下它相对较大,虽然比rdd2小一个数量级),这似乎不是一个好的选择.选项2做了一个丑陋的独特和笛卡尔产品,这似乎也非常低效.我想到的另一种可能性(但尚未尝试)是做选项1并广播地图,尽管以"智能"方式进行广播会更好,这样地图的按键与钥匙rdd2.

有没有人遇到过这种情况?我很乐意有你的想法.

谢谢!

scala apache-spark

48
推荐指数
2
解决办法
5万
查看次数

访问scala期货返回的价值

我是scala期货的新手,我对scala期货的回报价值有疑问.

因此,scala未来的语法通常是

 def downloadPage(url: URL) = Future[List[Int]] {

 }
Run Code Online (Sandbox Code Playgroud)

我想知道如何访问List[Int]调用此方法的其他方法.

换一种说法,

val result = downloadPage("localhost") 
Run Code Online (Sandbox Code Playgroud)

那么List[Int]走出未来的方法应该是什么?

我尝试过使用map方法,但无法成功完成此操作

scala future

43
推荐指数
4
解决办法
6万
查看次数

访问由Source.actorRef创建的akka​​流源的基础ActorRef

我正在尝试使用Source.actorRef方法来创建akka.stream.scaladsl.Source对象.形式的东西

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...
Run Code Online (Sandbox Code Playgroud)

我的问题是:如何将数据发送到基于ActorRef的Source对象

我假设向Source发送消息是一种形式

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)
Run Code Online (Sandbox Code Playgroud)

但是weatherSource没有!操作员或tell方法.

文件是不是关于如何使用Source.actorRef太过描述,它只是说,你可以...

提前感谢您的审核和回复.

scala akka akka-stream

27
推荐指数
3
解决办法
7396
查看次数

如何在Scala中找到每个未来的时间?

我有一个scala代码一次运行多个期货.我想分析执行每个时间所花费的时间.例如:

for (i <- 1 to 100) {
   val f = future { runAndTime(doSomething()) }
   f.onComplete {
       case Success(timeTaken) => println(timeTaken)
       case Failure(t) => println(t.getMessage())
    }
}
Run Code Online (Sandbox Code Playgroud)

一个天真的实现runAndTime可能是:

   def runAndTime(func: => Unit) = {
     var time = System.currentTimeMillis()
     func
     System.currentTimeMillis() - time
   }
Run Code Online (Sandbox Code Playgroud)

这个问题runAndTime是当线程没有执行时(例如,如果在func的中间它从cpu中出队并且其他一些线程开始运行)系统仍在花费时间,所以我们没有花时间在该特定线程中但是线程启动和线程结束之间的总时间差.

如何编写一个runAndTime将计算未来在CPU中实际执行的时间?

multithreading scala future timing akka

5
推荐指数
1
解决办法
1296
查看次数

在 C# 中散列一个委托函数

如何在 C# 中获取委托函数的哈希值。我希望能够判断是否将不同的代表发送到我的函数中。我的代码看起来像这样:

public string GetContent(Func<string, bool> isValid)
{
// Do some work
SomeFunctionToHashAFunction(isValid)
}
Run Code Online (Sandbox Code Playgroud)

我会使用 .GetHashCode() 但 .NET 框架不保证这些将是唯一的。

编辑 我有一些正在验证的缓存内容,但我只想验证一次。但是,如果验证功能发生变化,那么我需要重新验证缓存的内容。我不确定 ObjectIdGenerator 是否可以在这个实例中工作,因为我需要确定两个匿名函数是否具有相同的实现。

c# hash delegates

4
推荐指数
1
解决办法
1351
查看次数