Is this a Spark Streaming bug or a memory leak?

Kra*_* Li 8 memory memory-leaks apache-spark apache-spark-sql

I submit my code to a Spark standalone cluster. The submit command is as follows:

nohup ./bin/spark-submit  \  
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2"  \
./myCode.py 1>a.log 2>b.log &

In the command above I specify that the executor should use 4G of memory. But when I monitor the executor process with the top command, I can see its memory usage growing continuously. The current top output is:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                                                                                                    
12578 root      20   0 20.223g 5.790g  23856 S  61.5 37.3  20:49.36 java       

The machine has 16G of memory in total, so 37.3% is already more than the 4GB I specified, and it is still growing.
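(Roughly 0.373 × 16 GB ≈ 6 GB, consistent with the 5.790g RES shown by top and well above the 4 GB heap (-Xms4096M -Xmx4096M) that the executor JVM was started with.)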

Using the ps command, I can confirm that it is the executor process:

[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ?        Sl     1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080
10603 ?        Sl     6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077
12420 ?        Sl    10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py
12578 ?        Sl    21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.79.148.184:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://Worker@10.79.148.184:52660

Here is the code. It is very simple, so I don't think there is a memory leak in it:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    dataDirectory = '/stream/raw'

    sc = SparkContext(appName="Netflow")
    ssc = StreamingContext(sc, 20)

    # Read CSV File
    lines = ssc.textFileStream(dataDirectory)

    lines.foreachRDD(process)

    ssc.start()
    ssc.awaitTermination()

The code of the process function is below. Note that I use HiveContext rather than SQLContext here, because SQLContext does not support window functions:

from pyspark.sql import HiveContext, Row
from pyspark.sql import functions as func
from pyspark.sql.window import Window

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(time, rdd):

    if rdd.isEmpty():
        return sc.emptyRDD()

    sqlContext = getSqlContextInstance(rdd.context)

    # Convert CSV File to Dataframe
    parts = rdd.map(lambda l: l.split(","))
    rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11])))
    dataframe = sqlContext.createDataFrame(rowRdd)

    # Get the top 2 interface of each router
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
    rank = func.dense_rank().over(windowSpec)
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")

    ret.show()
    dataframe.show()

In fact, I found that the following code causes the problem:

    # Get the top 2 interface of each router
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
    rank = func.dense_rank().over(windowSpec)
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")
    ret.show()

If I remove these 5 lines, the code can run all night without the memory increasing. Adding them back causes the executor's memory usage to grow to a very high level.

Basically the code above is just some window + groupBy in Spark SQL. Is this a bug?

Roy*_*eIX 0

As far as I can see from your 5 lines, maybe the groupBy is the problem. You could try using reduceByKey instead and see how it performs.

See here and here.
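A minimal, untested sketch of what this suggestion could look like: computing the same "sum of bits per (router, interface), then top-2 interfaces per router" directly on the RDD with reduceByKey/aggregateByKey instead of a DataFrame groupBy plus window. The column positions are taken from the question's process function; the helper names and the printed output are placeholders for illustration, not the poster's actual code, and whether this avoids the memory growth would still need to be verified.

def keep_top2(acc, item):
    # keep only the two (interface, bits) pairs with the largest bits
    acc.append(item)
    acc.sort(key=lambda x: x[1], reverse=True)
    return acc[:2]

def merge_top2(a, b):
    return sorted(a + b, key=lambda x: x[1], reverse=True)[:2]

def process(time, rdd):
    if rdd.isEmpty():
        return

    parts = rdd.map(lambda l: l.split(","))

    # sum the bits for every (router, interface) pair
    sums = parts.map(lambda p: ((p[0], int(p[1])), int(p[11]))) \
                .reduceByKey(lambda a, b: a + b)

    # re-key by router and keep the top-2 interfaces per router
    top2 = sums.map(lambda kv: (kv[0][0], (kv[0][1], kv[1]))) \
               .aggregateByKey([], keep_top2, merge_top2)

    for router, interfaces in top2.collect():
        print("%s -> %s" % (router, interfaces))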