标签: scala-streams

在VM中部署的独立群集中,Spark流不起作用

我使用Scala编写了Kafka流程序并在Spark独立集群中执行.代码在我的本地工作正常.我在Azure VM中完成了Kafka,Cassandra和Spark设置.我已打开所有入站和出站端口以避免端口阻塞.

开始大师

sbin目录> ./ start-master.sh

开始奴隶

sbin#./ start-slave.sh spark:// vm-hostname:7077

我已在Master WEB UI中验证了此状态.

提交工作

bin#./ spark-submit --class xyStreamJob --master spark:// vm-hostname:7077 /home/user/appl.jar

我注意到在WEB WEB UI中添加并显示了Application.

我已经向主题发布了一些消息,并且没有收到消息并将其保存到Cassandra DB.

我在主Web控制台上单击了应用程序名称,发现该应用程序控制台页面中的Streaming选项卡不可用.

为什么应用程序不能在VM中运行并且在本地运行良好?

如何在VM中调试问题?

def main(args: Array[String]): Unit = {
    val spark = SparkHelper.getOrCreateSparkSession()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    spark.sparkContext.setLogLevel("WARN")
    val kafkaStream = {
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> 
                "vmip:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "loc",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean) …
Run Code Online (Sandbox Code Playgroud)

streaming scala apache-spark spark-streaming scala-streams

7
推荐指数
1
解决办法
408
查看次数

如何突然停止 akka 流 Runnable Graph?

我不知道如何立即停止 akka 流 Runnable Graph?如何使用killswitch来实现这一点?我开始使用 akka 流才几天。就我而言,我正在从文件中读取行并在流程中执行一些操作并写入接收器。我想要做的是,只要我想立即停止读取文件,我希望这可能会停止整个运行图。对此的任何想法将不胜感激。

提前致谢。

io scala akka akka-stream scala-streams

5
推荐指数
1
解决办法
3140
查看次数

Scala如何获取最后一次计算的流量值?

根据scala docs,stream实现了惰性列表,其中元素仅在需要时进行评估.例;

val fibs: Stream[BigInt] = BigInt(0) #:: BigInt(1) #:: fibs.zip(fibs.tail).map(n => { 
    n._1 + n._2
})  
Run Code Online (Sandbox Code Playgroud)

之后在scala repl中;

fibs(4)
fibs
Run Code Online (Sandbox Code Playgroud)

它会打印出来;

res1:Stream [BigInt] = Stream(0,1,1,2,3,?)

由于调用.length.last会导致无限循环,如何以最有效的方式获得值"3"(最后计算的值)?

scala stream memoization lazy-evaluation scala-streams

5
推荐指数
1
解决办法
238
查看次数

scala中的forEach显示了预期的:Consumer [_>:Path]实际的:(Path)=>布尔值

递归删除scala文件时语法错误

Files.walk(path, FileVisitOption.FOLLOW_LINKS)
    .sorted(Comparator.reverseOrder())
    .forEach(Files.deleteIfExists)
Run Code Online (Sandbox Code Playgroud)

scala scala-streams

4
推荐指数
1
解决办法
1368
查看次数

带有 Stream 类型的 Scala 递归实现

我已经开始在 Coursera 上学习 Scala 并且有一些关于squareRootGuess实现的问题如下

我正在尝试实施标准来过滤sqrtGuess定义中的准确猜测,如下所示,但它给了我堆栈溢出错误。

def sqrtGuess(x: Double): Stream[Double] = { 
  def nextGuess(guess: Double): Double = (guess + x / guess)/2

  def isSufficient(guess: Double): Boolean = math.abs(x - guess*guess)/x < 0.001

  def guesses: Stream[Double] = 
    1 #:: guesses.map(nextGuess).filter(isSufficient)

  guesses
}
Run Code Online (Sandbox Code Playgroud)

但是,如果我们在外部定义 isSufficientsqrtGuess并应用于sqrtGuess流,效果会很好。

def sqrtGuess(x: Double): Stream[Double] = { 
    def nextGuess(guess: Double): Double = (guess + x / guess)/2
    def guesses: Stream[Double] = 
        1 #:: guesses.map(nextGuess)
    guesses
}

def isSufficient(guess: Double, x: …
Run Code Online (Sandbox Code Playgroud)

recursion scala scala-streams

2
推荐指数
1
解决办法
94
查看次数