Tags: python, numpy, apache-spark, rdd, pyspark
When I try to convert an RDD made of numpy arrays into a DataFrame in pyspark, I get the error below.
Here is the piece of code that triggers it; I'm not even sure I can tell where the error actually comes from, even after reading the traceback...
Does anyone know how to work around this?
Thanks a lot!
In [111]: rddUser.take(5)
Out[111]:
[array([u'1008798262000292538', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'102254941859441333', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'1035609083097069747', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'10363297284472000', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'1059178934871294116', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32')]
This is where the trouble starts:
In [110]: rddUser.toDF(schema=None).show()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-110-073037afd70e> in <module>()
----> 1 rddUser.toDF(schema=None).show()
62 [Row(name=u'Alice', age=1)]
63 """
---> 64 return sqlContext.createDataFrame(self, schema, sampleRatio)
65
66 RDD.toDF = toDF
421
422 if isinstance(data, RDD):
--> 423 rdd, schema = self._createFromRDD(data, schema, samplingRatio)
424 else:
425 rdd, schema = self._createFromLocal(data, schema)
308 """
309 if schema is None or isinstance(schema, (list, tuple)):
--> 310 struct = self._inferSchema(rdd, samplingRatio)
311 converter = _create_converter(struct)
312 rdd = rdd.map(converter)
253 """
254 first = rdd.first()
--> 255 if not first:
256 raise ValueError("The first row in RDD is empty, "
257 "can not infer schema")
ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
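The traceback actually points at the cause: Spark's schema inference calls "if not first:" on the first element of the RDD, and evaluating a multi-element numpy array in a boolean context raises exactly this ValueError. A minimal sketch of the same failure outside Spark (the variable name is only for illustration):

import numpy as np

first = np.array([u'1.0', u'0.0', u'1.0'], dtype='<U32')

# bool() on an array with more than one element is ambiguous,
# which is what "if not first:" inside _inferSchema runs into
try:
    if not first:
        pass
except ValueError as e:
    print(e)  # The truth value of an array with more than one element is ambiguous...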
If the RDD looks like the one shown above, just map each array with tolist:
import numpy as np
rdd = spark.sparkContext.parallelize([
np.array([u'1059178934871294116', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0']),
np.array([u'102254941859441333', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0'])
])
df = rdd.map(lambda x: x.tolist()).toDF(["user_id"])  # only the first column gets a name; the rest default to _2.._6
df.show()
# +-------------------+---+---+---+---+---+
# | user_id| _2| _3| _4| _5| _6|
# +-------------------+---+---+---+---+---+
# |1059178934871294116|1.0|0.0|0.0|0.0|1.0|
# | 102254941859441333|1.0|0.0|0.0|0.0|1.0|
# +-------------------+---+---+---+---+---+
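Note that tolist() on a '<U32' array yields Python strings, so every column of that DataFrame is a string. If you want numeric feature columns, you can pass all the names to toDF and cast afterwards; a small sketch with made-up column names, assuming the trailing values should be doubles:

from pyspark.sql.functions import col

cols = ["user_id", "f1", "f2", "f3", "f4", "f5"]  # hypothetical column names
df = rdd.map(lambda x: x.tolist()).toDF(cols)
df = df.select("user_id", *[col(c).cast("double") for c in cols[1:]])
df.printSchema()
# root
#  |-- user_id: string (nullable = true)
#  |-- f1: double (nullable = true)
#  ...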
But given your comment, I assume you want to use it with ml. In that case this will probably work better:
from pyspark.ml.linalg import DenseVector
(rdd
.map(lambda x: (x[0].tolist(), DenseVector(x[1:])))
.toDF(["user_id", "features"])
.show(2, False))
# +-------------------+---------------------+
# |user_id |features |
# +-------------------+---------------------+
# |1059178934871294116|[1.0,0.0,0.0,0.0,1.0]|
# |102254941859441333 |[1.0,0.0,0.0,0.0,1.0]|
# +-------------------+---------------------+
You should also take a look at pyspark.ml.feature.OneHotEncoder.
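A minimal sketch of that route, assuming a hypothetical categorical column "country" and the Spark 3.x API where OneHotEncoder is an estimator (in Spark 2.x it is a plain transformer configured with inputCol/outputCol instead):

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# toy DataFrame with a made-up categorical column
raw = spark.createDataFrame(
    [("1059178934871294116", "US"), ("102254941859441333", "FR")],
    ["user_id", "country"])

# index the string category, then one-hot encode the index
indexer = StringIndexer(inputCol="country", outputCol="country_idx")
encoder = OneHotEncoder(inputCols=["country_idx"], outputCols=["country_vec"])

indexed = indexer.fit(raw).transform(raw)
encoded = encoder.fit(indexed).transform(indexed)
encoded.show(truncate=False)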