Pyr*_*rce · apache-spark, pyspark
I want to log from within the executors during a transformation, with the standard logger's log levels and formatting intact. Unfortunately I can't get access to the log4j logger object inside the method: it isn't serializable, and the spark context isn't available inside the transformation. I could just log, outside of the transformation, all of the objects I'm going to touch, but that doesn't really help with debugging or monitoring code execution.
def slow_row_contents_fetch(row):
    rows = fetch_id_row_contents(row)  # API fetch, DB fetch, etc
    # This shows up, but is not controllable by log level
    print "Processed slow row with {} results".format(len(rows))
    return rows

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)
Outside of the transformation I can get hold of the logger via:
logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()
logger.warn('This will show up as expected')
But sc isn't available inside the transformation, for good reasons. You see the following message if you try to call sc directly within a transformation:
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that runs on workers. For more information, see SPARK-5063.
I can print, but that isn't easily filtered, and the output just gets traced to the log4j logger as unformatted error messages.
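As a partial workaround (not part of the original question), Python's standard logging module can at least add levels and formatting to what lands in the worker's stderr. This is a sketch under assumptions: `get_executor_logger` is a name invented here, and `fetch_id_row_contents` is stubbed out as a hypothetical stand-in for the question's fetch.

```python
import logging
import sys

def get_executor_logger(name="executor"):
    # Configure a stderr logger once per Python worker process.
    # Output still ends up in the executor's stderr log rather than
    # going through log4j, but it gains the levels and formatting
    # that a bare print lacks.
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid duplicate handlers on worker reuse
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger

def fetch_id_row_contents(row):
    # Hypothetical stand-in for the API/DB fetch in the question
    return [row, row]

def slow_row_contents_fetch(row):
    log = get_executor_logger()
    rows = fetch_id_row_contents(row)
    log.info("Processed slow row with %d results", len(rows))
    return rows
```

Because the level check happens in Python, messages below INFO can be suppressed without touching log4j; the trade-off is that nothing here reaches the driver's log4j output.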
Serializing the logger itself fails (with the exception below) when it is referenced inside the transformation function:
...
File "/usr/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o36.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Is there any way to access the executor logger during pyspark transformations?
After several hours of digging through the spark source, it seems this is currently impossible. The executor process doesn't actually have a py4j binding to its JVM instance; the data is simply streamed over a socket without using any JVM-native bindings.
Here is the worker creation code, which streams error messages to stderr:
private def createSimpleWorker(): Socket = {
  ...
  val worker = pb.start()

  // Redirect worker stdout and stderr
  redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
  ...
}

/**
 * Redirect the given streams to our stderr in separate threads.
 */
private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream) {
  try {
    new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
  } catch {
    case e: Exception =>
      logError("Exception in redirecting streams", e)
  }
}
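In Python terms, RedirectThread is roughly a background pump copying one stream into another. This is a sketch of the idea, not Spark's actual implementation:

```python
import threading

def redirect_stream(src, dst, name="stream reader"):
    # Pump everything from src into dst on a daemon thread,
    # mirroring what Spark's RedirectThread does with the worker's
    # stdout/stderr and the JVM's System.err.
    def pump():
        for chunk in src:
            dst.write(chunk)
        dst.flush()
    t = threading.Thread(target=pump, name=name)
    t.daemon = True
    t.start()
    return t
```

This is why print output from a transformation shows up, uncontrolled, in the executor's stderr log: every byte the Python worker writes is copied across verbatim, with no level or format information attached.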
And here is the worker.py code that dispatches the job processing. There is no hook for emitting log messages, and no message type that represents a log event.
try:
    ...
    command = pickleSer._read_with_length(infile)
    if isinstance(command, Broadcast):
        command = pickleSer.loads(command.value)
    func, profiler, deserializer, serializer = command
    init_time = time.time()

    def process():
        iterator = deserializer.load_stream(infile)
        serializer.dump_stream(func(split_index, iterator), outfile)

    if profiler:
        profiler.profile(process)
    else:
        process()
except Exception:
    try:
        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
        write_with_length(traceback.format_exc().encode("utf-8"), outfile)
    except IOError:
        # JVM close the socket
        pass
    except Exception:
        # Write the error to stderr if it happened while serializing
        print("PySpark worker failed with exception:", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
        exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
write_long(shuffle.MemoryBytesSpilled, outfile)
write_long(shuffle.DiskBytesSpilled, outfile)

# Mark the beginning of the accumulators section of the output
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
write_int(len(_accumulatorRegistry), outfile)
for (aid, accum) in _accumulatorRegistry.items():
    pickleSer._write_with_length((aid, accum._value), outfile)
...
And finally, the message types that are available:
class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5
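For illustration, the write_int/write_with_length helpers used above boil down to big-endian struct packing, and the SpecialLengths sentinels work because real payload lengths are always non-negative, so a negative "length" can double as a control message. A minimal sketch modeled on pyspark.serializers (the helper names mirror that module; this is not Spark's exact code):

```python
import struct

# A SpecialLengths sentinel: negative "lengths" mark control messages
PYTHON_EXCEPTION_THROWN = -2

def write_int(value, stream):
    # 4-byte big-endian int, as in pyspark.serializers.write_int
    stream.write(struct.pack("!i", value))

def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]

def write_with_length(payload, stream):
    # Length-prefixed payload: the reader takes the int first and
    # treats negative values as SpecialLengths control codes.
    write_int(len(payload), stream)
    stream.write(payload)
```

Since every frame on the socket is either a non-negative length followed by a payload or one of these five sentinels, there is simply no slot in the protocol where a leveled log message could travel back to the JVM.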