如何在Spark流媒体应用程序中处理DynamoDB Stream

Rap*_*ère 5 amazon-dynamodb apache-spark amazon-kinesis

我想从Spark Streaming应用程序使用DynamoDB流。

Spark流使用KCL读取Kinesis。有一个使KCL能够从DynamoDB流读取的库:dynamodb-streams-kinesis-adapter

但是有可能将此lib插入spark吗?有人这样做吗?

我正在使用Spark 2.1.0。

我的备份计划是让另一个应用程序从DynamoDB流读取到Kinesis流中。

谢谢

rav*_*nde 1

执行此操作的方法是实现 KinesisInputDStream 以使用官方指南提供的工作程序,dynamodb-streams-kinesis-adapter 建议如下所示:

final Worker worker = StreamsWorkerFactory .createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);

从Spark的角度来看,它是在KinesisInputDStream.scala中的kinesis-asl模块下实现的

我已经在 Spark 2.4.0 上尝试过这个。这是我的仓库。它需要很少的改进,但可以完成工作

https://github.com/ravi72munde/spark-dynamo-stream-asl

修改 KinesisInputDStream 后,我们可以使用它,如下所示。 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName("sample-tablename-2") .regionName("us-east-1") .initialPosition(new Latest()) .checkpointAppName("sample-app") .checkpointInterval(Milliseconds(100)) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()