使用AWS Java DynamoDB流处理DynamoDB流Kinesis适配器

fra*_*cis 11 java scala amazon-web-services amazon-dynamodb amazon-kinesis

我正在尝试使用DynamoDB流和AWS提供的Java DynamoDB流Kinesis适配器捕获DynamoDB表更改.我正在使用Scala应用程序中的AWS Java SDK.

我首先遵循AWS指南并浏览AWS发布的代码示例.但是,我遇到了在我的环境中使用亚马逊自己发布的代码的问题.我的问题在于KinesisClientLibConfiguration对象.

在示例代码中,KinesisClientLibConfiguration配置了DynamoDB提供的流ARN.

new KinesisClientLibConfiguration("streams-adapter-demo",
    streamArn, 
    streamsCredentials, 
    "streams-demo-worker")
Run Code Online (Sandbox Code Playgroud)

我在我的Scala应用程序中遵循类似的模式,首先从我的Dynamo表中找到当前的ARN:

lazy val streamArn = dynamoClient.describeTable(config.tableName)
.getTable.getLatestStreamArn
Run Code Online (Sandbox Code Playgroud)

然后KinesisClientLibConfiguration使用提供的ARN 创建:

lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
  "testProcess",
  streamArn,
  defaultProviderChain,
  "testWorker"
).withMaxRecords(1000)
   .withRegionName("eu-west-1")
   .withMetricsLevel(MetricsLevel.NONE)
  .withIdleTimeBetweenReadsInMillis(500)
  .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
Run Code Online (Sandbox Code Playgroud)

我已经验证了提供的流ARN,所有内容都与我在AWS控制台中看到的相匹配.

在运行时,我最终得到一个异常,说明提供的ARN不是有效的流名称:

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation     
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at 
'streamName'    failed to satisfy constraint: Member must satisfy regular 
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code: 
400; Error Code: ValidationException; Request ID: )
Run Code Online (Sandbox Code Playgroud)

查看提供的文档KinesisClientLibConfiguration确实有意义,因为第二个参数被列为streamName而没有提及ARN.

我似乎无法找到KinesisClientLibConfiguration与ARN有关的任何内容.由于我正在使用DynamoDB流而不是Kinesis流,因此我也不确定如何查找我的流名称.

在这一点上,我不确定我在发布的AWS示例中遗漏了什么,似乎他们可能正在使用更旧版本的KCL.我正在使用版本1.7.0的amazon-kinesis-client.

fra*_*cis 4

这个问题实际上最终超出了我的范围KinesisClientLibConfiguration

通过使用相同的配置并提供 DynamoDB 流适配器库中包含的流适配器以及 DynamoDB 和 CloudWatch 的客户端,我能够解决此问题。

我的工作解决方案现在看起来像这样。

定义 Kinesis 客户端配置。

//Kinesis config for DynamoDB streams
lazy val kinesisConfig :KinesisClientLibConfiguration =
    new KinesisClientLibConfiguration(
        getClass.getName, //DynamoDB shard lease table name
        streamArn, //pulled from the dynamo table at runtime
        dynamoCredentials, //DefaultAWSCredentialsProviderChain 
        KeywordTrackingActor.NAME //Lease owner name
    ).withMaxRecords(1000) //using AWS recommended value
     .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
Run Code Online (Sandbox Code Playgroud)

定义流适配器和 CloudWatch 客户端

val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
streamAdapterClient.setRegion(region)

val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
cloudWatchClient.setRegion(region)
Run Code Online (Sandbox Code Playgroud)

创建 a 的实例RecordProcessorFactory,由您定义一个实现提供的 KCLIRecordProcessorFactory和返回的的类IRecordProcessor

val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)
Run Code Online (Sandbox Code Playgroud)

而我所缺少的部分,所有这些都需要提供给你的工人。

val worker :Worker =
  new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(kinesisConfig)
    .kinesisClient(streamAdapterClient)
    .dynamoDBClient(dynamoClient)
    .cloudWatchClient(cloudWatchClient)
    .build()

//this will start record processing
streamExecutorService.submit(worker)
Run Code Online (Sandbox Code Playgroud)