我有 pyspark 数据框,其维度为 (28002528,21) 并尝试使用以下代码行将其转换为 Pandas 数据框:
pd_df=spark_df.toPandas()
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
第一部分
Py4JJavaError: An error occurred while calling o170.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 39.0 failed 1 times, most recent failure: Lost task 3.0 in stage 39.0 (TID 89, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:552)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at …Run Code Online (Sandbox Code Playgroud) 我在 a 中有一个列,DF它包含timestamp格式(yyyy-mm-dd HH:mm:ss)。我需要四舍五入timestamp到最接近的 30 秒。
old column desired column
2016-02-09 19:31:02 2016-02-09 19:31:00
2016-02-09 19:31:35 2016-02-09 19:31:30
2016-02-09 19:31:52 2016-02-09 19:32:00
2016-02-09 19:31:28 2016-02-09 19:31:30
Run Code Online (Sandbox Code Playgroud)
可以在 Pyspark 中做到这一点吗?
我一直在使用pyspark 2.3。我的数据框包含日期时间值的字符串格式的“TIME”列。该列如下所示:
+---------------+
| TIME|
+---------------+
| 2016/04/14 190|
| 2016/04/15 180|
|2016/04/14 1530|
|2016/04/16 1530|
| 2016/04/17 160|
+---------------+
Run Code Online (Sandbox Code Playgroud)
其中前两位数字 190代表1530 小时,其余数字代表分钟。我尝试使用以下行将其转换为时间戳类型:
df.withColumn('TIME_timestamp',fn.unix_timestamp('TIME','yyyy/MM/dd HHMM').cast(TimestampType()))
Run Code Online (Sandbox Code Playgroud)
并且 :
df.withColumn('TIME_timestamp', fn.to_timestamp("TIME", 'yyyy/MM/dd HHMM'))
Run Code Online (Sandbox Code Playgroud)
但结果是:
+---------------+-------------------+
| TIME| TIME_timestamp|
+---------------+-------------------+
| 2016/04/14 190| null|
| 2016/04/15 180| null|
|2016/04/14 1530| null|
|2016/04/16 1530| null|
| 2016/04/17 160| null|
+---------------+-------------------+
Run Code Online (Sandbox Code Playgroud)
所以所需的数据框应该如下所示:
+---------------+
| TIME_timestamp|
+---------------+
| 16-04-15 19:00|
| 16-04-15 18:00|
| 16-04-15 15:30|
| 16-04-15 15:30|
| 16-04-15 16:00|
+---------------+
Run Code Online (Sandbox Code Playgroud) 我需要从多个目录中读取镶木地板文件。
例如,
Dir---
|
----dir1---
|
.parquet
.parquet
|
----dir2---
|
.parquet
.parquet
.parquet
Run Code Online (Sandbox Code Playgroud)
有没有办法将这些文件读取到单个熊猫数据框?
注意:所有镶木地板文件都是使用 pyspark 生成的。
我正在尝试将列添加到DataFrame中,具体取决于列值是否在另一列中,如下所示:
df=df.withColumn('new_column',when(df['color']=='blue'|df['color']=='green','A').otherwise('WD'))
Run Code Online (Sandbox Code Playgroud)
运行代码后,我得到以下错误:
Py4JError: An error occurred while calling o59.or. Trace:
py4j.Py4JException: Method or([class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)
我该怎么做才能克服这个问题?我正在使用PySpark 2.3.0
我有大的 pandas 数据框,我需要将其保存到多个(parquet/csv)文件中以减少文件的体积空间。
我可以通过将数据帧划分为多个数据帧并单独保存每个数据帧来划分它
有没有一种方法可以直接做到这一点?
pandas ×3
pyspark ×3
parquet ×2
apache-spark ×1
casting ×1
csv ×1
pyspark-sql ×1
python ×1
timestamp ×1