问题 当我重新启动/完成/停止流时,旧的消费者不会死/关闭:
[INFO ] a.a.RepointableActorRef -
Message [akka.kafka.KafkaConsumerActor$Internal$Stop$]
from Actor[akka://ufo-sightings/deadLetters]
to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
was not delivered. [1] dead letters encountered.
Run Code Online (Sandbox Code Playgroud)
描述 我正在构建一个服务,它接收来自Kafka主题的消息,并通过HTTP请求将消息发送到外部服务.
可以断开与外部服务的连接,我的服务需要重试该请求.
此外,如果Stream中存在错误,则需要重新启动整个流.
最后,有时我不需要流和相应的Kafka消费者,我想关闭整个流
所以我有一个流:
Consumer.committableSource(customizedSettings, subscriptions)
.flatMapConcat(sourceFunction)
.toMat(Sink.ignore)
.run
Run Code Online (Sandbox Code Playgroud)
发送Http请求 sourceFunction
我在新文档中遵循了新的Kafka Consumer Restart说明
RestartSource.withBackoff(
minBackoff = 20.seconds,
maxBackoff = 5.minutes,
randomFactor = 0.2 ) { () =>
Consumer.committableSource(customizedSettings, subscriptions)
.watchTermination() {
case (consumerControl, streamComplete) =>
logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened …
Run Code Online (Sandbox Code Playgroud) 我想使用akka-http-client链接http请求作为Stream.链中的每个http请求都取决于先前请求的成功/响应,并使用它来构造新请求.如果请求不成功,则Stream应返回不成功请求的响应.
如何在akka-http中构建这样的流?我应该使用哪个akka-http客户端级API?
我正在使用带有队列的主机级API。
private val (queueSource, connectionPool) = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure).async
.viaMat(poolFlow)(Keep.both)
.toMat(
Sink.foreach({
case ((Success(resp), p)) =>
p.success(resp)
case ((Failure(e), p)) => p.failure(e)
})
)(Keep.left)
.run()
Run Code Online (Sandbox Code Playgroud)
我有很多请求在连接池中争夺连接,但出现以下错误:
java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request
at akka.stream.impl.QueueSource$$anon$1.akka$stream$impl$QueueSource$$anon$$bufferElem(QueueSource.scala:84)
at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:94)
at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:91)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:464)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:741)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:756)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at …
Run Code Online (Sandbox Code Playgroud) 我有一个UIViewController,我正在通过IB使用Autolayout来定位和调整其子视图.
出于某种原因,我没有得到正确的UIView布局,我决定在VC中记录所有子视图大小.
我记录了子视图大小:
viewDidLoad{
self.view.layoutIfNeeded()
print(uiview.size)
}
viewDidLayoutSubviews{
print(uiview.size)
}
viewWillAppear{
print(uiview.size)
}
viewDidAppear{
print(uiview.size)
}
Run Code Online (Sandbox Code Playgroud)
子视图大小是正确的:
但它变得不正确:
如何使用ViewWillAppear中正确的子视图大小?为什么在最终的viewDidLayoutSubviews调用和viewDidAppear中我的尺寸不正确?
在 Scala 中从 FileIO.fromPath 获取 akka Bytestring 的惯用方法是什么?
akka-stream ×4
scala ×4
akka ×2
akka-http ×2
apache-kafka ×1
autolayout ×1
ios8 ×1
swift ×1
xcode6 ×1