In PySpark, how do you log to log4j from inside a transformation?

Pyr*_*rce 8 apache-spark pyspark

I want to log to the standard logger inside an executor during a transformation, with the log levels and formatting respected. Unfortunately, I can't get access to the log4j logger object inside the method, since it isn't serializable, and the spark context is not available inside the transformation. I could log outside of the transformation every object I'm going to touch, but that doesn't really help with debugging or monitoring code execution.

def slow_row_contents_fetch(row):
    rows = fetch_id_row_contents(row) # API fetch, DB fetch, etc
    # This shows up, but not controllable by log level
    print "Processed slow row with {} results".format(len(rows))
    return rows

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)

Outside of the transformation, I can get the logger via:

logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()
logger.warn('This will show up as expected')
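The same gateway also lets you pull a named logger and control its level on the driver side; a small sketch (the logger name is just an illustration):

log4j = sc._jvm.org.apache.log4j
logger = log4j.LogManager.getLogger('my.driver.logger')  # illustrative name
logger.setLevel(log4j.Level.WARN)
logger.warn('Driver-side message with the usual log4j level and format')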

But sc isn't available inside the transformation, for good reason. If you try to call sc directly within a transformation, you see the following message:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I can print, but that isn't easily filtered, and it just gets tracked to the log4j logger as unformatted error messages.

Serializing the logger itself fails (expectedly) when it is called inside the transformation function:

...
File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o36.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

Is there any way to access the executor's logger during transformations in pyspark?

Pyr*_*rce 7

After a few hours of digging through the spark source, this appears to be impossible at present. The executor doesn't actually have a jvm instance that it's attached to; the data is just streamed over the socket, with no jvm-native binding to use.

Here is the worker creation code that streams error messages to stderr:

private def createSimpleWorker(): Socket = {
  ...
  val worker = pb.start()

  // Redirect worker stdout and stderr
  redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

  ...
}

/**
 * Redirect the given streams to our stderr in separate threads.
 */
private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream) {
  try {
    new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
  } catch {
    case e: Exception =>
      logError("Exception in redirecting streams", e)
  }
}
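A practical consequence of this redirect is that anything the Python worker writes to stdout or stderr ends up in the executor's stderr log (visible through the Spark UI). So the closest workaround is a plain Python logger configured inside the function; a minimal sketch, where the logger name and format are my own choices, with no tie-in to log4j's level or format configuration:

import logging
import sys

def slow_row_contents_fetch(row):
    # basicConfig is a no-op once handlers exist, so it's safe to call
    # on every invocation within a long-lived worker process.
    logging.basicConfig(stream=sys.stderr, level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')
    log = logging.getLogger('fetcher')  # illustrative logger name
    rows = fetch_id_row_contents(row)
    log.info('Processed slow row with %d results', len(rows))
    return rows

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)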

And here is the worker.py code used to relay job processing. There is no place to emit log messages, and no message type that indicates a log event.

try:
    ...
    command = pickleSer._read_with_length(infile)
    if isinstance(command, Broadcast):
        command = pickleSer.loads(command.value)
    func, profiler, deserializer, serializer = command
    init_time = time.time()

    def process():
        iterator = deserializer.load_stream(infile)
        serializer.dump_stream(func(split_index, iterator), outfile)

    if profiler:
        profiler.profile(process)
    else:
        process()
except Exception:
    try:
        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
        write_with_length(traceback.format_exc().encode("utf-8"), outfile)
    except IOError:
        # JVM close the socket
        pass
    except Exception:
        # Write the error to stderr if it happened while serializing
        print("PySpark worker failed with exception:", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
    exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
write_long(shuffle.MemoryBytesSpilled, outfile)
write_long(shuffle.DiskBytesSpilled, outfile)

# Mark the beginning of the accumulators section of the output
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
write_int(len(_accumulatorRegistry), outfile)
for (aid, accum) in _accumulatorRegistry.items():
    pickleSer._write_with_length((aid, accum._value), outfile)
...

Finally, the message types that are available:

class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5
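The one channel worker.py does send back to the driver, besides data and exceptions, is the accumulator section at the end of the output. So an indirect option is to count events in an accumulator on the executors and then log the totals on the driver, where log4j is reachable. A sketch of that idea (the accumulator here is my own illustration, not something the worker protocol offers for logging):

# Count events on the executors; log the total on the driver via log4j.
processed = sc.accumulator(0)

def fetch_and_count(row):
    rows = fetch_id_row_contents(row)
    processed.add(1)  # shipped back in the accumulator section above
    return rows

results = sc.parallelize(fetchable_ids).flatMap(fetch_and_count).collect()
logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()
logger.warn('Processed {} slow rows'.format(processed.value))

Accumulator values only arrive once an action runs, and you only see totals rather than per-row messages, so this helps more with monitoring than with debugging.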