use*_*714 7 python apache-spark pyspark
Class ProdsTransformer:
def __init__(self):
self.products_lookup_hmap = {}
self.broadcast_products_lookup_map = None
def create_broadcast_variables(self):
self.broadcast_products_lookup_map = sc.broadcast(self.products_lookup_hmap)
def create_lookup_maps(self):
// The code here builds the hashmap that maps Prod_ID to another space.
pt = ProdsTransformer ()
pt.create_broadcast_variables()
pairs = distinct_users_projected.map(lambda x: (x.user_id,
pt.broadcast_products_lookup_map.value[x.Prod_ID]))
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
"例外:您似乎尝试从广播变量,操作或转换中引用SparkContext.SparkContext只能用于驱动程序,而不能用于在工作程序上运行的代码.有关更多信息,请参阅SPARK-5063."
任何有关如何处理广播变量的帮助都会很棒!
小智 14
通过在maplambda中引用包含广播变量的对象,Spark将尝试序列化整个对象并将其发送给worker.由于该对象包含对SparkContext的引用,因此会出现错误.而不是这个:
pairs = distinct_users_projected.map(lambda x: (x.user_id, pt.broadcast_products_lookup_map.value[x.Prod_ID]))
Run Code Online (Sandbox Code Playgroud)
试试这个:
bcast = pt.broadcast_products_lookup_map
pairs = distinct_users_projected.map(lambda x: (x.user_id, bcast.value[x.Prod_ID]))
Run Code Online (Sandbox Code Playgroud)
后者避免了对object(pt)的引用,因此Spark只需要发送广播变量.
| 归档时间: |
|
| 查看次数: |
13274 次 |
| 最近记录: |