(PySpark)reduceByKey之后的嵌套列表

Spa*_*man 3 python apache-spark

我确信这很简单,但我找不到与此有关的任何内容.

我的代码很简单:

... 
stream = stream.map(mapper) 
stream = stream.reduceByKey(reducer) 
... 
Run Code Online (Sandbox Code Playgroud)

没什么特别的.毕竟,输出看起来像这样:

... 
key1  value1 
key2  [value2, value3] 
key3  [[value4, value5], value6] 
... 
Run Code Online (Sandbox Code Playgroud)

等等.所以,有时我得到一个平坦的值(如果它是单一的).有时 - 嵌套列表可能非常非常深(在我的简单测试数据上它是3级深度).

我尝试通过源搜索"flat"之类的东西 - 但是只找到了flatMap方法(据我所知)并不是我需要的方法.

我不知道为什么这些列表是嵌套的.我的猜测是,它们是由不同的流程(工人?)处理的,然后在没有展平的情况下连接在一起.

当然,我可以用Python编写一个代码,它将展开该列表并将其展平.但我认为这不是一个正常的情况 - 我认为几乎每个人都需要一个平坦的输出.

itertools.chain停止在第一个找到的非可迭代值上展开.换句话说,它仍然需要一些编码(前一段).

那么 - 如何使用PySpark的本地方法展平列表?

谢谢

Jos*_*sen 5

这里的问题是你的reduce功能.对于每个键,reduceByKey使用成对值调用reduce函数,并期望它生成相同类型的组合值.

例如,假设我想执行字数统计操作.首先,我可以将每个单词映射到一(word, 1)对,然后我可以reduceByKey(lambda x, y: x + y)总结每个单词的计数.最后,我留下了一(word, count)对RDD .

以下是PySpark API文档中的示例:

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
Run Code Online (Sandbox Code Playgroud)

要理解为什么你的例子不起作用,你可以想象应用的reduce函数是这样的:

reduce(reduce(reduce(firstValue, secondValue), thirdValue), fourthValue) ...
Run Code Online (Sandbox Code Playgroud)

根据您的reduce函数,听起来您可能正在尝试实现内置groupByKey操作,该操作将每个键与其值列表组合在一起.

另外,看一下combineByKey,它的泛化reduceByKey()允许reduce函数的输入和输出类型不同(reduceByKey实现方式实现combineByKey)