在Scala中生成已知数量的列表的组合非常简单.你可以使用for-understanding:
for {
elem1 <- list1
elem2 <- list2 } yield List(elem1, elem2)
Run Code Online (Sandbox Code Playgroud)
或者您可以使用脱糖版本:
list1.flatMap( elem1 => list2.map(elem2 => List(elem1,elem2)))
Run Code Online (Sandbox Code Playgroud)
在套件之后,我想创建N个列表中的元素组合(N在运行时已知).在组合器示例之后,3个列表将是:
list1.flatMap( elem1 => list2.flatMap(elem2 => list3.map(elem3 => List(elem1,elem2,elem3)))
Run Code Online (Sandbox Code Playgroud)
所以我看到了模式,我知道那里有一个递归,但我一直在努力将它固定下来.
def combinations[T](lists:List[List[T]]): List[List[T]] = ???
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?
你可以通过以下问题把我推向正确的方向吗?(即使链接到包含所需信息的文档也会受到赞赏.)
是否有能力将多个数据流合并到元组流中.
例如,我们有一个带有元素(A1,t1),(A2,t2),...(An,tn)的流A和带有元素(B1,t1'),(B2,t2'),...的流B, (Bn,tn').
其中t是值的时间(值实际上是时间序列).
我想接收带有值的流C.
(A1",B1","t1"),...,("an","Bn","tn")
来自流A和B的时间可能不同(这就是我使用'和'的原因).度量可以在不同的时间和不同的速率下消耗.在这种情况下,必须在合并流时采用具有最新所需时间戳的值.
http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java
上面的链接显示了JavaRDD以cassandra这种方式保存到的方法:
import static com.datastax.spark.connector.CassandraJavaUtil.*;
JavaRDD<Product> productsRDD = sc.parallelize(products);
javaFunctions(productsRDD, Product.class).saveToCassandra("java_api", "products");
Run Code Online (Sandbox Code Playgroud)
但com.datastax.spark.connector.CassandraJavaUtil.*似乎已经弃用了.更新的API应为:
import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;
Run Code Online (Sandbox Code Playgroud)
有人能告诉我一些代码来存储JavaRDD到Cassandra使用上述更新API?
我已经在Java中成功构建了一个非常简单的Spark Streaming应用程序,该应用程序基于Scala中的HdfsCount示例.
当我将此应用程序提交给我的本地Spark时,它会等待将文件写入给定目录,当我创建该文件时,它会成功打印出单词数.我按Ctrl + C终止应用程序.
现在我已经尝试为这个功能创建一个非常基本的单元测试,但在测试中我无法打印相同的信息,即单词的数量.
我错过了什么?
下面是单元测试文件,之后我还包含了显示countWords方法的代码片段:
import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.*;
import java.io.*;
public class StarterAppTest {
JavaStreamingContext ssc;
File tempDir;
@Before
public void setUp() {
ssc = new JavaStreamingContext("local", "test", new Duration(3000));
tempDir = Files.createTempDir();
tempDir.deleteOnExit();
}
@After
public void tearDown() {
ssc.stop();
ssc = null;
}
@Test
public void testInitialization() {
Assert.assertNotNull(ssc.sc());
}
@Test
public void testCountWords() {
StarterApp starterApp = new StarterApp();
try …Run Code Online (Sandbox Code Playgroud) 所以我正在kafka流中的同一个rdd上执行多个操作.是否缓存RDD会提高性能?
我们正在Scala中尝试并行收集,并想检查结果是否已订购.为此,我在REPL上编写了一个小函数来检查我们生成的非常大的List:
def isOrdered(l:List[Int]):Boolean = { l match {
case Nil => true
case x::Nil => true
case x::y::Nil => x>y
case x::y::tail => x>y & isOrdered(tail)
}
}
Run Code Online (Sandbox Code Playgroud)
它失败了stackOverflow(这里的问题是多么合适!).我期待它是尾部优化的.怎么了?
Scala的REPL是交互式测试某些代码片段的绝佳场所.最近,我一直在使用REPL进行一些性能比较,以重复执行操作并相对地测量挂钟时间.
这是我最近创建的一个例子,用于帮助回答SO问题[1] [2]:
// Figure out the perfomance difference between direct method invocation and reflection-based method.invoke
def invoke1[T,U](obj:Any, method:Method)(param:T):U = method.invoke(obj,Seq(param.asInstanceOf[java.lang.Object]):_*) match {
case x: java.lang.Object if x==null => null.asInstanceOf[U]
case x => x.asInstanceOf[U]
}
def time[T](b: => T):(T, Long) = {
val t0 = System.nanoTime()
val res = b
val t = System.nanoTime() - t0
(res,t )
}
class Test {
def op(l:Long): Long = (2 until math.sqrt(l).toInt).filter(x=>l%x==0).sum
}
val t0 = new Test
val method = classOf[Test].getMethods.find(_.getName=="op").get
def …Run Code Online (Sandbox Code Playgroud) 我在Amazon S3上有一个包含JSON对象的大文本文件.我计划在Amazon EMR上使用Spark处理这些数据.
这是我的问题:
我在Spark文档中关注此示例,用于计算一堆文档的TF-IDF.Spark使用散列技巧进行此计算,所以最后你得到一个包含散列词和相应权重的Vector但是......我如何从散列中取回单词?
我是否真的需要哈希所有单词并将它们保存在地图中以便以后迭代查找关键字?内置Spark没有更有效的方法吗?
提前致谢
我得到了一个名为索引的rdd:RDD [(String,String)],我想使用索引来处理我的文件。这是代码:
val get = file.map({x =>
val tmp = index.lookup(x).head
tmp
})
Run Code Online (Sandbox Code Playgroud)
问题是我不能在file.map函数中使用索引,我运行了该程序,它给了我这样的反馈:
14/12/11 16:22:27 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 602, spark2): scala.MatchError: null
org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:770)
com.ynu.App$$anonfun$12.apply(App.scala:270)
com.ynu.App$$anonfun$12.apply(App.scala:265)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我不知道为什么 如果要实现此功能,该怎么办?谢谢
apache-spark ×7
scala ×4
java ×2
rdd ×2
amazon-s3 ×1
cassandra ×1
combinators ×1
hash ×1
json ×1
list ×1
merge ×1
recursion ×1
stream ×1
tf-idf ×1
unit-testing ×1