fjs*_*jsj 6 python google-app-engine mapreduce task-queue google-cloud-datastore
我刚看了谷歌I/O 2010 App Engine会话的批量数据处理,阅读了Google Research的MapReduce部分文章,现在我想在Google App Engine上使用MapReduce来实现Python的推荐系统.
我更喜欢使用appengine-mapreduce而不是Task Queue API,因为前者提供了对某种实例,自动批处理,自动任务链等的轻松迭代.问题是:我的推荐系统需要计算两个不同模型的实例之间的相关性,即两种不同种类的实例.
示例:我有这两个模型:用户和项目.每个标签都有一个标签列表作为属性.以下是计算用户和项目之间的相关性的函数.请注意,calculateCorrelation应为每个用户和项目组合调用:
def calculateCorrelation(user, item):
return calculateCorrelationAverage(u.tags, i.tags)
def calculateCorrelationAverage(tags1, tags2):
correlationSum = 0.0
for (tag1, tag2) in allCombinations(tags1, tags2):
correlationSum += correlation(tag1, tag2)
return correlationSum / (len(tags1) + len(tags2))
def allCombinations(list1, list2):
combinations = []
for x in list1:
for y in list2:
combinations.append((x, y))
return combinations
Run Code Online (Sandbox Code Playgroud)
但这calculateCorrelation不是appengine-mapreduce中的有效Mapper,也许这个函数甚至不兼容MapReduce计算概念.然而,我需要确定......对于我来说,拥有像自动批处理和任务链接这样的appengine-mapreduce优势真的很棒.
那有什么解决方案吗?
我应该定义自己的InputReader吗?读取两种不同类型的所有实例的新InputReader与当前的appengine-mapreduce实现兼容?
或者我应该尝试以下?
根据 Nick Johnson 的建议,我编写了自己的 InputReader。该读取器从两种不同类型中获取实体。它生成包含这些实体的所有组合的元组。这里是:
class TwoKindsInputReader(InputReader):
_APP_PARAM = "_app"
_KIND1_PARAM = "kind1"
_KIND2_PARAM = "kind2"
MAPPER_PARAMS = "mapper_params"
def __init__(self, reader1, reader2):
self._reader1 = reader1
self._reader2 = reader2
def __iter__(self):
for u in self._reader1:
for e in self._reader2:
yield (u, e)
@classmethod
def from_json(cls, input_shard_state):
reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM])
reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM])
return cls(reader1, reader2)
def to_json(self):
json_dict = {}
json_dict[self._KIND1_PARAM] = self._reader1.to_json()
json_dict[self._KIND2_PARAM] = self._reader2.to_json()
return json_dict
@classmethod
def split_input(cls, mapper_spec):
params = mapper_spec.params
app = params.get(cls._APP_PARAM)
kind1 = params.get(cls._KIND1_PARAM)
kind2 = params.get(cls._KIND2_PARAM)
shard_count = mapper_spec.shard_count
shard_count_sqrt = int(math.sqrt(shard_count))
splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt)
splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt)
inputs = []
for u in splitted1:
for e in splitted2:
inputs.append(TwoKindsInputReader(u, e))
#mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py)
return inputs
@classmethod
def validate(cls, mapper_spec):
return True #TODO
Run Code Online (Sandbox Code Playgroud)
当您需要处理两种实体的所有组合时,应该使用此代码。您还可以将其概括为两种以上。
这是一个有效的mapreduce.yaml TwoKindsInputReader:
mapreduce:
- name: recommendationMapReduce
mapper:
input_reader: customInputReaders.TwoKindsInputReader
handler: recommendation.calculateCorrelationHandler
params:
- name: kind1
default: kinds.User
- name: kind2
default: kinds.Item
- name: shard_count
default: 16
Run Code Online (Sandbox Code Playgroud)