loc*_*ter 5 python apache-beam
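After some processing and grouping by key, I have a dataset like the one shown below. I now need to process each row to get the output below. I tried FlatMap, and it is really slow because the "value" list can be arbitrarily long. I figured I could split each row into separate PCollections, process them in parallel, and then flatten them together. How do I split each row into a different PCollection? If that isn't feasible, is there another way to speed up the computation?
Input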
key, value
1 [A, B, B, B]
2 [A, B, B, B]
3 [A, B, B, B]
4 [A, B, B, B]
5 [A, B, B, B]
Output:
key, value
1 (A, 0)
1 (B, 1)
1 (B, 2)
1 (B, 3)
2 (A, 0)
2 (B, 1)
2 (B, 2)
2 (B, 3)
...
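A common misconception when using the Apache Beam model is that the parallelization scheme is defined by the PCollection (understandable, since the name is short for Parallel Collection). In reality, parallelization is defined per key within each PCollection[1]. In other words, the Beam model processes keys in parallel, but processes the values within a single key sequentially.
The problem you are running into is commonly called a hot key. It happens when too many values are paired with a single key, which limits parallelism.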
To get the expected output, you will have to change your existing pipeline so that it emits values in a way that does not send all elements to a single key. This is a little tricky because your example outputs each element together with its index. If you need that, then no matter how you cut it, all the values for a key have to be merged in memory somewhere to compute the correct index.
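For reference, here is a minimal sketch of the exact-index version in plain Python, assuming each grouped row arrives as a `(key, list_of_values)` tuple. The name `index_values` is hypothetical and not part of Beam. It makes the constraint visible: the index can only be produced by walking all of one key's values in order in a single place.

```python
def index_values(kv):
    """Yield (key, (value, index)) for one grouped row."""
    key, values = kv
    # All values of this key are traversed sequentially; this is the step
    # that cannot be spread across workers if a global index is required.
    for index, value in enumerate(values):
        yield key, (value, index)


rows = [(1, ['A', 'B', 'B', 'B']), (2, ['A', 'B', 'B', 'B'])]
indexed = [out for row in rows for out in index_values(row)]
print(indexed[:4])
```

In a pipeline, a generator like this could be applied to the grouped PCollection with `beam.FlatMap(index_values)`, which is essentially the slow approach described in the question.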
If you don't care about getting the specific index shown in your example, take a look at the following code. It assigns each element to a random partition within its key, which breaks the elements of each key into smaller, more manageable groups.
import random

import apache_beam as beam

# Flattened (key, value) pairs: keys 1..5, each with the values A, B, B, B.
data = [
    (k, c) for k in range(1, 6) for c in ('A', 'B', 'B', 'B')
]

p = beam.Pipeline()
elems = p | beam.Create(data)

num_buckets = 4

class Preprocess(beam.DoFn):
    def process(self, el):
        key = str(el[0])
        # randrange excludes the upper bound, so there are exactly num_buckets partitions.
        partition = random.randrange(num_buckets)
        yield (key, partition), el

class Postprocess(beam.DoFn):
    def process(self, el):
        (key, partition), values = el
        # The index is local to the (key, partition) bucket, not global to the key.
        for index, value in enumerate(values):
            yield key, (value[1], partition, index)

out = (elems | beam.ParDo(Preprocess())
       | beam.GroupByKey()
       | beam.ParDo(Postprocess()))
Input
key,value
1 A
1 B
1 B
1 B
2 A
2 B
2 B
2 B
3 A
3 B
...
Potential Output
key,(value,partition,index)
1 ('A', 1, 0)
1 ('B', 1, 1)
1 ('B', 2, 0)
1 ('B', 3, 0)
2 ('A', 3, 0)
2 ('B', 3, 1)
2 ('B', 1, 0)
2 ('B', 1, 1)
3 ('A', 3, 0)
3 ('B', 2, 0)
...
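To see why the indices in the output restart within each partition, the same salting idea can be simulated without Beam. This is only a sketch: `salt_and_index` and its `seed` parameter are hypothetical names introduced for illustration, and a dictionary stands in for the grouping step.

```python
import random
from collections import defaultdict


def salt_and_index(pairs, num_buckets, seed=0):
    """Group (key, value) pairs by (key, random partition), then index per bucket."""
    rng = random.Random(seed)  # seeded only so the simulation is repeatable
    buckets = defaultdict(list)
    for key, value in pairs:
        # Salt the key with a partition number in [0, num_buckets).
        buckets[(key, rng.randrange(num_buckets))].append(value)
    for (key, partition), values in buckets.items():
        # Each bucket is indexed independently, so indices restart at 0.
        for index, value in enumerate(values):
            yield key, (value, partition, index)


pairs = [(k, c) for k in range(1, 6) for c in ('A', 'B', 'B', 'B')]
results = list(salt_and_index(pairs, num_buckets=4))
for row in results[:4]:
    print(row)
```

Every input element appears exactly once in the results, but the pair `(partition, index)` identifies it only within its key, and the numbering changes from run to run when no seed is fixed.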
[1] When using streaming, it is defined per key per window.