小编Rab*_*bzu的帖子

在失败时正常重启Reactive-Kafka Consumer Stream

问题 当我重新启动/完成/停止流时,旧的消费者不会死/关闭:

[INFO ] a.a.RepointableActorRef -
  Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] 
  from Actor[akka://ufo-sightings/deadLetters]
  to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
  was not delivered. [1] dead letters encountered.
Run Code Online (Sandbox Code Playgroud)

描述 我正在构建一个服务,它接收来自Kafka主题的消息,并通过HTTP请求将消息发送到外部服务.

  1. 可以断开与外部服务的连接,我的服务需要重试该请求.

  2. 此外,如果Stream中存在错误,则需要重新启动整个流.

  3. 最后,有时我不需要流和相应的Kafka消费者,我想关闭整个流

所以我有一个流:

Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)
  .run
Run Code Online (Sandbox Code Playgroud)

发送Http请求 sourceFunction

我在新文档中遵循了新的Kafka Consumer Restart说明

  RestartSource.withBackoff(
      minBackoff = 20.seconds,
      maxBackoff = 5.minutes,
      randomFactor = 0.2 ) { () =>
          Consumer.committableSource(customizedSettings, subscriptions)
            .watchTermination() {
                case (consumerControl, streamComplete) =>
                  logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
                  consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened …
Run Code Online (Sandbox Code Playgroud)

scala akka apache-kafka akka-stream reactive-kafka

12
推荐指数
1
解决办法
729
查看次数

链中的Akka-http-client链请求

我想使用akka-http-client链接http请求作为Stream.链中的每个http请求都取决于先前请求的成功/响应,并使用它来构造新请求.如果请求不成功,则Stream应返回不成功请求的响应.

如何在akka-http中构建这样的流?我应该使用哪个akka-http客户端级API?

scala akka-stream akka-http

8
推荐指数
1
解决办法
1916
查看次数

如何启用 Source.Queue 背压

我正在使用带有队列的主机级API

  private val (queueSource, connectionPool) = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure).async
    .viaMat(poolFlow)(Keep.both)
    .toMat(
      Sink.foreach({
        case ((Success(resp), p)) =>
          p.success(resp)
        case ((Failure(e), p)) => p.failure(e)
      })
    )(Keep.left)
    .run()
Run Code Online (Sandbox Code Playgroud)

我有很多请求在连接池中争夺连接,但出现以下错误:

 java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request
    at akka.stream.impl.QueueSource$$anon$1.akka$stream$impl$QueueSource$$anon$$bufferElem(QueueSource.scala:84)
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:94)
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:91)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:464)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:741)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:756)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
    at akka.actor.ActorCell.invoke(ActorCell.scala:496)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream akka-http

5
推荐指数
1
解决办法
1304
查看次数

Autolayout:ViewDidAppear中的帧大小不正确

我有一个UIViewController,我正在通过IB使用Autolayout来定位和调整其子视图.

出于某种原因,我没有得到正确的UIView布局,我决定在VC中记录所有子视图大小.

我记录了子视图大小:

viewDidLoad{
   self.view.layoutIfNeeded()
   print(uiview.size)
}

viewDidLayoutSubviews{
 print(uiview.size)
}
viewWillAppear{
 print(uiview.size)

}
viewDidAppear{
 print(uiview.size)
}
Run Code Online (Sandbox Code Playgroud)

子视图大小是正确的:

  1. viewDidLoad中
  2. viewDidLayoutSubviews
  3. viewWillAppear中
  4. viewDidLayoutSubviews

但它变得不正确:

  1. viewDidLayoutSubviews
  2. viewDidAppear

如何使用ViewWillAppear中正确的子视图大小?为什么在最终的viewDidLayoutSubviews调用和viewDidAppear中我的尺寸不正确?

autolayout swift xcode6 ios8 ios-autolayout

1
推荐指数
1
解决办法
4543
查看次数

如何以惯用的方式从 FileIO.fromPath 获取 Bytestring

在 Scala 中从 FileIO.fromPath 获取 akka Bytestring 的惯用方法是什么?

scala akka-stream

1
推荐指数
1
解决办法
1120
查看次数