Post by Igo*_*lov

Can I use a Spark DataFrame inside a regular Spark map operation?

I am trying to use previously defined Spark DataFrames from inside a regular Spark map operation, like this:

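import os

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import BooleanType

# targetDir, sqlContext and sc are defined earlier in the session
businessJSON = os.path.join(targetDir, 'business.json')
businessDF = sqlContext.read.json(businessJSON)

reviewsJSON = os.path.join(targetDir, 'review.json')
reviewsDF = sqlContext.read.json(reviewsJSON)

# UDF: True when the array column xs contains val
contains = udf(lambda xs, val: val in xs, BooleanType())

def selectReviews(category):
    # Businesses tagged with the category, joined to their reviews
    businessesByCategory = businessDF[contains(businessDF.categories, lit(category))]
    selectedReviewsDF = reviewsDF.join(
        businessesByCategory,
        businessesByCategory.business_id == reviewsDF.business_id)
    return selectedReviewsDF.select("text").map(lambda x: x.text)

categories = ['category1', 'category2']
# selectReviews (which uses the DataFrames above) is called inside an RDD map
rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
       )

rdd.take(1)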

And I get a huge error message:

Py4JError                                 Traceback (most recent call last)
<ipython-input-346-051af5183a76> in <module>()
     12        )
     13 
---> 14 rdd.take(1)

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in take(self, num)
   1275 
   1276             p = range(partsScanned, min(partsScanned …
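For context, a likely reason for this kind of failure is that DataFrames and the contexts behind them (sc, sqlContext) live only on the driver, so a function that touches them cannot be shipped to executors inside an RDD map. A minimal sketch of one possible workaround, assuming the list of categories is small enough to iterate on the driver: replace the parallelize/map step with an ordinary Python loop. The helper name collect_per_category is hypothetical and not part of Spark or of the original post.

```python
def collect_per_category(categories, select_fn):
    """Call select_fn once per category and return {category: result}.

    select_fn is expected to be something like selectReviews from the
    question. Because this is an ordinary Python loop rather than an RDD
    map, any DataFrame operations inside select_fn are issued from the
    driver, where the DataFrames actually exist.
    """
    return {category: select_fn(category) for category in categories}


# Hypothetical usage with the names from the question
# (needs a live SparkContext, so it is left commented out):
# reviews_by_category = collect_per_category(categories, selectReviews)
# reviews_by_category['category1'].take(1)
```

Each value in the resulting dict is whatever selectReviews returns, so the distributed work still happens inside Spark; only the iteration over categories moves to the driver.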

apache-spark apache-spark-sql pyspark pyspark-sql

Score: 2 | Answers: 1 | Views: 1996