使用boto3完全扫描dynamoDb

CJ_*_*paz 48 python amazon-web-services amazon-dynamodb boto3

我的桌子大约是220mb,里面有25万条记录.我正在尝试将所有这些数据都放到python中.我意识到这需要一个分块的批处理过程并循环,但我不知道如何设置批次从前一个停止的地方开始.

有没有办法过滤我的扫描?从我读到的,加载后发生过滤,加载停止在1mb,所以我实际上无法扫描新对象.

任何援助将不胜感激.

import boto3
dynamodb = boto3.resource('dynamodb',
    aws_session_token = aws_session_token,
    aws_access_key_id = aws_access_key_id,
    aws_secret_access_key = aws_secret_access_key,
    region_name = region
    )

table = dynamodb.Table('widgetsTableName')

data = table.scan()
Run Code Online (Sandbox Code Playgroud)

小智 53

我认为有关表扫描的Amazon DynamoDB文档可以回答您的问题.

简而言之,您需要LastEvaluatedKey在响应中进行检查.以下是使用您的代码的示例:

import boto3
dynamodb = boto3.resource('dynamodb',
                          aws_session_token=aws_session_token,
                          aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key,
                          region_name=region
)

table = dynamodb.Table('widgetsTableName')

response = table.scan()
data = response['Items']

while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend(response['Items'])
Run Code Online (Sandbox Code Playgroud)

  • 虽然这可能有用,但请注意[boto3文档](http://boto3.readthedocs.io/en/latest/reference/services/dynamodb.html#DynamoDB.Client.query)声明_如果LastEvaluatedKey为空,则"最后一页"的结果已被处理,没有更多的数据可供检索.所以我正在使用的测试是`while response.get('LastEvaluatedKey')`而不是'while`FastEvaluatedKey'作为响应`,只是因为"是空的"并不一定意味着"不存在",这在任何一种情况下都适用. (18认同)
  • 为什么不使用:`while response.get('LastEvaluatedKey', False)`? (3认同)
  • @John_J 你可以使用 `while True:` 然后使用 `if not response.get('LastEvaluatedKey'): break` 或类似的东西。您还可以将处理放入一个函数中,调用它,然后使用上面的“while response.get(...):”再次调用它来处理后续页面。你基本上只需要模拟 `do...while`,它在 Python 中并不明确存在。 (2认同)

Jor*_*ips 25

boto3提供了处理所有分页细节的分页器.是扫描分页器的doc页面.基本上,你会像这样使用它:

import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')

for page in paginator.paginate():
    # do something
Run Code Online (Sandbox Code Playgroud)

  • 请注意,`page ['Items']`中的项可能不是您所期望的:由于此paginator非常通用,因此每个DynamoDB项的内容都是格式类型的字典:value,例如` {'myAttribute':{'M':{}},'yourAttribute':{'N':u'132457'}}`用于具有空地图和数字类型的行(作为需要的字符串返回要被强制转换;我建议使用`decimal.Decimal`,因为它已经接受了一个字符串并将处理非整数数字.其他类型,例如字符串,映射和布尔值,由boto转换为其Python类型. (9认同)
  • 如果不是因为@kungphu提出的问题,那么分页者将很棒。我看不到将其用于做一件有用的事情,而是通过使用不相关的元数据污染响应数据来否定它的用途 (5认同)

Ric*_*ard 19

DynamoDB 将该scan方法限制为每次扫描 1mb 的数据。

文档: https : //boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan

下面是一个示例循环,它使用LastEvaluatedKey以下命令从 DynamoDB 表中获取所有数据:

import boto3
client = boto3.client('dynamodb')

def dump_table(table_name):
    results = []
    last_evaluated_key = None
    while True:
        if last_evaluated_key:
            response = client.scan(
                TableName=table_name,
                ExclusiveStartKey=last_evaluated_key
            )
        else: 
            response = client.scan(TableName=table_name)
        last_evaluated_key = response.get('LastEvaluatedKey')
        
        results.extend(response['Items'])
        
        if not last_evaluated_key:
            break
    return results

# Usage
data = dump_table('your-table-name')

# do something with data

Run Code Online (Sandbox Code Playgroud)


Abe*_*ker 14

关于Jordon Phillips的回答,这就是你如何通过FilterExpression分页传递:

import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_parameters = {
  'TableName': 'foo',
  'FilterExpression': 'bar > :x AND bar < :y',
  'ExpressionAttributeValues': {
    ':x': {'S': '2017-01-31T01:35'},
    ':y': {'S': '2017-01-31T02:08'},
  }
}

page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    # do something
Run Code Online (Sandbox Code Playgroud)


小智 8

删除dynamodb格式类型的代码,如@kungphu所述.

import boto3

from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector

client = boto3.client('dynamodb')
paginator = client.get_paginator('query')
service_model = client._service_model.operation_model('Query')
trans = TransformationInjector(deserializer = TypeDeserializer())
for page in paginator.paginate():
    trans.inject_attribute_value_output(page, service_model)
Run Code Online (Sandbox Code Playgroud)

  • 太棒了!否定我之前关于分页器缺乏用处的评论。谢谢!为什么这不是默认行为? (2认同)

CJ_*_*paz 5

结果证明 Boto3 捕获了“LastEvaluatedKey”作为返回响应的一部分。这可以用作扫描的起点:

data= table.scan(
   ExclusiveStartKey=data['LastEvaluatedKey']
)
Run Code Online (Sandbox Code Playgroud)

我计划围绕这个建立一个循环,直到返回的数据只是 ExclusiveStartKey


Yit*_*ikC 5

上面建议的两种方法都存在问题:要么编写冗长且重复的代码来在循环中显式处理分页,要么使用具有低级会话的 Boto 分页器,并放弃高级 Boto 对象的优点。

使用 Python 函数代码提供高级抽象的解决方案允许使用更高级别的 Boto 方法,同时隐藏 AWS 分页的复杂性:

import itertools
import typing

def iterate_result_pages(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Generator:
    """A wrapper for functions using AWS paging, that returns a generator which yields a sequence of items for
    every response

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        A generator which yields the 'Items' field of the result for every response
    """
    response = function_returning_response(*args, **kwargs)
    yield response["Items"]
    while "LastEvaluatedKey" in response:
        kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        response = function_returning_response(*args, **kwargs)
        yield response["Items"]

    return

def iterate_paged_results(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Iterator:
    """A wrapper for functions using AWS paging, that returns an iterator of all the items in the responses.
    Items are yielded to the caller as soon as they are received.

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        An iterator which yields one response item at a time
    """
    return itertools.chain.from_iterable(iterate_result_pages(function_returning_response, *args, **kwargs))

# Example, assuming 'table' is a Boto DynamoDB table object:
all_items = list(iterate_paged_results(ProjectionExpression = 'my_field'))
Run Code Online (Sandbox Code Playgroud)


Pie*_*e D 5

如果您登陆此处寻找带有某些过滤表达式的分页扫描:

def scan(table, **kwargs):
    response = table.scan(**kwargs)
    yield from response['Items']
    while response.get('LastEvaluatedKey'):
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
        yield from response['Items']
Run Code Online (Sandbox Code Playgroud)

用法示例:

table = boto3.Session(...).resource('dynamodb').Table('widgetsTableName')

items = list(scan(table, FilterExpression=Attr('name').contains('foo')))
Run Code Online (Sandbox Code Playgroud)