Tags: python, apache-spark, pyspark
I have started learning Spark with pyspark and I am wondering what the following log message means:

UserWarning: Please install psutil to have better support with spilling

The operation that causes the spilling is a join between two RDDs:
print(user_types.join(user_genres).collect())
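For context, something along these lines reproduces it, assuming two pair RDDs keyed by a user id (the values below are made-up placeholders; the real dataset is much larger):

from pyspark import SparkContext

sc = SparkContext(appName="join-spill-demo")

# Made-up pair RDDs keyed by user id; the real data is large enough to trigger spilling.
user_types = sc.parallelize([(1, "premium"), (2, "free"), (3, "trial")])
user_genres = sc.parallelize([(1, "rock"), (2, "jazz"), (3, "classical")])

print(user_types.join(user_genres).collect())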
This may sound obvious, but my first question is: what does "spilling" mean here? I did install psutil and the warning went away, but I would like to understand what is actually going on. There is a very similar question here, but the OP mostly asks how to install psutil.
Spilling here means writing the in-memory data to disk, which degrades pyspark performance, since writing to disk is slow.

psutil is used to check the used memory of the node.

This is the original snippet from the pyspark source file shuffle.py (taken from here) that raises the warning. The code below defines a function that returns the used memory, via psutil if it is installed, or via /proc/self/status if the system is Linux.
try:
    import psutil

    def get_used_memory():
        """ Return the used memory in MB """
        process = psutil.Process(os.getpid())
        if hasattr(process, "memory_info"):
            info = process.memory_info()
        else:
            info = process.get_memory_info()
        return info.rss >> 20

except ImportError:

    def get_used_memory():
        """ Return the used memory in MB """
        if platform.system() == 'Linux':
            for line in open('/proc/self/status'):
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) >> 10

        else:
            warnings.warn("Please install psutil to have better "
                          "support with spilling")
            if platform.system() == "Darwin":
                import resource
                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                return rss >> 20
        # TODO: support windows
        return 0
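With psutil installed, you can reproduce the same number that the first branch returns, i.e. the resident set size of the current process in MB (a quick sanity check, not part of pyspark):

import os

import psutil

process = psutil.Process(os.getpid())
print(process.memory_info().rss >> 20)  # resident set size in MB, same as info.rss >> 20 above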
If the node's used memory is greater than the preset limit, the code below calls _spill() to write the data to disk. The check is done once per batch of merged records.
def mergeCombiners(self, iterator, check=True):
    """ Merge (K,V) pair by mergeCombiner """
    iterator = iter(iterator)
    # speedup attribute lookup
    d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
    c = 0
    for k, v in iterator:
        d[k] = comb(d[k], v) if k in d else v
        if not check:
            continue

        c += 1
        if c % batch == 0 and get_used_memory() > self.memory_limit:
            self._spill()
            self._partitioned_mergeCombiners(iterator, self._next_limit())
            break
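The self.memory_limit used in the check above comes, as far as I can tell, from the spark.python.worker.memory setting (512m by default), so one way to make spilling less frequent is to raise that limit, e.g.:

from pyspark import SparkConf, SparkContext

# Give each Python worker more memory for its in-memory merge before it spills.
conf = SparkConf().set("spark.python.worker.memory", "1g")
sc = SparkContext(conf=conf)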
This is the code that actually writes, a.k.a. spills, the data to disk when the used memory is above the preset limit.
def _spill(self):
    """
    dump already partitioned data into disks.

    It will dump the data in batch for better performance.
    """
    global MemoryBytesSpilled, DiskBytesSpilled
    path = self._get_spill_dir(self.spills)
    if not os.path.exists(path):
        os.makedirs(path)

    used_memory = get_used_memory()
    if not self.pdata:
        # The data has not been partitioned, it will iterator the
        # dataset once, write them into different files, has no
        # additional memory. It only called when the memory goes
        # above limit at the first time.

        # open all the files for writing
        streams = [open(os.path.join(path, str(i)), 'w')
                   for i in range(self.partitions)]

        for k, v in self.data.iteritems():
            h = self._partition(k)
            # put one item in batch, make it compatitable with load_stream
            # it will increase the memory if dump them in batch
            self.serializer.dump_stream([(k, v)], streams[h])

        for s in streams:
            DiskBytesSpilled += s.tell()
            s.close()
        self.data.clear()
        self.pdata = [{} for i in range(self.partitions)]

    else:
        for i in range(self.partitions):
            p = os.path.join(path, str(i))
            with open(p, "w") as f:
                # dump items in batch
                self.serializer.dump_stream(self.pdata[i].iteritems(), f)
            self.pdata[i].clear()
            DiskBytesSpilled += os.path.getsize(p)

    self.spills += 1
    gc.collect()  # release the memory as much as possible
    MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
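self.serializer here is a batched pyspark serializer. A minimal sketch of the dump_stream / load_stream round trip that the spill files rely on could look like this (the file path is made up, and it assumes pyspark.serializers with its PickleSerializer and BatchedSerializer classes):

import os
from pyspark.serializers import BatchedSerializer, PickleSerializer

ser = BatchedSerializer(PickleSerializer(), 1024)  # serialize items in batches of 1024

path = "/tmp/spill_demo"  # stand-in for one of the per-partition spill files
with open(path, "wb") as f:
    ser.dump_stream(iter([("k1", 1), ("k2", 2)]), f)

with open(path, "rb") as f:
    print(list(ser.load_stream(f)))  # [('k1', 1), ('k2', 2)]

os.remove(path)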