cch*_*son 5 akka-stream alpakka
我正在尝试使用alpakka kinesis连接器向Kinesis Stream发送消息,但我没有成功.我尝试了下面的代码,但我的流中没有任何内容.
implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()
val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
println(reqEntry)
reqEntry
}
val entry = new PutRecordsRequestEntry()
.withData(ByteBuffer.wrap("Hello World".getBytes))
.withPartitionKey(Random.nextInt.toString)
Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()
// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
Run Code Online (Sandbox Code Playgroud)
Sink.foreach(println)而不是每隔1秒KinesisSink打印PutRecordsRequestEntry=> EXPECTEDKinesisSink,该条目仅生成一次.我究竟做错了什么 ?
我正在检查我的流,KinesisSource并且正在读取工作(使用另一个流测试)
此外,AWS Kinesis的监控仪表板不会显示任何PUT请求.
注1:我尝试启用alpakka的调试日志,但没有效果
<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>
Run Code Online (Sandbox Code Playgroud)
在我的logback.xml+根级别调试
问题是对流上的操作的访问被拒绝/许可。
我必须添加 akka actor 配置来进行日志记录
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
logger-startup-timeout = "30s"
}
Run Code Online (Sandbox Code Playgroud)
查看调试行,我实际上在每个阶段的调试和步骤中运行。
它需要IAM 角色中的权限“PutRecord s ”
| 归档时间: |
|
| 查看次数: |
392 次 |
| 最近记录: |