mrjob:在EMR上设置日志记录

Bek*_*eka 6 python logging hadoop mapreduce mrjob

我正在尝试使用mrjob在EMR上运行hadoop,并且无法弄清楚如何设置日志记录(用户生成的日志在map/reduce步骤中),因此我可以在集群终止后访问它们.

我已经使用了试图建立日志logging模块,print并且sys.stderr.write()到目前为止,但没有运气.对我有用的唯一选择是将日志写入文件然后SSH机器并读取它,但它很麻烦.我希望我的日志转到stderr/stdout/syslog并自动收集到S3,因此我可以在群集终止后查看它们.

这是带有日志记录的word_freq示例:

"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re
import logging
import logging.handlers
import sys

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper_init(self):
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(logging.FileHandler("/tmp/mr.log"))
        self.logger.addHandler(logging.StreamHandler())
        self.logger.addHandler(logging.StreamHandler(sys.stdout))
        self.logger.addHandler(logging.handlers.SysLogHandler())

    def mapper(self, _, line):
        self.logger.info("Test logging: %s", line)
        sys.stderr.write("Test stderr: %s\n" % line)
        print "Test print: %s" % line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()
Run Code Online (Sandbox Code Playgroud)

Bek*_*eka 4

在所有选项中,唯一真正有效的方法是将 stderr 与直接写入 ( sys.stderr.write) 一起使用,或使用带有 StreamHandler 的记录器到 stderr。

作业完成(成功或有错误)后,可以稍后从以下位置检索日志:

[s3_log_uri]/[jobflow-id]/task-attempts/[job-id]/[attempt-id]/stderr

请务必将日志保留在您的runners.emr.cleanup配置中。