标签: alpakka

以编程方式停止Alpakka Kafka流的正确方法

我们正在尝试将Akka流与Alpakka Kafka一起使用以消耗服务中的事件流。为了处理事件处理错误,我们使用Kafka自动提交和多个队列。例如,如果我们有user_created要从产品服务中使用的主题,那么我们还将创建user_created_for_products_faileduser_created_for_products_dead_letter。这两个额外的主题与特定的Kafka消费者群体相关。如果事件无法处理,它将进入失败的队列,我们​​尝试在五分钟内再次消耗事件;如果事件再次失败,则变为死信。

在部署时,我们要确保我们不会丢失事件。因此,我们试图在停止应用程序之前停止流。就像我说的,我们正在使用自动提交,但是所有这些正在“飞行”的事件尚未得到处理。流和应用程序停止后,我们可以部署新代码并重新启动应用程序。

阅读文档后,我们已经了解了该KillSwitch功能。我们在这看到的问题是,该shutdown方法返回Unit,而不是Future[Unit]像我们期望的那样。我们不确定使用它不会丢失事件,因为在测试中,它看起来太快而无法正常工作。

解决方法是,ActorSystem为每个流创建一个,然后使用terminate方法(返回Future[Terminate])。此解决方案的问题在于,我们认为创建ActorSystem每个流的伸缩性不好,并且解决该问题terminate需要花费很多时间(在测试中最多需要一分钟才能关闭)。

你遇到过这样的问题吗?有没有一种更快的方法(与相比ActorSystem.terminate)来停止流并确保Source已处理发出的所有事件?

scala akka apache-kafka akka-stream alpakka

6
推荐指数
1
解决办法
491
查看次数

Alpakka KinesisSink:无法将消息推送到Stream

我正在尝试使用alpakka kinesis连接器向Kinesis Stream发送消息,但我没有成功.我尝试了下面的代码,但我的流中没有任何内容.

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()


val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
    println(reqEntry)
    reqEntry
}

val entry = new PutRecordsRequestEntry()
    .withData(ByteBuffer.wrap("Hello World".getBytes))
    .withPartitionKey(Random.nextInt.toString)

Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()

// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
Run Code Online (Sandbox Code Playgroud)
  • 使用a Sink.foreach(println)而不是每隔1秒KinesisSink打印PutRecordsRequestEntry=> EXPECTED
  • 使用时KinesisSink,该条目仅生成一次.

我究竟做错了什么 ?

我正在检查我的流,KinesisSource并且正在读取工作(使用另一个流测试)

此外,AWS Kinesis的监控仪表板不会显示任何PUT请求.

注1:我尝试启用alpakka的调试日志,但没有效果

<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>
Run Code Online (Sandbox Code Playgroud)

在我的logback.xml+根级别调试

akka-stream alpakka

5
推荐指数
1
解决办法
392
查看次数

计算Akka Streams中的元素数量

我想变换Source斯卡拉实体到SourceByteString通过Alpakka的CsvFormatting.但是,由于缺乏Scala和Akka经验,我发现很难计算初始流中元素的数量.你能建议计算initialSource元素的最佳方法,并将结果保持为ByteString Source:

val initialSource: Source[SomeEntity, NotUsed] = Source.fromPublisher(publisher)
val csvSource: Source[ByteString, NotUsed] = initialSource
  .map(e => List(e.firstName, e.lastName, e.city))
  .via(CsvFormatting.format())
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream alpakka

5
推荐指数
1
解决办法
2083
查看次数

AMQP 服务器重新启动时 Akka 流停止

我在使用 Alpakka AMQP 连接器和 Akka Streams 时遇到了一个非常奇怪的问题。

当我的 RabbitMQ 消息代理重新启动时,源似乎重新启动正常。但是,一旦重新启动,流就永远不会完成,并且消息会在流中更远的分区中丢失。当我启动 AMQP 服务器时,我的 Akka 应用程序工作正常,但相反一切都一团糟。

这是我初始化我的方法AMQPSource

val amqpMessageSource = builder.add {
  val amqpSource = AmqpSource(
    NamedQueueSourceSettings(connectionDetails, amqpInMessageQueue).withDeclarations(queueDeclaration),
    bufferSize = 10
  ).map { message =>
    fromIncomingMessage(message)
  }.initialDelay(5.seconds)
  amqpSource.recoverWithRetries(-1, { case _ => amqpSource }) // Retry every 5 seconds an infinity of times
}
Run Code Online (Sandbox Code Playgroud)

我尝试删除发生问题的分区,以将流直接发送到与我的示例相关的流,甚至更奇怪:在这种情况下,AMQP 客户端甚至不再从 RabbitMQ 读取消息。

我显然在这里遗漏了一些东西,但我尝试了很多不同的方法,但根本没有解决我的问题。

scala rabbitmq akka akka-stream alpakka

5
推荐指数
0
解决办法
401
查看次数

使用 Alpakka S3 连接器的多个下载请求

我正在尝试使用Alpakka S3 连接器执行以下操作:

我使用的代码是这样的:

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = S3.multipartUpload("my-s3-bucket", "archive.zip")

val sourceList = (1 to 10).map(i => S3.download("my-s3-bucket", s"random$i.png").map {
    case Some((s, m)) => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), s)
})
val source = Source.combine(sourceList.head, sourceList.tail.head, sourceList.tail.tail: _*)(Merge(_))

source
    .via(Archive.zip())
    .to(s3Sink)
    .run()
Run Code Online (Sandbox Code Playgroud)

但是,这会导致以下错误:

Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.
Run Code Online (Sandbox Code Playgroud)

我怀疑这是因为 S3 连接器使用的底层 Akka …

scala amazon-s3 akka akka-stream alpakka

5
推荐指数
1
解决办法
655
查看次数

Alpakka卡夫卡vs卡夫卡流

我们正在构建一个高吞吐量,低延迟的流处理应用程序。我们将Apache Kafka用作消息传递平台和数据库。

似乎Kafka Streams和Alpakka Kafka框架有很多共同之处,但是Kafka Streams似乎比Kafka更“原生”,而Alpakka允许我们使用Akka框架的功能。

这两个框架之间的主要区别是什么?

scala akka apache-kafka apache-kafka-streams alpakka

1
推荐指数
1
解决办法
689
查看次数

使用 Slick (JDBC) Connector for Alpakka 时使用 Paging SQL 语句是否有意义

我目前想知道 Alpakka 的 Slick (JDBC) 连接器是如何在幕后工作的 - 我真的无法使用文档找到答案。

考虑一个用例,我想处理从数据库中选择的大量记录。我可以简单地SELECT * FROM [TABLE]在单个流中使用 a 吗,或者为每个页面(一个接一个)启动多个流是否有意义,例如SELECT * FROM [TABLE] LIMIT 0,1000.

我希望/认为 Slick Connector Alpakka 的反应式方式只在流需要它们时才从数据库中获取记录,以便我可以使用SELECT * FROM [TABLE]...

谁能给我一些见解或一些好的文档来阅读?

scala slick akka-stream alpakka

1
推荐指数
1
解决办法
633
查看次数