pyspark write fails with StackOverflowError


I am converting a fixed-width file to Parquet in AWS Glue. The data has roughly 1600 columns and about 3000 rows. When I try to write the Spark DataFrame to Parquet, I get a StackOverflowError.
The same problem shows up even for actions such as count() and show(). I have tried calling cache() and repartition(), but I still see the error.

If I reduce the number of columns to 500, the code works.

Please help.

Here is my code:

    # Read the fixed-width file as a single-column ("value") text DataFrame
    data_df = spark.read.text(input_path)

    # Column layout (name, start, length) is read from a JSON schema file
    schema_df = pd.read_json(schema_path)
    df = data_df

    # Slice one column out of the raw line for each schema entry
    for r in schema_df.itertuples():
        df = df.withColumn(
            str(r.name), df.value.substr(int(r.start), int(r.length))
        )
    df = df.drop("value")

    df.write.mode("overwrite").option("compression", "gzip").parquet(output_path)  # FAILING HERE

Stack trace below:

2021-11-10 05:00:13,542 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
  File "/tmp/conv_fw_2_pq.py", line 148, in <module>
    partition_ts=parsed_args.partition_timestamp,
  File "/tmp/conv_fw_2_pq.py", line 125, in process_file
    df.write.mode("overwrite").option("compression", "gzip").parquet(output_path)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 839, in parquet
    self._jwrite.parquet(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o7066.parquet.
: java.lang.StackOverflowError
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:88)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$references$1.apply(Expression.scala:88)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$references$1.apply(Expression.scala:88)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:88)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$references$1.apply(QueryPlan.scala:45)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$references$1.apply(QueryPlan.scala:45)
    at scala.collection.immutable.Stream$$anonfun$flatMap$1.apply(Stream.scala:497)
    at scala.collection.immutable.Stream$$anonfun$flatMap$1.apply(Stream.scala:497)

过过招 7

The official Spark documentation describes withColumn as follows: this method introduces a projection internally. Therefore, calling it multiple times, for instance through loops in order to add multiple columns, can generate big plans which can cause performance issues and even a StackOverflowException. To avoid this, use **select()** with multiple columns at once.

The suggestion is to first build the list of column expressions, and then construct the new DataFrame with a single select call.
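
A minimal sketch of that approach, reusing the data_df and schema_df from the question (the name/start/length fields in the schema and the single "value" column are assumptions carried over from the original code):

    # Build all column expressions up front instead of calling withColumn in a loop
    select_exprs = [
        data_df["value"].substr(int(r.start), int(r.length)).alias(str(r.name))
        for r in schema_df.itertuples()
    ]

    # A single select produces one projection over all ~1600 columns instead of
    # ~1600 nested projections, keeping the logical plan shallow.
    df = data_df.select(*select_exprs)

    df.write.mode("overwrite").option("compression", "gzip").parquet(output_path)

Because select accepts the whole list at once, the resulting plan no longer grows with the number of columns, which is what was overflowing the stack during analysis of the write.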