fra*_*cis 11 java scala amazon-web-services amazon-dynamodb amazon-kinesis
我正在尝试使用DynamoDB流和AWS提供的Java DynamoDB流Kinesis适配器捕获DynamoDB表更改.我正在使用Scala应用程序中的AWS Java SDK.
我首先遵循AWS指南并浏览AWS发布的代码示例.但是,我遇到了在我的环境中使用亚马逊自己发布的代码的问题.我的问题在于KinesisClientLibConfiguration对象.
在示例代码中,KinesisClientLibConfiguration配置了DynamoDB提供的流ARN.
new KinesisClientLibConfiguration("streams-adapter-demo",
streamArn,
streamsCredentials,
"streams-demo-worker")
Run Code Online (Sandbox Code Playgroud)
我在我的Scala应用程序中遵循类似的模式,首先从我的Dynamo表中找到当前的ARN:
lazy val streamArn = dynamoClient.describeTable(config.tableName)
.getTable.getLatestStreamArn
Run Code Online (Sandbox Code Playgroud)
然后KinesisClientLibConfiguration使用提供的ARN 创建:
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
"testProcess",
streamArn,
defaultProviderChain,
"testWorker"
).withMaxRecords(1000)
.withRegionName("eu-west-1")
.withMetricsLevel(MetricsLevel.NONE)
.withIdleTimeBetweenReadsInMillis(500)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
Run Code Online (Sandbox Code Playgroud)
我已经验证了提供的流ARN,所有内容都与我在AWS控制台中看到的相匹配.
在运行时,我最终得到一个异常,说明提供的ARN不是有效的流名称:
com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at
'streamName' failed to satisfy constraint: Member must satisfy regular
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code:
400; Error Code: ValidationException; Request ID: )
Run Code Online (Sandbox Code Playgroud)
查看提供的文档KinesisClientLibConfiguration确实有意义,因为第二个参数被列为streamName而没有提及ARN.
我似乎无法找到KinesisClientLibConfiguration与ARN有关的任何内容.由于我正在使用DynamoDB流而不是Kinesis流,因此我也不确定如何查找我的流名称.
在这一点上,我不确定我在发布的AWS示例中遗漏了什么,似乎他们可能正在使用更旧版本的KCL.我正在使用版本1.7.0的amazon-kinesis-client.
这个问题实际上最终超出了我的范围KinesisClientLibConfiguration。
通过使用相同的配置并提供 DynamoDB 流适配器库中包含的流适配器以及 DynamoDB 和 CloudWatch 的客户端,我能够解决此问题。
我的工作解决方案现在看起来像这样。
定义 Kinesis 客户端配置。
//Kinesis config for DynamoDB streams
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
getClass.getName, //DynamoDB shard lease table name
streamArn, //pulled from the dynamo table at runtime
dynamoCredentials, //DefaultAWSCredentialsProviderChain
KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
Run Code Online (Sandbox Code Playgroud)
定义流适配器和 CloudWatch 客户端
val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
streamAdapterClient.setRegion(region)
val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
cloudWatchClient.setRegion(region)
Run Code Online (Sandbox Code Playgroud)
创建 a 的实例RecordProcessorFactory,由您定义一个实现提供的 KCLIRecordProcessorFactory和返回的的类IRecordProcessor。
val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)
Run Code Online (Sandbox Code Playgroud)
而我所缺少的部分,所有这些都需要提供给你的工人。
val worker :Worker =
new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(kinesisConfig)
.kinesisClient(streamAdapterClient)
.dynamoDBClient(dynamoClient)
.cloudWatchClient(cloudWatchClient)
.build()
//this will start record processing
streamExecutorService.submit(worker)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1570 次 |
| 最近记录: |