str*_*dmc 4 python apache-beam
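I'm new to Apache Beam, and I want to compute the mean and standard deviation of a large dataset.
Given a .csv file of the form "A,B", where A and B are integers, this is basically what I have.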
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.textio import ReadFromText

class Split(beam.DoFn):
    def process(self, element):
        # each line looks like "A,B"; convert both fields to integers
        A, B = element.split(',')
        return [('A', int(A)), ('B', int(B))]

with beam.Pipeline(options=PipelineOptions()) as p:
    # parse the rows
    rows = (p
            | ReadFromText('data.csv')
            | beam.ParDo(Split()))
    # calculate the mean
    avgs = (rows
            | beam.CombinePerKey(
                beam.combiners.MeanCombineFn()))
    # calculate the stdv per key
    # ??? (this is the missing step; it should produce `std`)
    std | beam.io.WriteToText('std.out')
I'd like to do something like:
class SquaredDiff(beam.DoFn):
    def process(self, element):
        A = element[0][1]
        B = element[1][1]
        # not valid as written: avgs is a PCollection and cannot be indexed
        return [('A', A - avgs[0]), ('B', B - avgs[1])]

stdv = (rows
        | beam.ParDo(SquaredDiff())
        | beam.CombinePerKey(
            beam.combiners.MeanCombineFn()))
or something along those lines, but I can't figure out how.
Write your own combiner. This will work:
import apache_beam as beam
import numpy as np

class MeanStddev(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0.0, 0)  # sum of x, sum of x^2, count

    def add_input(self, sum_count, input):
        (total, sumsq, count) = sum_count
        return total + input, sumsq + input*input, count + 1

    def merge_accumulators(self, accumulators):
        # element-wise sum of the partial (sum, sum of squares, count) tuples
        sums, sumsqs, counts = zip(*accumulators)
        return sum(sums), sum(sumsqs), sum(counts)

    def extract_output(self, sum_count):
        (total, sumsq, count) = sum_count
        if count:
            mean = total / count
            variance = (sumsq / count) - mean*mean
            # a negative value can occur due to rounding
            stddev = np.sqrt(variance) if variance > 0 else 0.0
            return {
                'mean': mean,
                'variance': variance,
                'stddev': stddev,
                'count': count
            }
        else:
            return {
                'mean': float('NaN'),
                'variance': float('NaN'),
                'stddev': float('NaN'),
                'count': 0
            }
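This computes the variance as E(x^2) - E(x)*E(x), so you only need a single pass over the data. For the keyed rows in the question, use beam.CombinePerKey(MeanStddev()) instead, provided the values are numeric. This is how you use the combiner above on a plain list: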
[1.3, 3.0, 4.2] | beam.CombineGlobally(MeanStddev())
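For reference, the single-pass formula follows from expanding the square in the definition of the (population) variance. The notation here is an assumption for illustration: n is the number of values, x_i the values, and \mu = E(x) their mean.

```latex
\begin{aligned}
\operatorname{Var}(x)
  &= \frac{1}{n}\sum_{i=1}^{n} (x_i - \mu)^2 \\
  &= \frac{1}{n}\sum_{i=1}^{n} x_i^2 \;-\; 2\mu\,\frac{1}{n}\sum_{i=1}^{n} x_i \;+\; \mu^2 \\
  &= E(x^2) - 2\mu^2 + \mu^2 \\
  &= E(x^2) - E(x)^2
\end{aligned}
```

Only the running sums of x_i and x_i^2 plus the count are needed, which is exactly what the accumulator holds.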
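To see why this accumulator suits a distributed combine, here is a minimal plain-Python sketch that mirrors the four CombineFn methods without Beam. The two-shard split and the helper `combine_shard` are hypothetical, made up for illustration; Beam decides on its own how to partition the data. Note that it yields the population standard deviation (dividing by count), as the combiner above does.

```python
import math
import statistics

# Plain functions mirroring the four CombineFn methods (no Beam required).
def create_accumulator():
    return (0.0, 0.0, 0)  # sum of x, sum of x^2, count

def add_input(acc, x):
    total, sumsq, count = acc
    return total + x, sumsq + x * x, count + 1

def merge_accumulators(accumulators):
    # partial results combine by element-wise addition
    totals, sumsqs, counts = zip(*accumulators)
    return sum(totals), sum(sumsqs), sum(counts)

def extract_output(acc):
    total, sumsq, count = acc
    if not count:
        return float('nan'), float('nan')
    mean = total / count
    # clamp at zero: rounding can push the difference slightly negative
    variance = max(sumsq / count - mean * mean, 0.0)
    return mean, math.sqrt(variance)

def combine_shard(shard):
    # hypothetical helper: what one worker would do with its share of the data
    acc = create_accumulator()
    for x in shard:
        acc = add_input(acc, x)
    return acc

# Hypothetical split of the data across two workers.
shards = [[1.3, 3.0], [4.2]]
partials = [combine_shard(s) for s in shards]
mean, stddev = extract_output(merge_accumulators(partials))

# Compare with the standard library on the unsharded data.
data = [x for shard in shards for x in shard]
print(mean, statistics.mean(data))
print(stddev, statistics.pstdev(data))
```

Each shard is reduced independently and the partial tuples are simply added, so the result does not depend on how the data was split.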