Rob*_*att 4 apache-spark spark-streaming dstream pyspark
我正在通过 PySpark 探索 Spark Streaming,当我尝试将transform函数与take.
我可以成功地使用sortBy过DStream孔transform和pprint结果。
author_counts_sorted_dstream = author_counts_dstream.transform\
(lambda foo:foo\
.sortBy(lambda x:x[0].lower())\
.sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()
Run Code Online (Sandbox Code Playgroud)
但是如果我使用take以下相同的模式并尝试pprint它:
top_five = author_counts_sorted_dstream.transform\
(lambda rdd:rdd.take(5))
top_five.pprint()
Run Code Online (Sandbox Code Playgroud)
工作失败
Run Code Online (Sandbox Code Playgroud)Py4JJavaError: An error occurred while calling o25.awaitTermination. : org.apache.spark.SparkException: An exception was raised by Python: Traceback (most recent call last): File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call return r._jrdd AttributeError: 'list' object has no attribute '_jrdd'
您可以在此处查看笔记本中的完整代码和输出。
我究竟做错了什么?
您传递给的函数transform应该从 转换RDD为RDD。如果您使用操作,例如take,则必须将结果转换回RDD:
sc: SparkContext = ...
author_counts_sorted_dstream.transform(
lambda rdd: sc.parallelize(rdd.take(5))
)
Run Code Online (Sandbox Code Playgroud)
相比之下,RDD.sortBy使用的是转换(返回 RDD),因此不需要进一步的并行化。
附带说明以下功能:
lambda foo: foo \
.sortBy(lambda x:x[0].lower()) \
.sortBy(lambda x:x[1], ascending=False)
Run Code Online (Sandbox Code Playgroud)
没有多大意义。请记住,Spark 按 shuffle 排序,因此它不稳定。如果要按多个字段排序,则应使用复合键,例如:
lambda x: (x[0].lower(), -x[1])
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3261 次 |
| 最近记录: |