如何在 python 中的 hadoop 流中实现计数器

sub*_*bro 5 python hadoop

我是 hadoop 流媒体的新手。我的reduce代码中有几个过滤条件,我想知道有多少记录通过了这个条件。我开始知道我们可以通过编写自定义计数器来做到这一点。一些身体展示可以告诉我如何编写自定义计数器吗?

我在映射器代码中发出三列,说a,b,c 键是 a,值是列表,就像[b,c],从映射器代码中得到一个例子,就像['I'^['C','P']]

这是我的减少代码。

labels = ["a","b"]
for line in sys.stdin:
    l = line.strip().split("^")
    key = l[0]
    value = l[1]
    record = [key] + value
    records.append(record)
df = pd.DataFrame.from_records(records,columns=labels)
df = df((df['a'] == 'I') & (df['b'] == 'C'))
Run Code Online (Sandbox Code Playgroud)

我想知道 df 在减速器级别包含多少条记录。

谢谢你。

use*_*446 6

您可以简单地打印到 stderr:

print >> sys.stderr, "reporter:counter: CUSTOM, NbRecords,1"
Run Code Online (Sandbox Code Playgroud)

这将使计数器组“CUSTOM”中的计数器“NbRecords”增加 1


mic*_*ael 5

如果使用 mrjob

class MRCountingJob(MRJob):

    def mapper(self, _, value):
        self.increment_counter('group', 'counter_name', 1)
        yield _, value
Run Code Online (Sandbox Code Playgroud)

如果使用基本的hadoop 流 API(使用 python),

sys.stderr.write("reporter:counter:group,counter_name,1\n")
Run Code Online (Sandbox Code Playgroud)

例如,其中group可能是"My Mapper", "My Reducer",或者"My FooBar"计数器可能是num_calls,并且该值通常始终为 1,因为这些值将由框架求和。(使用 时stderr.write,不要忘记尾随换行符,\n