小编maa*_*asg的帖子

将列表组合概括为N个列表

在Scala中生成已知数量的列表的组合非常简单.你可以使用for-understanding:

for {
   elem1 <- list1
   elem2 <- list2 } yield List(elem1, elem2)
Run Code Online (Sandbox Code Playgroud)

或者您可以使用脱糖版本:

list1.flatMap( elem1 => list2.map(elem2 => List(elem1,elem2)))
Run Code Online (Sandbox Code Playgroud)

在套件之后,我想创建N个列表中的元素组合(N在运行时已知).在组合器示例之后,3个列表将是:

list1.flatMap( elem1 => list2.flatMap(elem2 => list3.map(elem3 => List(elem1,elem2,elem3)))
Run Code Online (Sandbox Code Playgroud)

所以我看到了模式,我知道那里有一个递归,但我一直在努力将它固定下来.

def combinations[T](lists:List[List[T]]): List[List[T]] = ???
Run Code Online (Sandbox Code Playgroud)

有任何想法吗?

recursion scala list combinators combinatorics

5
推荐指数
2
解决办法
845
查看次数

在Spark Streaming中合并两个流

你可以通过以下问题把我推向正确的方向吗?(即使链接到包含所需信息的文档也会受到赞赏.)

是否有能力将多个数据流合并到元组流中.

例如,我们有一个带有元素(A1,t1),(A2,t2),...(An,tn)的流A和带有元素(B1,t1'),(B2,t2'),...的流B, (Bn,tn').

其中t是值的时间(值实际上是时间序列).

我想接收带有值的流C.

(A1",B1","t1"),...,("an","Bn","tn")

来自流A和B的时间可能不同(这就是我使用'和'的原因).度量可以在不同的时间和不同的速率下消耗.在这种情况下,必须在合并流时采用具有最新所需时间戳的值.

merge stream apache-spark

5
推荐指数
1
解决办法
2769
查看次数

Spark-Saving JavaRDD到Cassandra

http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java

上面的链接显示了JavaRDDcassandra这种方式保存到的方法:

import static com.datastax.spark.connector.CassandraJavaUtil.*;

JavaRDD<Product> productsRDD = sc.parallelize(products);
javaFunctions(productsRDD, Product.class).saveToCassandra("java_api", "products");
Run Code Online (Sandbox Code Playgroud)

com.datastax.spark.connector.CassandraJavaUtil.*似乎已经弃用了.更新的API应为:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;
Run Code Online (Sandbox Code Playgroud)

有人能告诉我一些代码来存储JavaRDDCassandra使用上述更新API?

cassandra apache-spark rdd

5
推荐指数
1
解决办法
4453
查看次数

如何使Spark Streaming计算单元测试中文件中的单词?

我已经在Java中成功构建了一个非常简单的Spark Streaming应用程序,该应用程序基于Scala中HdfsCount示例.

当我将此应用程序提交给我的本地Spark时,它会等待将文件写入给定目录,当我创建该文件时,它会成功打印出单词数.我按Ctrl + C终止应用程序.

现在我已经尝试为这个功能创建一个非常基本的单元测试,但在测试中我无法打印相同的信息,即单词的数量.

我错过了什么?

下面是单元测试文件,之后我还包含了显示countWords方法的代码片段:

StarterAppTest.java

import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }


  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try …
Run Code Online (Sandbox Code Playgroud)

java unit-testing apache-spark spark-streaming

5
推荐指数
1
解决办法
2612
查看次数

火花串中的缓存是否会提高性能

所以我正在kafka流中的同一个rdd上执行多个操作.是否缓存RDD会提高性能?

apache-spark spark-streaming

5
推荐指数
1
解决办法
3044
查看次数

尾递归问题

我们正在Scala中尝试并行收集,并想检查结果是否已订购.为此,我在REPL上编写了一个小函数来检查我们生成的非常大的List:

def isOrdered(l:List[Int]):Boolean = { l match { 
  case Nil => true
  case x::Nil => true
  case x::y::Nil => x>y
  case x::y::tail => x>y & isOrdered(tail) 
  }
}
Run Code Online (Sandbox Code Playgroud)

它失败了stackOverflow(这里的问题是多么合适!).我期待它是尾部优化的.怎么了?

stack-overflow scala tail-recursion tail-call-optimization

4
推荐指数
2
解决办法
388
查看次数

使用Scala的REPL进行比较性能基准测试是否合理?

Scala的REPL是交互式测试某些代码片段的绝佳场所.最近,我一直在使用REPL进行一些性能比较,以重复执行操作并相对地测量挂钟时间.

这是我最近创建的一个例子,用于帮助回答SO问题[1] [2]:

// Figure out the perfomance difference between direct method invocation and reflection-based method.invoke

def invoke1[T,U](obj:Any, method:Method)(param:T):U = method.invoke(obj,Seq(param.asInstanceOf[java.lang.Object]):_*) match { 
    case x: java.lang.Object if x==null => null.asInstanceOf[U]
    case x => x.asInstanceOf[U]
}

def time[T](b: => T):(T, Long) = {
    val t0 = System.nanoTime()
    val res = b
    val t = System.nanoTime() - t0
    (res,t )
}

class Test {
  def op(l:Long): Long = (2 until math.sqrt(l).toInt).filter(x=>l%x==0).sum
}

val t0 = new Test

val method = classOf[Test].getMethods.find(_.getName=="op").get

def …
Run Code Online (Sandbox Code Playgroud)

scala performance-testing read-eval-print-loop

4
推荐指数
1
解决办法
439
查看次数

在Amazon S3上保留RDD

我在Amazon S3上有一个包含JSON对象的大文本文件.我计划在Amazon EMR上使用Spark处理这些数据.

这是我的问题:

  1. 如何将包含JSON对象的文本文件加载到Spark中?
  2. 在EMR集群关闭后,是否可以在S3上保留此数据的内部RDD表示?
  3. 如果我能够持久保存RDD表示,下次需要分析相同数据时是否可以直接加载RDD格式的数据?

json amazon-s3 apache-spark

4
推荐指数
1
解决办法
3134
查看次数

Spark TF-IDF从哈希中获取单词

我在Spark文档中关注此示例,用于计算一堆文档的TF-IDF.Spark使用散列技巧进行此计算,所以最后你得到一个包含散列词和相应权重的Vector但是......我如何从散列中取回单词?

我是否真的需要哈希所有单词并将它们保存在地图中以便以后迭代查找关键字?内置Spark没有更有效的方法吗?

提前致谢

java hash tf-idf apache-spark

4
推荐指数
2
解决办法
2038
查看次数

如何在其他RDD映射方法中使用RDD?

我得到了一个名为索引的rdd:RDD [(String,String)],我想使用索引来处理我的文件。这是代码:

val get = file.map({x =>
  val tmp = index.lookup(x).head
  tmp
})
Run Code Online (Sandbox Code Playgroud)

问题是我不能在file.map函数中使用索引,我运行了该程序,它给了我这样的反馈:

14/12/11 16:22:27 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 602, spark2): scala.MatchError: null
        org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:770)
        com.ynu.App$$anonfun$12.apply(App.scala:270)
        com.ynu.App$$anonfun$12.apply(App.scala:265)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        scala.collection.AbstractIterator.to(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
        org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我不知道为什么 如果要实现此功能,该怎么办?谢谢

scala apache-spark rdd

4
推荐指数
1
解决办法
2417
查看次数