他们似乎对我做同样的事情.任何人都可以向我解释这个区别吗?
我正在尝试借助unmarshall中提供的官方功能来解组 dynamodb 流记录@aws-sdk/util-dynamodb。
我正在打字稿中执行此操作,定义如下所示
\nunmarshall: (data: Record<string, AttributeValue>, options?: unmarshallOptions | undefined)
这里AttributValue是源自@aws-sdk/client-dynamodb.
dynamodb 记录的传入类型如下所示
\nexport interface DynamoDBRecord {\n awsRegion?: string | undefined;\n dynamodb?: StreamRecord | undefined;\n eventID?: string | undefined;\n eventName?: 'INSERT' | 'MODIFY' | 'REMOVE' | undefined;\n eventSource?: string | undefined;\n eventSourceARN?: string | undefined;\n eventVersion?: string | undefined;\n userIdentity?: any;\n}\nRun Code Online (Sandbox Code Playgroud)\nexport interface StreamRecord {\n ApproximateCreationDateTime?: number | undefined;\n Keys?: { [key: string]: AttributeValue …Run Code Online (Sandbox Code Playgroud) amazon-web-services amazon-dynamodb typescript aws-lambda amazon-dynamodb-streams
目标: 我们希望使用AWS Glue Data Catalog为驻留在S3存储桶中的JSON数据创建单个表,然后我们将通过Redshift Spectrum进行查询和解析.
背景: JSON数据来自DynamoDB Streams,并且是深层嵌套的.第一级JSON具有一组一致的元素:Keys,NewImage,OldImage,SequenceNumber,ApproximateCreationDateTime,SizeBytes和EventName.唯一的变化是有些记录没有NewImage,有些没有OldImage.但是,在第一级之下,架构变化很大.
理想情况下,我们希望使用Glue仅解析第一级JSON,并且基本上将较低级别视为大型STRING对象(我们将根据需要使用Redshift Spectrum对其进行解析).目前,我们将整个记录加载到Redshift中的单个VARCHAR列中,但记录接近Redshift中数据类型的最大大小(最大VARCHAR长度为65535).因此,我们希望在记录达到Redshift之前执行第一级解析.
到目前为止我们尝试/引用的内容:
问题: 我们如何使用Glue(或其他方法)来解析这些记录的第一级 - 同时忽略顶层元素下面的不同模式 - 以便我们可以从Spectrum访问它或加载它身体进入Redshift?
我是Glue的新手.我花了很多时间在Glue文档中并在论坛上查看(有些稀疏)信息.我可能会遗漏一些明显的东西 - 或者这可能是目前形式的胶水限制.欢迎任何建议.
谢谢!
amazon-redshift amazon-dynamodb-streams amazon-redshift-spectrum aws-glue
我正在为无服务器框架编写一个插件,它通过 ARN 引用 DynamoDB Stream。我可以使用手头的信息构建 DynamoDB 表 ARN,但我不知道时间戳部分,这是构建完整流 ARN 所必需的。我无权访问原始 DynamoDB Cloudformation 定义,当我需要引用 Stream ARN 时,这两件事可能会在完全不同的模板中发生。此时我所拥有的只是已创建的 DynamoDB 的 ARN。
有没有办法通过类似于 的变量来引用最新的流 arn:aws:dynamodb:${AWS::Region}::${AWS::AccountId}:table/eventbus-test/stream/${LATEST}?
或者我可以通过无服务器配置或 Cloudformation 模板以另一种方式构建它吗?
amazon-web-services aws-cloudformation amazon-dynamodb serverless-framework amazon-dynamodb-streams
以下是我正在使用的用例:我enable Streams在DynamoDB使用new and old Image.I创建了一个Kinesis Firehose delivery stream目的地为Redshift(Intermediate s3)时已经配置.
从Dynamodb我的小溪到达Firhose并从那里到下面给出的JSON(S3 Bucket -Gzip)的Bucket.我的问题是我cannot COPY this JSON to redshift.
我无法得到的东西:
JSON加载到S3如下所示:
{
"Keys": {
"vehicle_id": {
"S": "x011"
}
},
"NewImage": {
"heart_beat": {
"N": "0"
},
"cdc_id": {
"N": "456"
},
"latitude": {
"N": "1.30951"
},
"not_deployed_counter": {
"N": "1"
},
"reg_ind": {
"N": "0"
},
"operator": {
"S": …Run Code Online (Sandbox Code Playgroud) amazon-s3 amazon-dynamodb amazon-redshift amazon-dynamodb-streams amazon-kinesis-firehose
我有一个启用了流媒体的Dynamodb表.此外,我为此表创建了一个触发AWS Lambda函数的触发器.在这个lambda函数中,我正在尝试从Dynamodb流中读取新图像(修改后的Dynamodb项)并尝试从中获取纯json字符串.我的问题是如何获得通过流发送的DynamoDB项的纯json字符串?我正在使用下面给出的代码片段来获取新的Image,但我不知道如何从中获取json字符串.感谢您的帮助.
public class LambdaFunctionHandler implements RequestHandler<DynamodbEvent, Object> {
@Override
public Object handleRequest(DynamodbEvent input, Context context) {
context.getLogger().log("Input: " + input);
for (DynamodbStreamRecord record : input.getRecords()){
context.getLogger().log(record.getEventID());
context.getLogger().log(record.getEventName());
context.getLogger().log(record.getDynamodb().toString());
Map<String,AttributeValue> currentRecord = record.getDynamodb().getNewImage();
//how to get the pure json string of the new image
//..............................................
}
return "Successfully processed " + input.getRecords().size() + " records.";
}
Run Code Online (Sandbox Code Playgroud)
}
我的目标是确保DynamoDB流发布的记录以"正确"的顺序处理.我的表包含客户的事件.哈希键是事件ID,范围键是时间戳."正确"订单意味着按顺序处理同一客户ID的事件.可以并行处理不同的客户ID.
我正在通过Lambda函数使用流.每个碎片自动生成消费者.因此,如果运行时决定对流进行分片,则消耗并行发生(如果我做对了)并且我冒着在CustomerCreated之前处理CustomerAddressChanged事件的风险(例如).
该文档暗示,有没有办法能够影响分片.但他们并没有这么明确地说.有没有办法,例如,通过使用客户ID和时间戳组合范围键?
amazon-web-services amazon-dynamodb aws-lambda amazon-dynamodb-streams
我有一个已在DynamoDB中创建了近20亿行的表.
由于查询要求,我必须在其中创建全局二级索引(GSI).GSI创建过程在36小时前开始,但仍未完成.门户网站显示项目数量约为1亿.还有很长的路要走.
问题:
谢谢.
我对 DynamoDB 表的特定(数字)字段/列(例如“计数”)的值变化感兴趣。
我知道我可以编写一个 Lambda,一旦触发,就会将新图像与旧图像进行比较,然后决定是否实际执行某项操作。但是,由于我的表收集了某种状态更新,并且其中只有少数对我的触发器实际上很重要,因此在这种设置中,大多数情况下都会毫无意义地调用 Lambda(而且,我会因以下原因而被收取费用)调用了 Lambda)。
所以我想过滤 DynamoDB Stream 事件,根据docs,这应该是可能的(即使没有给出类似情况的示例)。
我的问题是我找不到如何OldImage['count'] != NewImage['count']在过滤器模式中编写等效的内容。
下面是我的 SAM 模板的这一部分应如何显示的示例。
CountChangeDetector:
Type: 'AWS::Serverless::Function'
Properties:
CodeUri: src/
Handler: trigger.lambda_handler
Runtime: python3.8
FunctionName: "CountChangeDetector"
AutoPublishAlias: live
Events:
Stream:
Type: DynamoDB
Properties:
Stream: !GetAtt MyTable.StreamArn
BatchSize: 10
StartingPosition: TRIM_HORIZON
FilterCriteria:
Filters:
- Pattern: '{"OldImage": {"count": [ { "anything-but": [ {"NewImage": {"count"}} ] } ]}}'
Run Code Online (Sandbox Code Playgroud)
当然,我定义模式的方式不正确,并且出现错误。
那么,该模式该如何定义呢?
amazon-web-services amazon-dynamodb aws-lambda amazon-dynamodb-streams