标签: amazon-dynamodb-streams

Kinesis Stream和DynamoDB流之间的区别

他们似乎对我做同样的事情.任何人都可以向我解释这个区别吗?

amazon-kinesis amazon-dynamodb-streams

16
推荐指数
2
解决办法
8671
查看次数

Dynamodb在python中流

我想从python中的dynamodb流中读取数据,以及到目前为止我找到的替代方案

  1. 使用dynamodb码流低级别的库函数(如描述这里):但是,这种解决方案似乎几乎不可能在生产环境中维护,具有保持碎片等的状态的应用程序

  2. 使用专为读取Kinesis流而设计的KCL库:库的python版本似乎无法从dynamodb流中读取.

在python中成功处理dynamodb流的选项有哪些?(链接到可能的示例将是非常有用的)

PS:我考虑过使用lambda函数来处理dynamodb,但是对于这个任务,我想在应用程序中读取流,因为它必须与其他组件进行交互,这些组件无法通过lamda函数完成.

boto3 amazon-dynamodb-streams

15
推荐指数
1
解决办法
1742
查看次数

Dynamodb Streams 中 AttributeValue 的不兼容类型

我正在尝试借助unmarshall中提供的官方功能来解组 dynamodb 流记录@aws-sdk/util-dynamodb

\n

我正在打字稿中执行此操作,定义如下所示

\n

unmarshall: (data: Record<string, AttributeValue>, options?: unmarshallOptions | undefined)

\n

这里AttributValue是源自@aws-sdk/client-dynamodb.

\n

dynamodb 记录的传入类型如下所示

\n
export interface DynamoDBRecord {\n    awsRegion?: string | undefined;\n    dynamodb?: StreamRecord | undefined;\n    eventID?: string | undefined;\n    eventName?: 'INSERT' | 'MODIFY' | 'REMOVE' | undefined;\n    eventSource?: string | undefined;\n    eventSourceARN?: string | undefined;\n    eventVersion?: string | undefined;\n    userIdentity?: any;\n}\n
Run Code Online (Sandbox Code Playgroud)\n
export interface StreamRecord {\n    ApproximateCreationDateTime?: number | undefined;\n    Keys?: { [key: string]: AttributeValue …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services amazon-dynamodb typescript aws-lambda amazon-dynamodb-streams

15
推荐指数
1
解决办法
2156
查看次数

AWS Glue:如何使用不同的模式处理嵌套的JSON

目标: 我们希望使用AWS Glue Data Catalog为驻留在S3存储桶中的JSON数据创建单个表,然后我们将通过Redshift Spectrum进行查询和解析.

背景: JSON数据来自DynamoDB Streams,并且是深层嵌套的.第一级JSON具有一组一致的元素:Keys,NewImage,OldImage,SequenceNumber,ApproximateCreationDateTime,SizeBytes和EventName.唯一的变化是有些记录没有NewImage,有些没有OldImage.但是,在第一级之下,架构变化很大.

理想情况下,我们希望使用Glue仅解析第一级JSON,并且基本上将较低级别视为大型STRING对象(我们将根据需要使用Redshift Spectrum对其进行解析).目前,我们将整个记录加载到Redshift中的单个VARCHAR列中,但记录接近Redshift中数据类型的最大大小(最大VARCHAR长度为65535).因此,我们希望在记录达到Redshift之前执行第一级解析.

到目前为止我们尝试/引用的内容:

