我使用Scala编写了Kafka流程序并在Spark独立集群中执行.代码在我的本地工作正常.我在Azure VM中完成了Kafka,Cassandra和Spark设置.我已打开所有入站和出站端口以避免端口阻塞.
开始大师
sbin目录> ./ start-master.sh
开始奴隶
sbin#./ start-slave.sh spark:// vm-hostname:7077
我已在Master WEB UI中验证了此状态.
提交工作
bin#./ spark-submit --class xyStreamJob --master spark:// vm-hostname:7077 /home/user/appl.jar
我注意到在WEB WEB UI中添加并显示了Application.
我已经向主题发布了一些消息,并且没有收到消息并将其保存到Cassandra DB.
我在主Web控制台上单击了应用程序名称,发现该应用程序控制台页面中的Streaming选项卡不可用.
为什么应用程序不能在VM中运行并且在本地运行良好?
如何在VM中调试问题?
def main(args: Array[String]): Unit = {
val spark = SparkHelper.getOrCreateSparkSession()
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
spark.sparkContext.setLogLevel("WARN")
val kafkaStream = {
val kafkaParams = Map[String, Object](
"bootstrap.servers" ->
"vmip:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "loc",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean) …Run Code Online (Sandbox Code Playgroud) 我不知道如何立即停止 akka 流 Runnable Graph?如何使用killswitch来实现这一点?我开始使用 akka 流才几天。就我而言,我正在从文件中读取行并在流程中执行一些操作并写入接收器。我想要做的是,只要我想立即停止读取文件,我希望这可能会停止整个运行图。对此的任何想法将不胜感激。
提前致谢。
根据scala docs,stream实现了惰性列表,其中元素仅在需要时进行评估.例;
val fibs: Stream[BigInt] = BigInt(0) #:: BigInt(1) #:: fibs.zip(fibs.tail).map(n => {
n._1 + n._2
})
Run Code Online (Sandbox Code Playgroud)
之后在scala repl中;
fibs(4)
fibs
Run Code Online (Sandbox Code Playgroud)
它会打印出来;
res1:Stream [BigInt] = Stream(0,1,1,2,3,?)
由于调用.length或.last会导致无限循环,如何以最有效的方式获得值"3"(最后计算的值)?
递归删除scala文件时语法错误
Files.walk(path, FileVisitOption.FOLLOW_LINKS)
.sorted(Comparator.reverseOrder())
.forEach(Files.deleteIfExists)
Run Code Online (Sandbox Code Playgroud) 我已经开始在 Coursera 上学习 Scala 并且有一些关于squareRootGuess实现的问题如下
我正在尝试实施标准来过滤sqrtGuess定义中的准确猜测,如下所示,但它给了我堆栈溢出错误。
def sqrtGuess(x: Double): Stream[Double] = {
def nextGuess(guess: Double): Double = (guess + x / guess)/2
def isSufficient(guess: Double): Boolean = math.abs(x - guess*guess)/x < 0.001
def guesses: Stream[Double] =
1 #:: guesses.map(nextGuess).filter(isSufficient)
guesses
}
Run Code Online (Sandbox Code Playgroud)
但是,如果我们在外部定义 isSufficientsqrtGuess并应用于sqrtGuess流,效果会很好。
def sqrtGuess(x: Double): Stream[Double] = {
def nextGuess(guess: Double): Double = (guess + x / guess)/2
def guesses: Stream[Double] =
1 #:: guesses.map(nextGuess)
guesses
}
def isSufficient(guess: Double, x: …Run Code Online (Sandbox Code Playgroud) scala ×5
akka ×1
akka-stream ×1
apache-spark ×1
io ×1
memoization ×1
recursion ×1
stream ×1
streaming ×1