从数据存储区查询大量ndb实体的最佳实践

All*_*len 62 google-app-engine app-engine-ndb google-cloud-datastore

我在App Engine数据存储区遇到了一个有趣的限制.我正在创建一个处理程序来帮助我们分析一个生产服务器上的一些使用数据.为了执行分析,我需要查询和汇总从数据存储中提取的10,000多个实体.计算并不难,它只是通过使用样本的特定过滤器的项目的直方图.我遇到的问题是,在达到查询截止日期之前,我无法快速从数据存储区中获取数据以进行任何处理.

我已经尝试了所有我能想到的将查询分块到并行RPC调用以提高性能,但根据appstats我似乎无法让查询实际并行执行.无论我尝试什么方法(见下文),似乎RPC总是回到顺序下一个查询的瀑布.

注意:查询和分析代码确实有效,它只是运行缓慢,因为我无法从数据存储中快速获取数据.

背景

我没有可以分享的实时版本,但这里是我正在谈论的系统部分的基本模型:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)
Run Code Online (Sandbox Code Playgroud)

您可以将样本视为用户使用给定名称功能的时间.(例如:'systemA.feature_x').标签基于客户详细信息,系统信息和功能.例如:['winxp','2.5.1','systemA','feature_x','premium_account']).因此,标签形成一组非规范化的标记,可用于查找感兴趣的样本.

我试图做的分析包括获取一个日期范围,并询问每个客户帐户(公司,而不是每个用户)每天(或每小时)使用的功能集(可能是所有功能)的特征次数.

因此处理程序的输入类似于:

  • 开始日期
  • 结束日期
  • 标签(S)

输出将是:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]
Run Code Online (Sandbox Code Playgroud)

查询的通用代码

以下是所有查询的一些共同代码.处理程序的一般结构是使用webapp2的简单get处理程序,它设置查询参数,运行查询,处理结果,创建要返回的数据.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
Run Code Online (Sandbox Code Playgroud)

试过的方法

我尝试了各种方法尝试尽可能快地并行地从数据存储区中提取数据.到目前为止我尝试过的方法包括:

A.单次迭代

这是一个简单的基本案例,可以与其他方法进行比较.我只是构建查询并迭代所有项目,让ndb做它做的一个接一个地拉它们.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)
Run Code Online (Sandbox Code Playgroud)

B.大取

这里的想法是看我是否可以做一个非常大的提取.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)
Run Code Online (Sandbox Code Playgroud)

C.跨时间范围的异步提取

这里的想法是认识到样本在时间上相当好,所以我可以创建一组独立的查询,将整个时间区域分成块,并尝试使用异步并行运行每个查询:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)
Run Code Online (Sandbox Code Playgroud)

D.异步映射

我尝试了这种方法,因为文档听起来像ndb可能会在使用Query.map_async方法时自动利用一些并行性.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
Run Code Online (Sandbox Code Playgroud)

结果

我测试了一个示例查询来收集总体响应时间和appstats跟踪.结果是:

A.单次迭代

真实的:15.645s

这个按顺序依次获取批次,然后从memcache中检索每个会话.

方法A appstats

B.大取

真实:12.12s

实际上与选项A相同,但由于某种原因有点快.

方法B appstats

C.跨时间范围的异步提取

真实:15.251s

似乎在开始时提供更多并行性,但似乎在结果迭代期间通过一系列调用减慢速度.也似乎无法将会话memcache查找与挂起的查询重叠.

方法C appstats

D.异步映射

真实的:13.752s

这是我最难理解的.看起来它有很多重叠,但似乎所有东西都在瀑布而不是平行延伸.

方法D appstats

建议

基于这一切,我错过了什么?我只是在App Engine上达到了限制,还是有更好的方法可以同时降低大量实体?

我不知道下一步该尝试什么.我想重写客户端以同时向app引擎发出多个请求,但这似乎是相当暴力的.我真的希望应用程序引擎应该能够处理这个用例,所以我猜我有些东西是我遗漏的.

更新

最后我发现选项C对我来说是最好的.我能够在6.1秒内完成优化.仍然不完美,但更好.

在得到几个人的建议后,我发现以下项目是理解和记住的关键:

  • 多个查询可以并行运行
  • 一次只能运行10个RPC
  • 尝试非规范化到没有辅助查询的程度
  • 这种类型的任务最好留下映射reduce和任务队列,而不是实时查询

所以我做了更快的事情:

  • 我根据时间从头开始对查询空间进行了分区.(注意:就返回的实体而言,分区越相等越好)
  • 我进一步对数据进行了非规范化,以消除对辅助会话查询的需要
  • 我使用了ndb异步操作和wait_any()来使查询与处理重叠

我仍然没有得到我期望或喜欢的表现,但它现在是可行的.我只是希望它们是在处理程序中快速将大量顺序实体拉入内存的更好方法.

mji*_*son 8

像这样的大型处理不应该在具有60秒时间限制的用户请求中完成.相反,它应该在支持长时间运行的请求的上下文中完成.该任务队列支持请求长达10分钟,(我相信)普通内存的限制(F1情况下,默认情况下,拥有128MB内存).对于更高的限制(无请求超时,1GB +内存),请使用后端.

这里有一些尝试:设置一个URL,当访问时,它会触发任务队列任务.如果任务队列任务尚未完成,它将返回一个网页,该网页每隔约5秒轮询另一个响应true/false的URL.任务队列处理数据(可能需要大约10秒),并将结果作为计算数据或呈现的网页保存到数据存储区.一旦初始页面检测到它已完成,用户就会被重定向到页面,该页面从数据存储区中获取现在计算的结果.