  • 将AWS Glue Crawler指向S3存储桶会导致数百个表具有一致的顶级模式(上面列出的属性),但STRUCT元素中更深层次的模式会有所不同.我们还没有找到一种方法来创建一个Glue ETL Job,它可以从所有这些表中读取并将其加载到一个表中.
  • 手动创建表并不富有成效.我们尝试将每列设置为STRING数据类型,但是作业没有成功加载数据(可能因为这会涉及从STRUCT到STRING的一些转换).将列设置为STRUCT时,它需要一个已定义的模式 - 但这正是从一个记录到另一个记录的不同,因此我们无法提供适用于所有相关记录的通用STRUCT模式.
  • AWS Glue Relationalize转换很有趣,但不是我们在这种情况下要寻找的(因为我们希望保留一些JSON完整,而不是完全展平它).Redshift Spectrum支持几周前的标量JSON数据,但这不适用于我们正在处理的嵌套JSON.这些似乎都没有帮助处理由Glue Crawler创建的数百个表.

问题: 我们如何使用Glue(或其他方法)来解析这些记录的第一级 - 同时忽略顶层元素下面的不同模式 - 以便我们可以从Spectrum访问它或加载它身体进入Redshift?

我是Glue的新手.我花了很多时间在Glue文档中并在论坛上查看(有些稀疏)信息.我可能会遗漏一些明显的东西 - 或者这可能是目前形式的胶水限制.欢迎任何建议.

谢谢!

amazon-redshift amazon-dynamodb-streams amazon-redshift-spectrum aws-glue

11
推荐指数
1
解决办法
7707
查看次数

如何在 Cloudformation 模板中引用 DynamoDB 表的最新流

我正在为无服务器框架编写一个插件,它通过 ARN 引用 DynamoDB Stream。我可以使用手头的信息构建 DynamoDB 表 ARN,但我不知道时间戳部分,这是构建完整流 ARN 所必需的。我无权访问原始 DynamoDB Cloudformation 定义,当我需要引用 Stream ARN 时,这两件事可能会在完全不同的模板中发生。此时我所拥有的只是已创建的 DynamoDB 的 ARN。

有没有办法通过类似于 的变量来引用最新的流 arn:aws:dynamodb:${AWS::Region}::${AWS::AccountId}:table/eventbus-test/stream/${LATEST}

或者我可以通过无服务器配置或 Cloudformation 模板以另一种方式构建它吗?

amazon-web-services aws-cloudformation amazon-dynamodb serverless-framework amazon-dynamodb-streams

11
推荐指数
1
解决办法
5186
查看次数

无法将json - Dynamo db Streams复制到redshift

以下是我正在使用的用例:我enable StreamsDynamoDB使用new and old Image.I创建了一个Kinesis Firehose delivery stream目的地为Redshift(Intermediate s3)时已经配置.

从Dynamodb我的小溪到达Firhose并从那里到下面给出的JSON(S3 Bucket -Gzip)的Bucket.我的问题是我cannot COPY this JSON to redshift.

我无法得到的东西:

