Alpakka KinesisSink:无法将消息推送到Stream

cch*_*son 5 akka-stream alpakka

我正在尝试使用alpakka kinesis连接器向Kinesis Stream发送消息,但我没有成功.我尝试了下面的代码,但我的流中没有任何内容.

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()


val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
    println(reqEntry)
    reqEntry
}

val entry = new PutRecordsRequestEntry()
    .withData(ByteBuffer.wrap("Hello World".getBytes))
    .withPartitionKey(Random.nextInt.toString)

Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()

// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
Run Code Online (Sandbox Code Playgroud)
  • 使用a Sink.foreach(println)而不是每隔1秒KinesisSink打印PutRecordsRequestEntry=> EXPECTED
  • 使用时KinesisSink,该条目仅生成一次.

我究竟做错了什么 ?

我正在检查我的流,KinesisSource并且正在读取工作(使用另一个流测试)

此外,AWS Kinesis的监控仪表板不会显示任何PUT请求.

注1:我尝试启用alpakka的调试日志,但没有效果

<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>
Run Code Online (Sandbox Code Playgroud)

在我的logback.xml+根级别调试

cch*_*son 2

问题是对流上的操作的访问被拒绝/许可。

我必须添加 akka actor 配置来进行日志记录

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  logger-startup-timeout = "30s"
}
Run Code Online (Sandbox Code Playgroud)

查看调试行,我实际上在每个阶段的调试和步骤中运行。

它需要IAM 角色中的权限“PutRecord s ”