Rap*_*ère 5 amazon-dynamodb apache-spark amazon-kinesis
我想从Spark Streaming应用程序使用DynamoDB流。
Spark流使用KCL读取Kinesis。有一个使KCL能够从DynamoDB流读取的库:dynamodb-streams-kinesis-adapter。
但是有可能将此lib插入spark吗?有人这样做吗?
我正在使用Spark 2.1.0。
我的备份计划是让另一个应用程序从DynamoDB流读取到Kinesis流中。
谢谢
执行此操作的方法是实现 KinesisInputDStream 以使用官方指南提供的工作程序,dynamodb-streams-kinesis-adapter
建议如下所示:
final Worker worker = StreamsWorkerFactory
.createDynamoDbStreamsWorker(
recordProcessorFactory,
workerConfig,
adapterClient,
amazonDynamoDB,
amazonCloudWatchClient);
从Spark的角度来看,它是在KinesisInputDStream.scala中的kinesis-asl模块下实现的
我已经在 Spark 2.4.0 上尝试过这个。这是我的仓库。它需要很少的改进,但可以完成工作
https://github.com/ravi72munde/spark-dynamo-stream-asl
修改 KinesisInputDStream 后,我们可以使用它,如下所示。
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("sample-tablename-2")
.regionName("us-east-1")
.initialPosition(new Latest())
.checkpointAppName("sample-app")
.checkpointInterval(Milliseconds(100))
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
| 归档时间: |
|
| 查看次数: |
757 次 |
| 最近记录: |