I am working on the appengine-mapreduce functionality and have modified the demo to fit my purpose. Basically, I have a million lines in the following format: userid, time1, time2. My goal is to find the difference between time1 and time2 for each userid.
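For reference, a hypothetical row in that format (with timestamps matching the %m/%d/%y %I:%M:%S%p pattern parsed below) would be:

user001,01/02/11 10:00:00AM,01/02/11 10:05:30AM

for which the map function below should yield ("user001", 330.0).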
However, when I run this on Google App Engine, I get this error message in the logs section:
Exceeded soft private memory limit with 180.56 MB after servicing 130 requests total. While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.
import csv
import logging
import time


def time_count_map(data):
    """Time count map function."""
    (entry, text_fn) = data  # BlobstoreZipInputReader yields (zipinfo, file-opening callable)
    text = text_fn()

    try:
        q = text.split('\n')
        for m in q:
            reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
            for s in reader:
                # Calculate the elapsed time between the two timestamps.
                sdw = s[1]
                start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
                edw = s[2]
                end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
                time_difference = time.mktime(end_date) - time.mktime(start_date)
                yield (s[0], time_difference)
    except IndexError as e:
        logging.debug(e)


def time_count_reduce(key, values):
    """Time count reduce function."""
    total = 0.0  # named 'total' to avoid shadowing the time module
    for subtime in values:
        total += float(subtime)
    realtime = int(total)
    yield "%s: %d\n" % (key, realtime)
Can anyone suggest how I could better optimize my code? Thanks!
Edit:
Here is the pipeline handler:
class TimeCountPipeline(base_handler.PipelineBase):
    """A pipeline to run the Time count demo.

    Args:
        blobkey: blobkey to process as string. Should be a zip archive with
            text files inside.
    """

    def run(self, filekey, blobkey):
        logging.debug("filename is %s" % filekey)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "time_count",
            "main.time_count_map",
            "main.time_count_reduce",
            "mapreduce.input_readers.BlobstoreZipInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "blob_key": blobkey,
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=32)
        yield StoreOutput("TimeCount", filekey, output)
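For completeness, the demo starts a pipeline like this from an upload handler roughly as follows (a sketch; the handler name and request parameters are assumptions, while pipeline.start() and the status redirect follow the demo's pattern):

from google.appengine.ext import webapp


class StartTimeCountHandler(webapp.RequestHandler):
    """Hypothetical handler that kicks off the pipeline."""

    def post(self):
        filekey = self.request.get("filekey")
        blobkey = self.request.get("blobkey")
        pipeline = TimeCountPipeline(filekey, blobkey)
        pipeline.start()
        # Redirect to the pipeline UI so progress can be watched.
        self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)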
mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
The rest of the files are exactly the same as the demo.
I've uploaded a copy of my code on Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
Also consider calling gc.collect() at regular points in your code. I have seen several SO questions about exceeding the soft memory limit that were alleviated by calling gc.collect(), most of them to do with blobstore.
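A minimal sketch of that idea applied to the map function above (the 1000-row interval is an arbitrary assumption to tune for your workload):

import csv
import gc
import logging
import time


def time_count_map(data):
    """Map function that forces a garbage collection every N rows."""
    (entry, text_fn) = data
    text = text_fn()

    try:
        for i, m in enumerate(text.split('\n')):
            reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
            for s in reader:
                start_date = time.strptime(s[1], "%m/%d/%y %I:%M:%S%p")
                end_date = time.strptime(s[2], "%m/%d/%y %I:%M:%S%p")
                yield (s[0], time.mktime(end_date) - time.mktime(start_date))
            if i % 1000 == 0:
                gc.collect()  # release objects accumulated over the last batch
    except IndexError as e:
        logging.debug(e)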
It is likely that your input file exceeds the soft memory limit in size. For big files, use either BlobstoreLineInputReader or BlobstoreZipLineInputReader.
These input readers pass something different to the map function: they pass the start_position in the file and the line of text.
Your map function might then look something like:
import csv
import logging
import time


def time_count_map(data):
    """Time count map function."""
    text = data[1]  # the line of text; data[0] is the start position

    try:
        reader = csv.reader([text.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            # Calculate the elapsed time between the two timestamps.
            sdw = s[1]
            start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
    except IndexError as e:
        logging.debug(e)
Using BlobstoreLineInputReader will make the job run much faster, since it can use more than one shard (up to 256), but it means you need to upload your files uncompressed, which can be a pain. I handled it by uploading the compressed files to an EC2 Windows server, then unzipping and uploading from there, since the upstream bandwidth from EC2 is so large.
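Under that change, the MapreducePipeline call in TimeCountPipeline would look roughly like this (a sketch; I believe BlobstoreLineInputReader expects a blob_keys list rather than a single blob_key, but check the parameter names against your mapreduce library version):

output = yield mapreduce_pipeline.MapreducePipeline(
    "time_count",
    "main.time_count_map",
    "main.time_count_reduce",
    "mapreduce.input_readers.BlobstoreLineInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={
        "blob_keys": [blobkey],  # assumed: list of keys for the uncompressed files
    },
    reducer_params={
        "mime_type": "text/plain",
    },
    shards=256)  # line readers can split the input across up to 256 shards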