小编Ahm*_*man的帖子

将pyspark数据帧转换为pandas数据帧

我有 pyspark 数据框,其维度为 (28002528,21) 并尝试使用以下代码行将其转换为 Pandas 数据框:

pd_df=spark_df.toPandas()
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

第一部分

Py4JJavaError: An error occurred while calling o170.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 39.0 failed 1 times, most recent failure: Lost task 3.0 in stage 39.0 (TID 89, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:552)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at …
Run Code Online (Sandbox Code Playgroud)

pandas pyspark apache-spark-2.3

5
推荐指数
1
解决办法
5791
查看次数

将时间戳舍入到最接近的 30 秒

我在 a 中有一个列,DF它包含timestamp格式(yyyy-mm-dd HH:mm:ss)。我需要四舍五入timestamp到最接近的 30 秒。

old column                   desired column
2016-02-09 19:31:02          2016-02-09 19:31:00  
2016-02-09 19:31:35          2016-02-09 19:31:30
2016-02-09 19:31:52          2016-02-09 19:32:00
2016-02-09 19:31:28          2016-02-09 19:31:30
Run Code Online (Sandbox Code Playgroud)

可以在 Pyspark 中做到这一点吗?

python timestamp unix-timestamp pyspark

3
推荐指数
1
解决办法
1990
查看次数

Pyspark:将列从字符串类型转换为时间戳类型

我一直在使用pyspark 2.3。我的数据框包含日期时间值的字符串格式的“TIME”列。该列如下所示:

+---------------+
|           TIME|
+---------------+
| 2016/04/14 190|
| 2016/04/15 180|
|2016/04/14 1530|
|2016/04/16 1530|
| 2016/04/17 160|
+---------------+
Run Code Online (Sandbox Code Playgroud)

其中前两位数字 190代表1530 小时,其余数字代表分钟。我尝试使用以下行将其转换为时间戳类型:

df.withColumn('TIME_timestamp',fn.unix_timestamp('TIME','yyyy/MM/dd HHMM').cast(TimestampType()))
Run Code Online (Sandbox Code Playgroud)

并且 :

df.withColumn('TIME_timestamp', fn.to_timestamp("TIME", 'yyyy/MM/dd HHMM'))
Run Code Online (Sandbox Code Playgroud)

但结果是:

+---------------+-------------------+
|           TIME|     TIME_timestamp|
+---------------+-------------------+
| 2016/04/14 190|               null|
| 2016/04/15 180|               null|
|2016/04/14 1530|               null|
|2016/04/16 1530|               null|
| 2016/04/17 160|               null|
+---------------+-------------------+
Run Code Online (Sandbox Code Playgroud)

所以所需的数据框应该如下所示:

+---------------+
| TIME_timestamp|
+---------------+
| 16-04-15 19:00|
| 16-04-15 18:00|
| 16-04-15 15:30|
| 16-04-15 15:30|
| 16-04-15 16:00|
+---------------+
Run Code Online (Sandbox Code Playgroud)

casting unix-timestamp pyspark

3
推荐指数
1
解决办法
1万
查看次数

如何将多个 .parquet 文件从多个目录读入单个 Pandas 数据帧?

我需要从多个目录中读取镶木地板文件。

例如,

 Dir---
          |
           ----dir1---
                      |
                       .parquet
                       .parquet
          |
           ----dir2---
                      |
                       .parquet
                       .parquet
                       .parquet
Run Code Online (Sandbox Code Playgroud)

有没有办法将这些文件读取到单个熊猫数据框?

注意:所有镶木地板文件都是使用 pyspark 生成的。

pandas parquet

3
推荐指数
1
解决办法
2909
查看次数

Spark __getnewargs__错误...方法或([类java.lang.String])不存在

我正在尝试将列添加到DataFrame中,具体取决于列值是否在另一列中,如下所示:

df=df.withColumn('new_column',when(df['color']=='blue'|df['color']=='green','A').otherwise('WD'))
Run Code Online (Sandbox Code Playgroud)

运行代码后,我得到以下错误:

Py4JError: An error occurred while calling o59.or. Trace:
py4j.Py4JException: Method or([class java.lang.String]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)

我该怎么做才能克服这个问题?我正在使用PySpark 2.3.0

apache-spark pyspark-sql

2
推荐指数
1
解决办法
3130
查看次数

有没有办法像 Pyspark 那样将大熊猫数据保存在多个(parquet/csv)文件中?

我有大的 pandas 数据框,我需要将其保存到多个(parquet/csv)文件中以减少文件的体积空间。

我可以通过将数据帧划分为多个数据帧并单独保存每个数据帧来划分它

有没有一种方法可以直接做到这一点?

csv pandas parquet

2
推荐指数
1
解决办法
2489
查看次数