I would like a consumer actor to subscribe to a Kafka topic and stream the data, so that it can be processed further with Spark Streaming outside the consumer. Why an actor? Because I read that its supervisor strategy is a good way to handle Kafka failures (for example, restarting on failure).
I found two options:
- The KafkaConsumer class: its poll() method returns a Map[String, Object]. I would like a DStream to be returned, the way KafkaUtils.createDirectStream does, and I do not know how to get hold of the stream from outside the actor.
- The ActorHelper trait together with actorStream(), as shown in this example. This latter option does not show a connection to a topic, only to a socket.
Can someone point me in the right direction?
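For what it is worth, a minimal sketch of the createDirectStream route I am comparing against, assuming the Spark 1.x spark-streaming-kafka artifact; the broker address, topic name, batch interval and object name below are placeholders:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("directStreamSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Broker address and topic name are placeholders.
    val kafkaParams = Map("metadata.broker.list" -> "127.0.0.1:9092")
    val topics = Set("my-topic")

    // Yields a DStream[(key, value)] directly, with no actor in between.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}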
I am facing a failed downloads problem while installing sbt 0.13.13 on Ubuntu 16.04, following http://www.scala-sbt.org/release/docs/Installing-sbt-on-Linux.html.
Below is the message log:
I also tried paradigmatic's answer here: installing sbt on Ubuntu, as well as https://askubuntu.com/questions/732092/how-to-install-sbt-in-its-latest-version-in-ubuntu-14-04. These posts are a bit old; nevertheless, the official method seems the way to go so far.
Has anyone run into the same obstacle?
I am compiling my project with sbt and I am getting an UNRESOLVED DEPENDENCIES error.
In fact, the example I use (a hello world program) comes from an online blog and uses scalaVersion := "2.10.0", as shown below. I am using 2.11.2.
How do I update the library dependencies (in build.sbt) to the latest Scala version, in particular the revision part? (A hedged sketch of the usual fix follows after the error log below.)
build.sbt
name := "Hello Test #1"
version := "1.0"
scalaVersion := "2.10.0"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.10" % "2.2-M1"
Error:
[info] Resolving com.typesafe.akka#akka-actor_2.11.2;2.2-M1 ...
[warn] module not found: com.typesafe.akka#akka-actor_2.11.2;2.2-M1
[warn] ==== local: tried
[warn] /home/plard/.ivy2/local/com.typesafe.akka/akka-actor_2.11.2/2.2-M1/ivys/ivy.xml
[warn] ==== public: tried
[warn] http://repo1.maven.org/maven2/com/typesafe/akka/akka-actor_2.11.2/2.2-M1/akka-actor_2.11.2-2.2-M1.pom
[warn] ==== Typesafe Repository: tried
[warn] http://repo.typesafe.com/typesafe/releases/com/typesafe/akka/akka-actor_2.11.2/2.2-M1/akka-actor_2.11.2-2.2-M1.pom
[info] Resolving jline#jline;2.12 ...
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES :: …
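For reference, a hedged sketch of the conventional fix: let sbt pick the artifact for the Scala binary version with %% instead of hard-coding the _2.10 suffix, and use an Akka release that was actually published for Scala 2.11 (the 2.3.9 number below is an assumption about a suitable version, not something from the original blog):
name := "Hello Test #1"

version := "1.0"

scalaVersion := "2.11.2"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

// %% appends the Scala binary version (_2.11) to the artifact name automatically;
// 2.2-M1 was only built for 2.10, so a 2.11-built Akka release is needed here.
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.9"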
I am trying to send a deferred message to an actor. For this, I used the Scheduler code example from http://doc.akka.io/docs/akka/2.1.0/scala/scheduler.html.
[error] /home/zeus/terra/src/main/scala/hw.scala:55: value milliseconds is not a member of Int
[error] 50 milliseconds,
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 3 s, completed Aug 11, 2014 4:07:48 PM
Surprisingly, this error occurred... Removing milliseconds from my code produced another error:
[error] /home/vador/death_star/src/main/scala/hw.scala:55: type mismatch;
[error] found : Int(50)
[error] required: scala.concurrent.duration.FiniteDuration
[error] 50,
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 3 s, completed Aug 11, 2014 4:12:32 PM
I do not understand this.
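For reference, a minimal sketch of how 50 milliseconds usually becomes a FiniteDuration, assuming the enrichment from scala.concurrent.duration._ is in scope (the actor, message and object names below are placeholders):
import akka.actor._
import scala.concurrent.duration._ // provides the Int => 50.milliseconds enrichment

object SchedulerSketch extends App {
  val system = ActorSystem("sketch")

  // Placeholder actor that just prints whatever it receives.
  val listener = system.actorOf(Props(new Actor {
    def receive = { case msg => println(s"got: $msg") }
  }), "listener")

  import system.dispatcher // ExecutionContext the scheduler needs

  // 50.milliseconds is a FiniteDuration once scala.concurrent.duration._ is imported.
  system.scheduler.scheduleOnce(50.milliseconds, listener, "tick")
}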
Noob question coming up.
I just uninstalled scala 2.9.2 from my machine:
sudo apt-get remove scala
Although the command:
scala -version
outputs:
The program 'scala' is currently not installed. You can install it by typing: sudo apt-get install scala
my scala / sbt programs are still running...
Why? I do not know.
I want to install the latest scala version, 2.11.7.
My questions are:
- How do I completely uninstall scala?
- How do I install scala 2.11.7? (Into which directory should I unpack the .tgz? What bindings do I need to set up?)
I am trying to run my application in spark local mode. To set everything up, I followed this tutorial: http://blog.d2-si.fr/2015/11/05/apache-kafka-3/ (in French), which shows every step of building a local kafka / zookeeper environment.
Also, I use IntelliJ with the following configuration:
val sparkConf = new SparkConf().setAppName("zumbaApp").setMaster("local[2]")
My run configuration, for the consumer:
"127.0.0.1:2181" "zumbaApp-gpId" "D2SI" "1"
And for the producer:
"127.0.0.1:9092" "D2SI" "my\Input\File.csv" 300
Before this, I checked whether the consumer received input from the default console-producer and console-consumer of kafka_2.10-0.9.0.1; it did.
However, I am facing the following error:
java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:921)
at kafka.utils.ZkUtils.createEphemeralPath(ZkUtils.scala:348)
at kafka.utils.ZkUtils.createEphemeralPathExpectConflict(ZkUtils.scala:363)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:839)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:833)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.reflectPartitionOwnershipDecision(ZookeeperConsumerConnector.scala:833)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:721)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:636)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:627)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) …
producer-consumer apache-kafka apache-spark apache-zookeeper
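For reference, a hedged guess at one common cause and fix: this NoSuchMethodError usually means an older com.101tec zkclient won the classpath over the 0.7 release that kafka_2.10-0.9.0.1 builds against. Pinning the version in build.sbt is one way to test that theory (the coordinates and version below are assumptions about my dependency tree, not a confirmed fix):
// Force a single zkclient version so Kafka 0.9's ZkUtils finds the
// three-argument createEphemeral(String, Object, List) it calls.
dependencyOverrides += "com.101tec" % "zkclient" % "0.7"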
I am working on an error that I would really like to fix properly, if possible, rather than simply work around.
Here is my code:
import akka.actor._
import concurrent.duration._
sealed trait Message
case class ReturnInfluenceMessage(source: ActorRef) extends Message
case class SetInfluences(source: ActorRef) extends Message
case class Start(period:Int) extends Message
case object GetInfluence extends Message
case object Stop extends Message
case object TickMessage extends Message
class Listener extends Actor {
def receive = {
case _ =>
println("Listener: received something\n")
}
}
class Entity extends Actor {
val router = context.actorOf(Props[Router], name = "Router")
def receive = {
case rim @ ReturnInfluenceMessage(source) =>
source …
Is it possible for a trait to be a singleton?
What I am trying to achieve is a clean, lightweight API that can be extended throughout my application, like this:
trait SingletonTrait {
// element I wish to be unique throughout my application
val singletonElement = ///
...
}
// uses *singletonElement*
object MainApplication extends SingletonTrait {
...
}
// uses *singletonElement*
class SomeClass(...) extends SingletonTrait {
...
}
In the same spirit as a getOrCreate() function that retrieves an existing instance of an element if one already exists, or creates it otherwise.
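For reference, a minimal sketch of one way this is often approached, assuming the shared element can live in a plain object that the trait merely exposes (all names below are placeholders):
// The object is the singleton: Scala initialises it at most once, lazily.
object SingletonElement {
  lazy val instance: String = {
    println("initialising the shared element exactly once")
    "the one and only element"
  }
}

// The trait stays lightweight: every mixer-in sees the same shared instance.
trait SingletonTrait {
  def singletonElement: String = SingletonElement.instance
}

object MainApplication extends SingletonTrait
class SomeClass extends SingletonTrait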