    1. 不确定Redshift中的Create table语句应该是什么
    1. 什么应该是Kinesis firhose中的COPY语法.
    1. 我应该如何在这里使用JsonPaths.Kinesis Data消防站将json归还给我的s3水桶.
    1. 如何提及COPY Command中的Maniphest

JSON加载到S3如下所示:

{
    "Keys": {
        "vehicle_id": {
            "S": "x011"
        }
    },
    "NewImage": {
        "heart_beat": {
            "N": "0"
        },
        "cdc_id": {
            "N": "456"
        },
        "latitude": {
            "N": "1.30951"
        },
        "not_deployed_counter": {
            "N": "1"
        },
        "reg_ind": {
            "N": "0"
        },
        "operator": {
            "S": …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-dynamodb amazon-redshift amazon-dynamodb-streams amazon-kinesis-firehose

11
推荐指数
1
解决办法
181
查看次数

如何从DynamoDB流新的图像中获取纯Json字符串?

我有一个启用了流媒体的Dynamodb表.此外,我为此表创建了一个触发AWS Lambda函数的触发器.在这个lambda函数中,我正在尝试从Dynamodb流中读取新图像(修改后的Dynamodb项)并尝试从中获取纯json字符串.我的问题是如何获得通过流发送的DynamoDB项的纯json字符串?我正在使用下面给出的代码片段来获取新的Image,但我不知道如何从中获取json字符串.感谢您的帮助.

public class LambdaFunctionHandler implements RequestHandler<DynamodbEvent, Object> {

@Override
public Object handleRequest(DynamodbEvent input, Context context) {
    context.getLogger().log("Input: " + input);

    for (DynamodbStreamRecord record : input.getRecords()){

        context.getLogger().log(record.getEventID());
        context.getLogger().log(record.getEventName());
        context.getLogger().log(record.getDynamodb().toString());
        Map<String,AttributeValue> currentRecord = record.getDynamodb().getNewImage();

        //how to get the pure json string of the new image
        //..............................................
     }
     return "Successfully processed " + input.getRecords().size() + " records.";
}
Run Code Online (Sandbox Code Playgroud)

}

java json aws-sdk amazon-dynamodb-streams

10
推荐指数
3
解决办法
6687
查看次数

DynamoDB流如何将记录分发到分片?

我的目标是确保DynamoDB流发布的记录以"正确"的顺序处理.我的表包含客户的事件.哈希键是事件ID,范围键是时间戳."正确"订单意味着按顺序处理同一客户ID的事件.可以并行处理不同的客户ID.

我正在通过Lambda函数使用流.每个碎片自动生成消费者.因此,如果运行时决定对流进行分片,则消耗并行发生(如果我做对了)并且我冒着在CustomerCreated之前处理CustomerAddressChanged事件的风险(例如).

文档暗示,有没有办法能够影响分片.但他们并没有这么明确地说.有没有办法,例如,通过使用客户ID和时间戳组合范围键?

amazon-web-services amazon-dynamodb aws-lambda amazon-dynamodb-streams

10
推荐指数
2
解决办法
6085
查看次数

创建GSI需要很长时间

我有一个已在DynamoDB中创建了近20亿行的表.

由于查询要求,我必须在其中创建全局二级索引(GSI).GSI创建过程在36小时前开始,但仍未完成.门户网站显示项目数量约为1亿.还有很长的路要走.

问题:

  1. 为什么在足够的WCU和RCU被分配(事实上为30k)时需要这么长的时间.
  2. 我使用的GSI分区密钥的值是重复的,这可能是GSI创建花费更多时间的原因(理想情况是我们选择一个不重复跨越多个分区的项目的分区键).
  3. 有没有办法在流程启动时中止GSI的创建?它不允许通过AWS控制台.

谢谢.

amazon-dynamodb amazon-dynamodb-streams

10
推荐指数
1
解决办法
3881
查看次数

过滤特定字段更改的 DynamoDB Streams 事件

我对 DynamoDB 表的特定(数字)字段/列(例如“计数”)的值变化感兴趣。

我知道我可以编写一个 Lambda,一旦触发,就会将新图像与旧图像进行比较,然后决定是否实际执行某项操作。但是,由于我的表收集了某种状态更新,并且其中只有少数对我的触发器实际上很重要,因此在这种设置中,大多数情况下都会毫无意义地调用 Lambda(而且,我会因以下原因而被收取费用)调用了 Lambda)。

所以我想过滤 DynamoDB Stream 事件,根据docs,这应该是可能的(即使没有给出类似情况的示例)。

我的问题是我找不到如何OldImage['count'] != NewImage['count']在过滤器模式中编写等效的内容。

下面是我的 SAM 模板的这一部分应如何显示的示例。

CountChangeDetector:
    Type: 'AWS::Serverless::Function'
    Properties:
      CodeUri: src/
      Handler: trigger.lambda_handler
      Runtime: python3.8
      FunctionName: "CountChangeDetector"
      AutoPublishAlias: live      
      Events:
        Stream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt MyTable.StreamArn
            BatchSize: 10
            StartingPosition: TRIM_HORIZON
            FilterCriteria:
              Filters:
                - Pattern: '{"OldImage": {"count": [ { "anything-but": [ {"NewImage": {"count"}} ] } ]}}'
Run Code Online (Sandbox Code Playgroud)

当然,我定义模式的方式不正确,并且出现错误。

那么,该模式该如何定义呢?

amazon-web-services amazon-dynamodb aws-lambda amazon-dynamodb-streams

9
推荐指数
1
解决办法
2011
查看次数