我是 hadoop 流媒体的新手。我的reduce代码中有几个过滤条件,我想知道有多少记录通过了这个条件。我开始知道我们可以通过编写自定义计数器来做到这一点。一些身体展示可以告诉我如何编写自定义计数器吗?
我在映射器代码中发出三列,说a,b,c
键是 a,值是列表,就像[b,c]
,从映射器代码中得到一个例子,就像['I'^['C','P']]
这是我的减少代码。
labels = ["a","b"]
for line in sys.stdin:
l = line.strip().split("^")
key = l[0]
value = l[1]
record = [key] + value
records.append(record)
df = pd.DataFrame.from_records(records,columns=labels)
df = df((df['a'] == 'I') & (df['b'] == 'C'))
Run Code Online (Sandbox Code Playgroud)
我想知道 df 在减速器级别包含多少条记录。
谢谢你。
您可以简单地打印到 stderr:
print >> sys.stderr, "reporter:counter: CUSTOM, NbRecords,1"
Run Code Online (Sandbox Code Playgroud)
这将使计数器组“CUSTOM”中的计数器“NbRecords”增加 1
如果使用 mrjob,
class MRCountingJob(MRJob):
def mapper(self, _, value):
self.increment_counter('group', 'counter_name', 1)
yield _, value
Run Code Online (Sandbox Code Playgroud)
如果使用基本的hadoop 流 API(使用 python),
sys.stderr.write("reporter:counter:group,counter_name,1\n")
Run Code Online (Sandbox Code Playgroud)
例如,其中group
可能是"My Mapper
", "My Reducer"
,或者"My FooBar"
计数器可能是num_calls
,并且该值通常始终为 1,因为这些值将由框架求和。(使用 时stderr.write
,不要忘记尾随换行符,\n
)
归档时间: |
|
查看次数: |
2767 次 |
最近记录: |