优化任务以减少交易应用程序中的CPU

Joe*_*oel 3 google-app-engine

我设计了一个处理客户股票投资组合的交易应用程序.

我使用两种数据存储类型:

  1. 股票 - 包含唯一的股票名称及其每日百分比变化.
  2. UserTransactions -包含关于具体购买由用户做出股票的信息:购买具有参考股票当前购买沿价值.

db.Model python模块:

class Stocks (db.Model):
stockname = db.StringProperty(multiline=True) 
dailyPercentChange=db.FloatProperty(default=1.0) 

class UserTransactions (db.Model): 
buyer = db.UserProperty() 
value=db.FloatProperty() 
stockref = db.ReferenceProperty(Stocks) 
Run Code Online (Sandbox Code Playgroud)

每小时我需要更新一次数据库:更新每日百分比变化Stocks,然后更新UserTransactions引用该股票的所有实体的价值.

以下python模块迭代所有股票,更新dailyPercentChange属性,并调用任务以遍历所有UserTransactions引用股票的实体并更新其值:

Stocks.py

# Iterate over all stocks in datastore
for stock in Stocks.all():
   # update daily percent change in datastore
   db.run_in_transaction(updateStockTxn, stock.key()) 
   # create a task to update all user transactions entities referring to this stock
   taskqueue.add(url='/task', params={'stock_key': str(stock.key(), 'value' : self.request.get ('some_val_for_stock') }) 

def updateStockTxn(stock_key):
   #fetch the stock again - necessary to avoid concurrency updates
   stock = db.get(stock_key)
   stock.dailyPercentChange= data.get('some_val_for_stock') # I get this value from outside
   ... some more calculations here ...
   stock.put()
Run Code Online (Sandbox Code Playgroud)

Task.py(/ task)

# Amount of transaction per task
amountPerCall=10
stock=db.get(self.request.get("stock_key")) 
# Get all user transactions which point to current stock
user_transaction_query=stock.usertransactions_set
cursor=self.request.get("cursor") 
if cursor: 
    user_transaction_query.with_cursor(cursor) 

# Spawn another task if more than 10 transactions are in datastore
transactions = user_transaction_query.fetch(amountPerCall) 
if len(transactions)==amountPerCall: 
    taskqueue.add(url='/task', params={'stock_key': str(stock.key(), 'value' : self.request.get ('some_val_for_stock'), 'cursor': user_transaction_query.cursor()  })

# Iterate over all transaction pointing to stock and update their value
for transaction in transactions: 
   db.run_in_transaction(updateUserTransactionTxn, transaction.key()) 

def updateUserTransactionTxn(transaction_key): 
   #fetch the transaction again - necessary to avoid concurrency updates
   transaction = db.get(transaction_key)
   transaction.value= transaction.value* self.request.get ('some_val_for_stock')
   db.put(transaction) 
Run Code Online (Sandbox Code Playgroud)

问题:

目前,该系统的伟大工程,但问题是,它不结垢好......我身边有100只股票有300个用户交易和我运行每隔一小时更新.在仪表板中,我看到task.py需要大约65%的CPU(Stock.py需要大约20%-30%)而且我几乎使用app引擎给我的所有6.5小时免费CPU时间.我没有问题启用计费和支付额外的CPU,但问题是系统的扩展...使用6.5 CPU小时100个股票是非常差的.

我想知道,考虑到上面提到的系统要求,如果有一个更好,更有效的实现(或只是一个可以帮助当前实现的小变化),而不是这里提出的实现.

谢谢!!

乔尔

Nic*_*son 8

有几个明显的改进:

  1. 您应该在第一个片段中使用keys_only查询:因为您实际上没有在任何时候引用stock对象的属性,所以检索它是没有意义的.您也可以只检索密钥.
  2. 您可以使用此处记录的Queue对象.add方法批量添加任务.这比单独添加任务更有效.
  3. 您的任务每10个事务链接一个新任务,但任务最多可运行10分钟,10个数据存储区事务可能不会超过一两秒.相反,在请求开始时设置一个计时器,并在每次循环时检查它,在接近10分钟限制时中止并链接下一个任务.
  4. 如果您希望迭代大量实体,请使用.fetch和游标,而不是迭代; 迭代20个实体的小批量提取.
  5. 在单个实体更新中,您再次进行常规查询,但仅使用密钥.改为执行keys_only查询.
  6. 该任务是否是UserTransaction在最初编写实体后更新实体的唯一内容?如果是这样,您可以跳过该事务并批量更新它们.

最后,我建议进行整体重构:不是为每个股票开始新任务,而是使用上面提到的计时器在任务中运行外部循环.当您链接下一个任务时,使用游标传递当前状态并从中断处继续.

唯一需要考虑的是,如果有某种方法可以重构数据,以避免需要这么多更新.例如,您可以让UserTransaction实体引用Stock实体中的某个值,以便您可以在运行时计算它们的实际值,并且您只需要更改单个Stock实体吗?