小编wip*_*man的帖子

来自Actor的Spark-Streaming

我希望有一个消费者演员订阅Kafka主题并流式传输数据,以便在消费者之外使用Spark Streaming进行进一步处理.为什么演员呢?因为我读到它的主管策略是处理Kafka失败的好方法(例如,重启失败).

我找到了两个选择:

  • Java KafkaConsumer类:它的poll()方法返回一个Map[String, Object].我希望像我一样DStream返回KafkaUtils.createDirectStream,而且我不知道如何从actor外部获取流.
  • 扩展ActorHelper特性并使用actorStream()如此示例中所示.后一个选项不显示与主题的连接,而是显示与套接字的连接.

有人能指出我正确的方向吗?

scala actor apache-kafka spark-streaming

9
推荐指数
1
解决办法
285
查看次数

关于ubuntu 16.04安装失败的问题

我面临的一个failed downloads问题,同时要安装sbt 0.13.13ubuntu 16.04上下列文件:http://www.scala-sbt.org/release/docs/Installing-sbt-on-Linux.html.

下面是消息日志:

在此输入图像描述

我也尝试了paradigmatic这里的答案:在ubuntu上安装sbt,并且:https://askubuntu.com/questions/732092/how-to-install-sbt-in-its-latest-version-in-ubuntu-14-04.这些帖子有点旧.从来没有,官方的方法似乎到目前为止.

有人遇到同样的障碍吗?

install sbt ubuntu-16.04

6
推荐指数
1
解决办法
486
查看次数

如何将库依赖项更新为Scala版本?

我正在用sbt编译我的项目并且收到UNRESOLVED DEPENDENCIES错误.

事实上,我从在线博客中获取了我使用的示例(一个hello world程序),其使用scalaVersion := "2.10.0"如下所示.我使用的是2.11.2.

如何将库依赖项(在build.sbt)中更新到最新版本的Scala,特别是revision部件?

build.sbt

name := "Hello Test #1"

version := "1.0"

scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "com.typesafe.akka" % "akka-actor_2.10" % "2.2-M1"
Run Code Online (Sandbox Code Playgroud)

错误:

[info] Resolving com.typesafe.akka#akka-actor_2.11.2;2.2-M1 ...
[warn]  module not found: com.typesafe.akka#akka-actor_2.11.2;2.2-M1
[warn] ==== local: tried
[warn]   /home/plard/.ivy2/local/com.typesafe.akka/akka-actor_2.11.2/2.2-M1/ivys/ivy.xml
[warn] ==== public: tried
[warn]   http://repo1.maven.org/maven2/com/typesafe/akka/akka-actor_2.11.2/2.2-M1/akka-actor_2.11.2-2.2-M1.pom
[warn] ==== Typesafe Repository: tried
[warn]   http://repo.typesafe.com/typesafe/releases/com/typesafe/akka/akka-actor_2.11.2/2.2-M1/akka-actor_2.11.2-2.2-M1.pom
[info] Resolving jline#jline;2.12 ...
[warn]  ::::::::::::::::::::::::::::::::::::::::::::::
[warn]  ::          UNRESOLVED DEPENDENCIES         :: …
Run Code Online (Sandbox Code Playgroud)

sbt

4
推荐指数
1
解决办法
2092
查看次数

Akka Scheduler,时间单位错误

我正在尝试向演员发送推迟的消息.因此,我使用了http://doc.akka.io/docs/akka/2.1.0/scala/scheduler.html中Scheduler的代码示例.

[error] /home/zeus/terra/src/main/scala/hw.scala:55: value milliseconds is not a member of Int
[error]         50 milliseconds,
[error]            ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 3 s, completed Aug 11, 2014 4:07:48 PM
Run Code Online (Sandbox Code Playgroud)

令人惊讶的是,发生了这个错误... milliseconds从我的代码中删除了另一个错误:

[error] /home/vador/death_star/src/main/scala/hw.scala:55: type mismatch;
[error]  found   : Int(50)
[error]  required: scala.concurrent.duration.FiniteDuration
[error]         50,
[error]         ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 3 s, completed Aug 11, 2014 4:12:32 PM
Run Code Online (Sandbox Code Playgroud)

