我们正在尝试将Akka流与Alpakka Kafka一起使用以消耗服务中的事件流。为了处理事件处理错误,我们使用Kafka自动提交和多个队列。例如,如果我们有user_created要从产品服务中使用的主题,那么我们还将创建user_created_for_products_failed和user_created_for_products_dead_letter。这两个额外的主题与特定的Kafka消费者群体相关。如果事件无法处理,它将进入失败的队列,我们尝试在五分钟内再次消耗事件;如果事件再次失败,则变为死信。
在部署时,我们要确保我们不会丢失事件。因此,我们试图在停止应用程序之前停止流。就像我说的,我们正在使用自动提交,但是所有这些正在“飞行”的事件尚未得到处理。流和应用程序停止后,我们可以部署新代码并重新启动应用程序。
阅读文档后,我们已经了解了该KillSwitch功能。我们在这看到的问题是,该shutdown方法返回Unit,而不是Future[Unit]像我们期望的那样。我们不确定使用它不会丢失事件,因为在测试中,它看起来太快而无法正常工作。
解决方法是,ActorSystem为每个流创建一个,然后使用terminate方法(返回Future[Terminate])。此解决方案的问题在于,我们认为创建ActorSystem每个流的伸缩性不好,并且解决该问题terminate需要花费很多时间(在测试中最多需要一分钟才能关闭)。
你遇到过这样的问题吗?有没有一种更快的方法(与相比ActorSystem.terminate)来停止流并确保Source已处理发出的所有事件?
我正在尝试使用alpakka kinesis连接器向Kinesis Stream发送消息,但我没有成功.我尝试了下面的代码,但我的流中没有任何内容.
implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()
val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
println(reqEntry)
reqEntry
}
val entry = new PutRecordsRequestEntry()
.withData(ByteBuffer.wrap("Hello World".getBytes))
.withPartitionKey(Random.nextInt.toString)
Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()
// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
Run Code Online (Sandbox Code Playgroud)
Sink.foreach(println)而不是每隔1秒KinesisSink打印PutRecordsRequestEntry=> EXPECTEDKinesisSink,该条目仅生成一次.我究竟做错了什么 ?
我正在检查我的流,KinesisSource并且正在读取工作(使用另一个流测试)
此外,AWS Kinesis的监控仪表板不会显示任何PUT请求.
注1:我尝试启用alpakka的调试日志,但没有效果
<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>
Run Code Online (Sandbox Code Playgroud)
在我的logback.xml+根级别调试
我想变换Source斯卡拉实体到Source的ByteString通过Alpakka的CsvFormatting.但是,由于缺乏Scala和Akka经验,我发现很难计算初始流中元素的数量.你能建议计算initialSource元素的最佳方法,并将结果保持为ByteString Source:
val initialSource: Source[SomeEntity, NotUsed] = Source.fromPublisher(publisher)
val csvSource: Source[ByteString, NotUsed] = initialSource
.map(e => List(e.firstName, e.lastName, e.city))
.via(CsvFormatting.format())
Run Code Online (Sandbox Code Playgroud) 我在使用 Alpakka AMQP 连接器和 Akka Streams 时遇到了一个非常奇怪的问题。
当我的 RabbitMQ 消息代理重新启动时,源似乎重新启动正常。但是,一旦重新启动,流就永远不会完成,并且消息会在流中更远的分区中丢失。当我启动 AMQP 服务器时,我的 Akka 应用程序工作正常,但相反一切都一团糟。
这是我初始化我的方法AMQPSource:
val amqpMessageSource = builder.add {
val amqpSource = AmqpSource(
NamedQueueSourceSettings(connectionDetails, amqpInMessageQueue).withDeclarations(queueDeclaration),
bufferSize = 10
).map { message =>
fromIncomingMessage(message)
}.initialDelay(5.seconds)
amqpSource.recoverWithRetries(-1, { case _ => amqpSource }) // Retry every 5 seconds an infinity of times
}
Run Code Online (Sandbox Code Playgroud)
我尝试删除发生问题的分区,以将流直接发送到与我的示例相关的流,甚至更奇怪:在这种情况下,AMQP 客户端甚至不再从 RabbitMQ 读取消息。
我显然在这里遗漏了一些东西,但我尝试了很多不同的方法,但根本没有解决我的问题。
我正在尝试使用Alpakka S3 连接器执行以下操作:
我使用的代码是这样的:
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = S3.multipartUpload("my-s3-bucket", "archive.zip")
val sourceList = (1 to 10).map(i => S3.download("my-s3-bucket", s"random$i.png").map {
case Some((s, m)) => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), s)
})
val source = Source.combine(sourceList.head, sourceList.tail.head, sourceList.tail.tail: _*)(Merge(_))
source
.via(Archive.zip())
.to(s3Sink)
.run()
Run Code Online (Sandbox Code Playgroud)
但是,这会导致以下错误:
Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.
Run Code Online (Sandbox Code Playgroud)
我怀疑这是因为 S3 连接器使用的底层 Akka …
我们正在构建一个高吞吐量,低延迟的流处理应用程序。我们将Apache Kafka用作消息传递平台和数据库。
似乎Kafka Streams和Alpakka Kafka框架有很多共同之处,但是Kafka Streams似乎比Kafka更“原生”,而Alpakka允许我们使用Akka框架的功能。
这两个框架之间的主要区别是什么?
我目前想知道 Alpakka 的 Slick (JDBC) 连接器是如何在幕后工作的 - 我真的无法使用文档找到答案。
考虑一个用例,我想处理从数据库中选择的大量记录。我可以简单地SELECT * FROM [TABLE]在单个流中使用 a 吗,或者为每个页面(一个接一个)启动多个流是否有意义,例如SELECT * FROM [TABLE] LIMIT 0,1000.
我希望/认为 Slick Connector Alpakka 的反应式方式只在流需要它们时才从数据库中获取记录,以便我可以使用SELECT * FROM [TABLE]...
谁能给我一些见解或一些好的文档来阅读?