I invoke Pyspark in local mode with Spark 2.0 using the following command:
pyspark --executor-memory 4g --driver-memory 4g
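As an aside on this invocation: in local mode all work runs inside the single driver JVM, so `--executor-memory` generally has no effect there; only `--driver-memory` sizes the heap that actually does the work. A variant giving that one JVM more room (the `8g` figure is an arbitrary example of mine, not from the question):

```shell
# In local mode, the driver JVM executes the tasks, so this is the
# setting that matters; --executor-memory is ignored in local mode.
pyspark --driver-memory 8g
```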
The input dataframe is read from a tsv file and is 580K rows x 28 columns. I do some operations on the dataframe, and when I try to export it to a tsv file I get this error.
df.coalesce(1).write.save("sample.tsv", format="csv", header="true", delimiter="\t")
Any pointers on how to get rid of this error? I can display the df or count its rows without any problem.

The output dataframe is 3100 rows x 23 columns.

The error:
Job aborted due to stage failure: Task 0 in stage 70.0 failed 1 times, most recent failure: Lost task 0.0 in stage 70.0 (TID 1073, localhost): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Unable to acquire 100 bytes …

I want to add a new column to an existing dask dataframe based on the values of 2 existing columns, with a conditional statement to check for nulls:
DataFrame definition:
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [0.2, "", 0.345, 0.40, 0.15]})
ddf = dd.from_pandas(df, npartitions=2)
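One subtlety worth noting in this definition: the `""` in column `y` is an empty string, not a null value, so null checks will not catch it. A quick illustration of how pandas classifies these values (my own example, plain pandas):

```python
import numpy as np
import pandas as pd

# pd.isnull treats only None/NaN/NaT as missing; an empty string is a
# perfectly ordinary (non-null) value.
print(pd.isnull(""))      # False
print(pd.isnull(np.nan))  # True
print(pd.isnull(None))    # True
```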
Approach 1 (tried):
def funcUpdate(row):
    if row['y'].isnull():
        return row['y']
    else:
        return round((1 + row['x'])/(1 + 1/row['y']), 4)

ddf = ddf.assign(z=ddf.apply(funcUpdate, axis=1, meta=ddf))
It gives an error:
TypeError: Column assignment doesn't support type DataFrame
Approach 2:
ddf = ddf.assign(z=ddf.apply(lambda col: col.y if col.y.isnull() else round((1 + col.x)/(1 + 1/col.y), 4), axis=1, meta=ddf))
Any idea how this should be done?
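For reference, a minimal sketch of the same row function in plain pandas, using `pd.isnull()` (a scalar has no `.isnull()` method) and `NaN` standing in for the missing value, since `""` is a non-null string that would make `1/row['y']` fail. In dask, the same function should plug into `ddf.apply(func_update, axis=1, meta=('z', 'float64'))` — a `(name, dtype)` meta describing the output Series, rather than `meta=ddf`, which declares a DataFrame result and triggers the column-assignment TypeError above. The names and the NaN substitution here are my assumptions, not the asker's code:

```python
import numpy as np
import pandas as pd

# Same shape as the question, but with NaN in place of "" so the
# null check can actually fire.
df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
                   'y': [0.2, np.nan, 0.345, 0.40, 0.15]})

def func_update(row):
    # pd.isnull works on scalars; row['y'].isnull() would raise AttributeError
    if pd.isnull(row['y']):
        return row['y']
    return round((1 + row['x']) / (1 + 1 / row['y']), 4)

df['z'] = df.apply(func_update, axis=1)
print(df['z'].tolist())  # [0.3333, nan, 1.026, 1.4286, 0.7826]
```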