我不明白这一点.

scala akka

3
推荐指数
1
解决办法
2260
查看次数

摆脱旧的scala版本并安装2.11.7

Noob提问来了.

我刚刚scala 2.9.2从我的机器上卸载了:

sudo apt-get remove scala
Run Code Online (Sandbox Code Playgroud)

虽然命令:

scala -version
Run Code Online (Sandbox Code Playgroud)

输出:

The program 'scala' is currently not installed. You can install it by typing: sudo apt-get install scala
Run Code Online (Sandbox Code Playgroud)

scala/ sbt程序仍在运行......

为什么?我不知道.

我想安装最新版本scala2.11.7.

我的问题是: - 如何完全卸载scala?- 我该如何安装scala 2.11.7?(在哪个目录中解压缩.tgz?什么绑定?)

ubuntu scala

3
推荐指数
1
解决办法
4387
查看次数

本地Kafka应用程序失败:NoSuchMethodError:createEphemeral

我试图以spark本地模式运行我的应用程序.为了全部设置,我按照本教程:http://blog.d2-si.fr/2015/11/05/apache-kafka-3/,(法语)显示构建本地kafka/ zookeeper环境的每个步骤.

而且,我使用IntelliJ以下配置:

val sparkConf = new SparkConf().setAppName("zumbaApp").setMaster("local[2]")
Run Code Online (Sandbox Code Playgroud)

我的运行配置,为消费者:

"127.0.0.1:2181" "zumbaApp-gpId" "D2SI" "1"
Run Code Online (Sandbox Code Playgroud)

而对于制片人:

"127.0.0.1:9092" "D2SI" "my\Input\File.csv" 300
Run Code Online (Sandbox Code Playgroud)

在此之前,我检查如果消费者从默认的生产者收到的意见console-producerconsole-consumerkafka_2.10-0.9.0.1; 它确实.

但是,我面临以下错误:

java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:921)
at kafka.utils.ZkUtils.createEphemeralPath(ZkUtils.scala:348)
at kafka.utils.ZkUtils.createEphemeralPathExpectConflict(ZkUtils.scala:363)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:839)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:833)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.reflectPartitionOwnershipDecision(ZookeeperConsumerConnector.scala:833)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:721)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:636)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:627)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) …
Run Code Online (Sandbox Code Playgroud)

producer-consumer apache-kafka apache-spark apache-zookeeper

3
推荐指数
1
解决办法
1977
查看次数

未找到状态设置中的参数

我正在解决一个错误,如果可能我真的想解决,而不是简单地找到解决方法.

这是我的代码:

import akka.actor._
import concurrent.duration._

sealed trait Message
case class ReturnInfluenceMessage(source: ActorRef) extends Message
case class SetInfluences(source: ActorRef) extends Message
case class Start(period:Int) extends Message
case object GetInfluence extends Message
case object Stop extends Message
case object TickMessage extends Message

class Listener extends Actor {
    def receive = {
        case _ =>
            println("Listener: received something\n")
    }
}

class Entity extends Actor {
    val router = context.actorOf(Props[Router], name = "Router")

    def receive = {
        case rim @ ReturnInfluenceMessage(source) =>
            source …
Run Code Online (Sandbox Code Playgroud)

scala akka

1
推荐指数
1
解决办法
900
查看次数

特质作为辛格尔顿

有可能trait成为一个singleton吗?

我要实现的目标是拥有一个干净轻巧的API,可以将其扩展到整个应用程序,如下所示:

trait SingletonTrait {
  // element I wish to be unique throughout my application
  val singletonElement = ///

  ...
}

// uses *singletonElement*
object MainApplication extends SingletonTrait {
  ...
}

// uses *singletonElement*
class SomeClass(...) extends SingletonTrait {
  ...
}
Run Code Online (Sandbox Code Playgroud)

In the same idea implied by a getOrCreate() function that would retrieve an existing instance of an element if one already exists or creates it otherwise.

singleton scala traits

1
推荐指数
1
解决办法
74
查看次数