I am trying to use Spark DataFrames that were defined beforehand from inside a regular Spark map operation, like this:
import os
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import BooleanType

# targetDir, sqlContext and sc are assumed to be defined earlier in the session
businessJSON = os.path.join(targetDir, 'business.json')
businessDF = sqlContext.read.json(businessJSON)

reviewsJSON = os.path.join(targetDir, 'review.json')
reviewsDF = sqlContext.read.json(reviewsJSON)

# UDF: True when the array column xs contains val
contains = udf(lambda xs, val: val in xs, BooleanType())

def selectReviews(category):
    businessesByCategory = businessDF[contains(businessDF.categories, lit(category))]
    selectedReviewsDF = reviewsDF.join(businessesByCategory,
                 businessesByCategory.business_id == reviewsDF.business_id)
    return selectedReviewsDF.select("text").map(lambda x: x.text)

categories = ['category1', 'category2']
rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
       )

rdd.take(1)
And I get a huge error message:
Py4JError Traceback (most recent call last)
<ipython-input-346-051af5183a76> in <module>()
12 )
13
---> 14 rdd.take(1)
/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in take(self, num)
1275
1276 p = range(partsScanned, min(partsScanned …
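
For reference, the per-category result that selectReviews is meant to produce can be sketched in plain Python, without Spark. This is only an illustration of the intended logic (filter businesses by category, join to reviews on business_id, keep the review text). The sample records and the name select_reviews_local are hypothetical and do not come from the original data or code.

```python
# Plain-Python sketch (no Spark) of the intended result: for each category,
# the review texts of businesses tagged with that category.
# All sample records below are made up for illustration.

businesses = [
    {"business_id": "b1", "categories": ["category1", "category3"]},
    {"business_id": "b2", "categories": ["category2"]},
]
reviews = [
    {"business_id": "b1", "text": "great"},
    {"business_id": "b2", "text": "ok"},
    {"business_id": "b1", "text": "would return"},
]


def select_reviews_local(category):
    """Return review texts for businesses whose categories include `category`."""
    # Counterpart of the `contains` filter on businessDF.categories
    matching_ids = {
        b["business_id"] for b in businesses if category in b["categories"]
    }
    # Counterpart of the join on business_id followed by select("text")
    return [r["text"] for r in reviews if r["business_id"] in matching_ids]


categories = ["category1", "category2"]
result = {c: select_reviews_local(c) for c in categories}
print(result)
```

The sketch pairs each category with its list of review texts, which is the shape the (category, reviews) tuples in the Spark version are aiming for.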