DynamoDB batch execution of QueryRequests

Cat*_*fey 4 amazon-dynamodb

I have the following DynamoDB query, which returns the first record whose hash key is "apple" and whose timestamp is at or below some_timestamp.

Map<String, Condition> keyConditions = new HashMap<String, Condition>();

keyConditions.put("HASH", new Condition().
    withComparisonOperator(EQ).
    withAttributeValueList(new AttributeValue().withS("apple"))
);

keyConditions.put("TIMESTAMP", new Condition().
    withComparisonOperator(LE).
    withAttributeValueList(new AttributeValue().withN(some_timestamp))
);

QueryResult queryResult = dynamoDBClient.query(
    new QueryRequest().
            withTableName("TABLE").
            withKeyConditions(keyConditions).
            withLimit(1).
            withScanIndexForward(SCAN_INDEX_FORWARD)
);

I need to execute many queries of this kind, so my question is: is it possible to execute them as a batch? Something like the following API.

Map<String, Condition> keyConditions = new HashMap<String, Condition>();

keyConditions.put("HASH", new Condition().
    withComparisonOperator(EQ).
    withAttributeValueList(new AttributeValue().withS("apple"))
);

keyConditions.put("TIMESTAMP", new Condition().
    withComparisonOperator(LE).
    withAttributeValueList(new AttributeValue().withN(some_timestamp))
);

QueryRequest one = new QueryRequest().
    withTableName("TABLE").
    withKeyConditions(keyConditions).
    withLimit(1).
    withScanIndexForward(SCAN_INDEX_FORWARD);

keyConditions = new HashMap<String, Condition>();

keyConditions.put("HASH", new Condition().
    withComparisonOperator(EQ).
    withAttributeValueList(new AttributeValue().withS("pear"))
);

keyConditions.put("TIMESTAMP", new Condition().
    withComparisonOperator(LE).
    withAttributeValueList(new AttributeValue().withN(some_other_timestamp))
);

QueryRequest two = new QueryRequest().
    withTableName("TABLE").
    withKeyConditions(keyConditions).
    withLimit(1).
    withScanIndexForward(SCAN_INDEX_FORWARD);

List<QueryRequest> queryRequests = Arrays.asList(one, two);

// Hypothetical batch call (the API I am asking about)
List<QueryResult> queryResults = dynamoDBClient.query(queryRequests);

Dav*_*cek 5

From a very similar question on the AWS forums:

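DynamoDB's Query API only supports a single "use" of the index per query operation, so the "hash" of the index you are querying has to be specified as an EQ condition. DynamoDB does not currently have any kind of "Batch Query" API, so unfortunately what you are looking for is not possible today in a single API call. If these were GetItem requests (which does not fit your use case), you could issue a BatchGetItem request.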

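In the meantime, since it looks like you are using Java, my recommendation is to use threads to issue multiple query requests in parallel. Here is some sample code that does this, but you will need to consider how your application should handle pagination / partial results and errors: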

/**
* Simulate a "Batch Query" operation in DynamoDB by querying an index for 
* multiple hash keys
* 
* Resulting list may be incomplete if any queries time out.  Returns a list of 
* QueryResult so that LastEvaluatedKeys can be followed.  A better implementation 
* would answer the case where some queries fail, deal with pagination (and 
* Limit), have configurable timeouts.  One improvement on this end would be 
* to make a simple immutable bean that contains a query result or exception, 
* as well as the associated request.  Maybe it could even be called back with 
* a previous list for pagination. 
* 
* @param hashKeyValues (you'll also need table name / index name) 
* @return a list of query results for the queries that succeeded
* @throws InterruptedException
*/
public List<QueryResult> queryAll(String... hashKeyValues) 
  throws InterruptedException {
  // initialize accordingly
  int timeout = 2 * 1000;
  ExecutorService executorService = Executors.newFixedThreadPool(10);

  final List<QueryResult> results = 
    new ArrayList<QueryResult>(hashKeyValues.length);
  final CountDownLatch latch = 
    new CountDownLatch(hashKeyValues.length);

  // Loop through the hash key values to "OR" in the final list of results
  for (final String hashKey : hashKeyValues) {

    executorService.submit(new Runnable() {

      @Override
      public void run() {
        try {
          // fill in parameters
          QueryResult result = dynamodb.query(new QueryRequest()
            .withTableName("MultiQueryExample")
            .addKeyConditionsEntry("City", new Condition()
              .withComparisonOperator("EQ")
              .withAttributeValueList(new AttributeValue(hashKey))));
          // one of many flavors of dealing with concurrency
          synchronized (results) { 
            results.add(result);
          }
        } catch (Throwable t) {
          // Log and handle errors
          t.printStackTrace();
        } finally {
          latch.countDown();
        }
      }
    });
  }

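  // Stop accepting new tasks so the pool's threads are released once the
  // submitted queries finish; otherwise every call leaks a thread pool
  executorService.shutdown();

  // Wait for all queries to finish or time out
  latch.await(timeout, TimeUnit.MILLISECONDS);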

  // return a copy to prevent concurrent modification of 
  // the list in the face of timeouts
  synchronized (results) {
    return new ArrayList<QueryResult>(results);
  }
}
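The Javadoc above suggests an immutable bean holding either a query result or an exception, together with the request that produced it. A minimal sketch of that idea follows, using only the JDK so it can run without the AWS SDK. The names `ParallelQueries`, `Outcome` and `runAll` are hypothetical, and the `query` function is a stand-in for something like `request -> dynamodb.query(request)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical helper, not part of the AWS SDK. */
final class ParallelQueries {

  /** Immutable pairing of a request with either its result or its error. */
  static final class Outcome<Q, R> {
    final Q request;
    final R result;        // null when the query failed or timed out
    final Throwable error; // null when the query succeeded

    Outcome(Q request, R result, Throwable error) {
      this.request = request;
      this.result = result;
      this.error = error;
    }

    boolean succeeded() {
      return error == null;
    }
  }

  /** Runs every request through query in parallel; outcomes keep request order. */
  static <Q, R> List<Outcome<Q, R>> runAll(
      List<Q> requests, Function<Q, R> query, int threads, long timeoutMillis) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Callable<R>> tasks = new ArrayList<>();
      for (Q request : requests) {
        tasks.add(() -> query.apply(request));
      }
      // invokeAll returns futures in task order and cancels any task
      // that has not completed when the timeout expires
      List<Future<R>> futures =
          pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
      List<Outcome<Q, R>> outcomes = new ArrayList<>();
      for (int i = 0; i < requests.size(); i++) {
        Q request = requests.get(i);
        try {
          outcomes.add(new Outcome<>(request, futures.get(i).get(), null));
        } catch (ExecutionException e) {
          // the query itself threw; keep the original cause
          outcomes.add(new Outcome<>(request, null, e.getCause()));
        } catch (CancellationException e) {
          // the query was cancelled because it exceeded the timeout
          outcomes.add(new Outcome<>(request, null, e));
        }
      }
      return outcomes;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for queries", e);
    } finally {
      pool.shutdownNow(); // release the worker threads
    }
  }
}
```

Because each outcome carries its request, a caller can retry only the failed or timed-out queries instead of discarding the whole batch.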

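  • Unlike GetItem, which returns only a single (optional) result, a Query can return an unbounded result set. Once you are returning result sets of varying sizes, the advantage of batching over manually sending queries in parallel vanishes in the result-set parsing logic. (2 upvotes)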
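Since a single query can span many pages, each parallel query would typically need its own pagination loop. The following is a rough sketch of such a loop using only the JDK. `Pagination`, `Page` and `fetchAll` are hypothetical names, and `fetchPage` is a stand-in for a call that passes the previous page's LastEvaluatedKey as the next request's ExclusiveStartKey.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper, not part of the AWS SDK. */
final class Pagination {

  /** Stand-in for one page of a query response (not the SDK's QueryResult). */
  static final class Page<T, K> {
    final List<T> items;
    final K lastEvaluatedKey; // null when there are no further pages

    Page(List<T> items, K lastEvaluatedKey) {
      this.items = items;
      this.lastEvaluatedKey = lastEvaluatedKey;
    }
  }

  /**
   * Fetches pages until the source reports no further key or maxItems items
   * have been collected. fetchPage receives null for the first page.
   */
  static <T, K> List<T> fetchAll(Function<K, Page<T, K>> fetchPage, int maxItems) {
    List<T> all = new ArrayList<>();
    K startKey = null; // null means "start from the beginning"
    while (all.size() < maxItems) {
      Page<T, K> page = fetchPage.apply(startKey);
      for (T item : page.items) {
        if (all.size() == maxItems) {
          break; // cap reached in the middle of a page
        }
        all.add(item);
      }
      startKey = page.lastEvaluatedKey;
      if (startKey == null) {
        break; // the source has no more pages
      }
    }
    return all;
  }
}
```

The `maxItems` cap is an assumption added here so that an unbounded result set cannot grow the list without limit; an application might prefer to stream pages to a callback instead.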