PySpark DataFrame operation causes OutOfMemoryError

Mar*_*nik 5 out-of-memory apache-spark pyspark databricks azure-databricks

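I have just started experimenting with pyspark / spark and have run into a problem: my code does not work. I cannot find the cause, and Spark's error output is not very helpful. I did find the same question on Stack Overflow, but none of the posts had a clear answer or solution (at least not for me).

The code I am trying to run is: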

import json
from datetime import datetime, timedelta

from pyspark.sql.session import SparkSession

from parse.data_reader import read_csv
from parse.interpolate import insert_time_range, create_time_range, linear_interpolate

spark = SparkSession.builder.getOrCreate()

df = None
with open('config/data_sources.json') as sources_file:
    sources = json.load(sources_file)
    for file in sources['files']:
        with open('config/mappings/{}.json'.format(file['mapping'])) as mapping:
            df_to_append = read_csv(
                spark=spark,
                file='{}{}'.format(sources['root_path'], file['name']),
                config=json.load(mapping)
            )

        if df is None:
            df = df_to_append
        else:
            df = df.union(df_to_append)

df.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)

time_range = create_time_range(
    datetime(year=2019, month=7, day=1, hour=0),
    datetime(year=2019, month=7, day=8, hour=0),
    timedelta(seconds=3600)
)

df_with_intervals = insert_time_range(
    df=df,
    timestamp_column_name='Timestamp',
    variable_column_name='Variable',
    value_column_name='Value',
    time_range=time_range,
)
df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)

It gives the following output:

C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/04 13:31:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/04 13:31:36 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 4:=======================>                                   (2 + 3) / 5]19/09/04 13:31:52 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://xxxxxx.azuredatabricks.net/?o=xxxxxx#/setting/clusters/xxxxxx/sparkUi
[Stage 5:===========>                                               (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp              |Variable    |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0  |
|2019-07-01 00:00:06.664|Load % SB DG|0.0  |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows

Traceback (most recent call last):
  File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 42, in <module>
    df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\dataframe.py", line 381, in show
    print(self._jdf.showString(n, int(truncate), vertical))
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o655.showString.
: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
    at java.lang.AbstractStringBuilder.append(Unknown Source)
    at java.lang.StringBuilder.append(Unknown Source)
    at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
    at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
    at com.trueaccord.scalapb.textformat.TextGenerator.addNewLine(TextGenerator.scala:33)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:38)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)


Process finished with exit code 1

The two functions I am using are create_time_range and insert_time_range, both imported from parse.interpolate.

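The data_sources.json file currently contains only one CSV file, whose size is in the MB range. What is causing the OutOfMemoryError, or how can I get a more detailed error report?

As niuer suggested, I changed the insert_time_range function to: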

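from datetime import datetime, timedelta
from typing import Iterable

from pyspark.sql import DataFrame
from pyspark.sql.functions import array, explode, lit


def create_time_range(start_time: datetime, end_time: datetime, step_size: timedelta) -> Iterable[datetime]: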
    return [start_time + step_size * n for n in range(int((end_time - start_time) / step_size))]


def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, explode(time_range))
    return df.union(df_exploded.select([timestamp_column_name, variable_column_name, value_column_name]))

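Before the .show() call I added the line print(df_with_intervals.count()), which prints the number 5 (as expected). But when I try to show() the values, I still get the same OutOfMemoryError.

Update: I have narrowed the problem down to the union, but it is still unclear to me why it does not work. I updated the insert_time_range method as suggested in the comments: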

def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, lit(time_range[0]))
    return df_exploded.select([timestamp_column_name, variable_column_name, value_column_name])

This gives the following output:

C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/09 23:00:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/09 23:00:30 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 44:==================================>                       (3 + 2) / 5]19/09/09 23:00:43 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
[Stage 45:===========>                                              (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp              |Variable    |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0  |
|2019-07-01 00:00:06.664|Load % SB DG|0.0  |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows

View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
+-------------------+------------+-----+
|Timestamp          |Variable    |Value|
+-------------------+------------+-----+
|2019-06-30 22:00:00|Load % PS DG|null |
|2019-06-30 22:00:00|Power PS DG |null |
|2019-06-30 22:00:00|Power Shore |null |
|2019-06-30 22:00:00|Load % SB DG|null |
|2019-06-30 22:00:00|Power SB DG |null |
|2019-06-30 22:01:00|Load % PS DG|null |
|2019-06-30 22:01:00|Power PS DG |null |
|2019-06-30 22:01:00|Power Shore |null |
|2019-06-30 22:01:00|Load % SB DG|null |
|2019-06-30 22:01:00|Power SB DG |null |
|2019-06-30 22:02:00|Load % PS DG|null |
|2019-06-30 22:02:00|Power PS DG |null |
|2019-06-30 22:02:00|Power Shore |null |
|2019-06-30 22:02:00|Load % SB DG|null |
|2019-06-30 22:02:00|Power SB DG |null |
|2019-06-30 22:03:00|Load % PS DG|null |
|2019-06-30 22:03:00|Power PS DG |null |
|2019-06-30 22:03:00|Power Shore |null |
|2019-06-30 22:03:00|Load % SB DG|null |
|2019-06-30 22:03:00|Power SB DG |null |
+-------------------+------------+-----+
only showing top 20 rows

Traceback (most recent call last):
  File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 46, in <module>
    df_with_intervals.sort([timestamp_column_name, variable_column_name]).show(n=5, truncate=False)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\dataframe.py", line 381, in show
    print(self._jdf.showString(n, int(truncate), vertical))
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o333.showString.
: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
    at java.lang.AbstractStringBuilder.append(Unknown Source)
    at java.lang.StringBuilder.append(Unknown Source)
    at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
    at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
    at com.trueaccord.scalapb.textformat.TextGenerator.add(TextGenerator.scala:19)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:33)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)


Process finished with exit code 1

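So the problem must be in the union method, but I do not know what it is.

Update: In my first attempt I had only one CSV file in config/data_sources.json, so the line df = df.union(df_to_append) was never executed. I have now added multiple CSV files to config/data_sources.json, so the union method is executed, and I again get the error py4j.protocol.Py4JJavaError: An error occurred while calling o2043.showString. : java.lang.OutOfMemoryError: Java heap space, but this time it already occurs at the first union. Am I using this method incorrectly, or is there a bug in the method itself?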
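
As a side note on the accumulation loop, the `df = None` / `if df is None` pattern can be folded into a single reduction. The sketch below is only an illustration: `union_all` is a hypothetical helper name, not part of PySpark, and it works with any objects that expose a `.union()` method (PySpark DataFrames do). It tidies the loop but is not expected to change the OutOfMemoryError by itself.

```python
from functools import reduce
from typing import Iterable, TypeVar

T = TypeVar("T")


def union_all(frames: Iterable[T]) -> T:
    """Fold objects that expose .union() into one, left to right.

    Hypothetical helper for illustration. Raises TypeError if `frames`
    is empty, because reduce() has no initial value to return.
    """
    return reduce(lambda left, right: left.union(right), frames)


# Plain Python sets also have .union(), so they can stand in for DataFrames here.
combined = union_all([{1, 2}, {2, 3}, {4}])
print(combined)
```

With DataFrames, the call would look like `df = union_all(frames)`, where `frames` is the list of per-file DataFrames returned by `read_csv`.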

niu*_*uer 0

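It may come from the explode you are doing. You are essentially cross joining all the rows generated from the json file with the datetimes in time_range, which has 168 elements. I would first replace explode with F.lit() and see whether it runs. If the problem persists, I would then try removing the union code.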
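
To make the size of that cross join concrete, here is a minimal pure-Python sketch that reuses the arithmetic of `create_time_range` from the question. The figure of five distinct variables is an assumption read off the five rows of sample output, not a confirmed count.

```python
from datetime import datetime, timedelta


def create_time_range(start_time, end_time, step_size):
    # Same arithmetic as the helper in the question: one entry per step,
    # end_time itself excluded.
    steps = int((end_time - start_time) / step_size)
    return [start_time + step_size * n for n in range(steps)]


time_range = create_time_range(
    datetime(2019, 7, 1),
    datetime(2019, 7, 8),
    timedelta(seconds=3600),
)

n_steps = len(time_range)  # 7 days * 24 hourly steps
distinct_variables = 5     # ASSUMPTION: taken from the sample output
rows_added = n_steps * distinct_variables

print(n_steps, rows_added)
```

Under that assumption the explode adds only 840 rows, so it may be worth checking whether the plan itself (an array of 168 literal columns) rather than the data volume is what grows large.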