PySpark EOFError after calling map

Pet*_*ete 23 python apache-spark pyspark

我是spark和pyspark的新手.

I'm reading a small CSV file (~40k rows) into a DataFrame.

from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors  # Spark 1.6, so mllib rather than ml

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true').load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()

I get a strange error that doesn't happen every time, but does happen fairly often:

>>> df2.show(1)
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

>>> df2.count()
41999                                                                           
>>> df2.show(1)
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

>>> df2.count()
41999                                                                           
>>> df2.show(1)
Traceback (most recent call last):
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker    
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError
EOFError
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

Once the EOFError has been raised, I don't see it again until I do something that requires interacting with the Spark server.

When I call df2.count(), it shows the [Stage xxx] progress prompt, which is what I mean by interacting with the Spark server. Anything that triggers an action on df2 seems to eventually raise the EOFError again.

It doesn't seem to happen with df (as opposed to df2), so it looks like it must be something in the df.map() line.
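
One way to isolate it (a rough sketch, run in the same shell session) is to evaluate just the mapped RDD without the toDF() step and see whether the EOFError still appears:

# Evaluate only the map stage, skipping toDF(), to check whether
# the map alone reproduces the EOFError
mapped = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:])))
mapped.count()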

Man*_*pta 3

Could you try converting the DataFrame to an RDD before mapping? You are applying a map function directly on a DataFrame and then creating a DataFrame from it again. The syntax is as follows:

df.rdd.map().toDF()
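Applied to the snippet in the question, that would look roughly like this (a sketch only, assuming the same Spark 1.6 session, with Row from pyspark.sql and Vectors from pyspark.mllib.linalg):

from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors

# Map over the underlying RDD explicitly, then convert back to a DataFrame
df2 = df.rdd.map(lambda x: Row(label=float(x[0]),
                               features=Vectors.dense(x[1:]))).toDF()
df2.show(1)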

Please let me know if that works. Thanks.