CJ_*_*paz 48 python amazon-web-services amazon-dynamodb boto3
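My table is around 220 MB and holds about 250,000 records. I'm trying to pull all of this data into Python. I realize this needs to be a chunked batch process run in a loop, but I'm not sure how to make each batch start where the previous one left off.
Is there some way to filter my scan? From what I've read, filtering happens after loading, and loading stops at 1 MB, so I wouldn't actually be able to scan in new objects.
Any assistance would be appreciated.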
import boto3

dynamodb = boto3.resource('dynamodb',
    aws_session_token=aws_session_token,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region
)
table = dynamodb.Table('widgetsTableName')
data = table.scan()
小智 53
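I think the Amazon DynamoDB documentation on table scanning answers your question.
In short, you need to check for LastEvaluatedKey in the response. Here is an example using your code: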
import boto3

dynamodb = boto3.resource('dynamodb',
    aws_session_token=aws_session_token,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region
)
table = dynamodb.Table('widgetsTableName')

response = table.scan()
data = response['Items']

# Keep scanning from where the previous page stopped until no key is returned
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend(response['Items'])
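To see how the loop advances without touching AWS, here is a minimal sketch that runs the same pattern against a stand-in object. `FakeTable`, its page size, and the integer `id` key are made up for illustration; only the `scan` / `ExclusiveStartKey` / `LastEvaluatedKey` contract is meant to mirror what DynamoDB returns.

```python
class FakeTable:
    """Hypothetical stand-in for a boto3 Table; serves rows in fixed-size pages."""

    def __init__(self, rows, page_size):
        self.rows = rows
        self.page_size = page_size

    def scan(self, ExclusiveStartKey=None):
        # Resume right after the row whose key was handed back last time.
        start = 0
        if ExclusiveStartKey is not None:
            start = ExclusiveStartKey["id"] + 1
        page = self.rows[start:start + self.page_size]
        response = {"Items": page}
        # Like DynamoDB, only include LastEvaluatedKey when more rows remain.
        if start + self.page_size < len(self.rows):
            response["LastEvaluatedKey"] = {"id": page[-1]["id"]}
        return response


def scan_all(table):
    """Collect every item by following LastEvaluatedKey until it disappears."""
    response = table.scan()
    items = list(response["Items"])
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        items.extend(response["Items"])
    return items


# Seven rows served three at a time, so the loop needs three scan calls.
rows = [{"id": i} for i in range(7)]
all_items = scan_all(FakeTable(rows, page_size=3))
print(len(all_items))
```

Since `scan_all` only relies on the `scan` method and the two key names, the same function should work when handed a real `dynamodb.Table(...)` object.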
Jor*_*ips 25
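boto3 offers paginators that handle all the pagination details for you. Here is the doc page for the scan paginator. Basically, you would use it like so: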
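import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')

# Scan requires a TableName; the name below is the one from the question
for page in paginator.paginate(TableName='widgetsTableName'):
    # do something with page['Items']
    pass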
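Each page the paginator yields is an ordinary Scan response, so a small generator can flatten the pages into a single stream of items. This is only a sketch: `iter_items` and `FakePaginator` are hypothetical names, and the fake exists purely so the example runs without AWS credentials. With a real client you would pass `client.get_paginator('scan')` instead.

```python
def iter_items(paginator, **scan_kwargs):
    """Yield items one at a time from any object with a paginate() method."""
    for page in paginator.paginate(**scan_kwargs):
        # Each page is a regular Scan response dict.
        yield from page.get("Items", [])


class FakePaginator:
    """Hypothetical stand-in so the sketch runs without AWS credentials."""

    def __init__(self, pages):
        self.pages = pages

    def paginate(self, **kwargs):
        return iter(self.pages)


# Two pages in the low-level client's typed format.
pages = [
    {"Items": [{"id": {"N": "1"}}, {"id": {"N": "2"}}]},
    {"Items": [{"id": {"N": "3"}}]},
]
items = list(iter_items(FakePaginator(pages), TableName="widgetsTableName"))
print(len(items))
```

Because it is a generator, items can be processed as each page arrives rather than after the whole table has been loaded.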
Ric*_*ard 19
DynamoDB limits the scan method to 1 MB of data per scan.
Documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan
Here is an example loop that uses LastEvaluatedKey to get all the data from a DynamoDB table:
import boto3

client = boto3.client('dynamodb')

def dump_table(table_name):
    results = []
    last_evaluated_key = None
    while True:
        if last_evaluated_key:
            response = client.scan(
                TableName=table_name,
                ExclusiveStartKey=last_evaluated_key
            )
        else:
            response = client.scan(TableName=table_name)
        last_evaluated_key = response.get('LastEvaluatedKey')

        results.extend(response['Items'])

        if not last_evaluated_key:
            break
    return results

# Usage
data = dump_table('your-table-name')

# do something with data
Abe*_*ker 14
Regarding Jordon Phillips's answer, here is how you would pass a FilterExpression with the pagination:
import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_parameters = {
    'TableName': 'foo',
    'FilterExpression': 'bar > :x AND bar < :y',
    'ExpressionAttributeValues': {
        ':x': {'S': '2017-01-31T01:35'},
        ':y': {'S': '2017-01-31T02:08'},
    }
}

page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    # do something with page['Items']
    pass
小智 8
Code for stripping the DynamoDB type descriptors from the results, as @kungphu mentioned.
import boto3
from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector

client = boto3.client('dynamodb')
paginator = client.get_paginator('query')
service_model = client._service_model.operation_model('Query')
trans = TransformationInjector(deserializer=TypeDeserializer())

# paginate() still needs the usual Query arguments (TableName, KeyConditionExpression, ...)
for page in paginator.paginate():
    # Rewrites the page in place, replacing typed values such as {'S': 'foo'} with plain Python values
    trans.inject_attribute_value_output(page, service_model)
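For readers unsure what "type descriptors" means here: the low-level client returns every attribute wrapped in a type tag, for example `{'S': 'widget'}` or `{'N': '9.99'}`. The sketch below is a simplified, pure-Python illustration of the unwrapping, not boto3's implementation. The function name `plain` is hypothetical, and only a subset of tags (S, N, BOOL, NULL, L, M) is handled; sets and binary values are left out. In real code `TypeDeserializer` is the tool to reach for.

```python
from decimal import Decimal


def plain(value):
    """Convert one DynamoDB-typed attribute value into a plain Python value."""
    (tag, inner), = value.items()  # each typed value carries exactly one type tag
    if tag == "S":
        return inner
    if tag == "N":
        return Decimal(inner)  # numbers arrive as strings in the typed format
    if tag == "BOOL":
        return inner
    if tag == "NULL":
        return None
    if tag == "L":
        return [plain(v) for v in inner]
    if tag == "M":
        return {k: plain(v) for k, v in inner.items()}
    raise ValueError(f"type tag not handled in this sketch: {tag}")


# A made-up item in the low-level client's format.
item = {
    "name": {"S": "widget"},
    "price": {"N": "9.99"},
    "tags": {"L": [{"S": "a"}, {"S": "b"}]},
    "meta": {"M": {"active": {"BOOL": True}}},
}
clean = {key: plain(value) for key, value in item.items()}
print(clean)
```

Numbers are turned into `Decimal` rather than `float` here to avoid silently losing precision.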
It turns out that Boto3 captures the "LastEvaluatedKey" as part of the returned response. This can be used as the starting point for a scan:
data = table.scan(
    ExclusiveStartKey=data['LastEvaluatedKey']
)
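I plan on building a loop around this until the returned data is only the ExclusiveStartKey.
Both approaches suggested above have problems: either you write lengthy, repetitive code that handles paging explicitly in a loop, or you use Boto paginators with low-level sessions and forgo the advantages of the higher-level Boto objects.
A solution that uses Python functional code to provide a high-level abstraction lets you use the higher-level Boto methods while hiding the complexity of AWS paging: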
import itertools
import typing

def iterate_result_pages(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Generator:
    """A wrapper for functions using AWS paging, that returns a generator which yields a sequence of items for
    every response

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        A generator which yields the 'Items' field of the result for every response
    """
    response = function_returning_response(*args, **kwargs)
    yield response["Items"]
    while "LastEvaluatedKey" in response:
        # Ask for the next page, starting right after the last key evaluated
        kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        response = function_returning_response(*args, **kwargs)
        yield response["Items"]

    return

def iterate_paged_results(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Iterator:
    """A wrapper for functions using AWS paging, that returns an iterator of all the items in the responses.
    Items are yielded to the caller as soon as they are received.

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        An iterator which yields one response item at a time
    """
    return itertools.chain.from_iterable(iterate_result_pages(function_returning_response, *args, **kwargs))

# Example, assuming 'table' is a Boto DynamoDB table object:
# the bound method table.scan is passed as the function to call
all_items = list(iterate_paged_results(table.scan, ProjectionExpression='my_field'))
If you landed here looking for a paginated scan with some filter expression(s):
def scan(table, **kwargs):
    response = table.scan(**kwargs)
    yield from response['Items']
    while response.get('LastEvaluatedKey'):
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
        yield from response['Items']
Usage example:
import boto3
from boto3.dynamodb.conditions import Attr

table = boto3.Session(...).resource('dynamodb').Table('widgetsTableName')
items = list(scan(table, FilterExpression=Attr('name').contains('foo')))
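This also speaks to the worry in the question. Because the filter is applied after each page of up to 1 MB has been read, a page can come back with few or even zero items and still carry a LastEvaluatedKey. The loop therefore has to continue based on the key, not on whether Items is empty. Below is a minimal sketch of that behaviour against a stand-in table. `FilteringFakeTable` and its `keep` predicate are hypothetical; a real table takes a FilterExpression instead.

```python
class FilteringFakeTable:
    """Hypothetical stand-in: reads fixed-size pages, then filters each page."""

    def __init__(self, rows, page_size, keep):
        self.rows = rows
        self.page_size = page_size
        self.keep = keep  # plays the role of a FilterExpression

    def scan(self, ExclusiveStartKey=None):
        start = 0 if ExclusiveStartKey is None else ExclusiveStartKey["id"] + 1
        raw_page = self.rows[start:start + self.page_size]
        # Filtering happens after the page is read, so Items may be empty.
        response = {"Items": [row for row in raw_page if self.keep(row)]}
        if start + self.page_size < len(self.rows):
            response["LastEvaluatedKey"] = {"id": raw_page[-1]["id"]}
        return response


def scan(table, **kwargs):
    """Same generator as in the answer: continue while LastEvaluatedKey is present."""
    response = table.scan(**kwargs)
    yield from response["Items"]
    while response.get("LastEvaluatedKey"):
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"], **kwargs)
        yield from response["Items"]


# Ten rows, four per page; only the last two rows pass the filter.
rows = [{"id": i} for i in range(10)]
table = FilteringFakeTable(rows, page_size=4, keep=lambda row: row["id"] >= 8)

first_page = table.scan()
matches = list(scan(table))
print(len(first_page["Items"]), matches)
```

The first page is empty here, yet the full scan still finds both matching rows, which is why stopping at the first empty page would lose data.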