Spark:如何向Spark foreach函数发送参数

mvm*_*mvm 2 redis apache-spark pyspark

我试图使用以下代码将Spark RDD的内容保存到Redis

import redis
class RedisStorageAdapter(BaseStorageAdapter):
    @staticmethod
    def save(record):

        ###--- How do I get action_name ---- ###
        redis_key = #<self.source_action_name>
        redis_host=settings['REDIS']['HOST']
        redis_port=settings['REDIS']['PORT']
        redis_db=settings['REDIS']['DB']

        redis_client = redis.StrictRedis(redis_host, redis_port, redis_db)
        redis_client.sadd(redis_key, record)

    def store_output(self, results_rdd):

        print self.source_action_name
        results_rdd.foreach(RedisStorageAdapter.save)
Run Code Online (Sandbox Code Playgroud)

但我想根据self.source_action_name初始化的内容(在BaseStorageAdapter中)使Redis Key不同

如何将source_action_name传递给RedisStorageAdapter.save函数?foreach函数只允许执行函数的名称而不允许参数列表

另外 - 如果有更好的方法将数据从RDD移动到Redis,请告诉我

pze*_*vic 5

当然,foreach需要一个函数,而不是函数名.所以你可以传递一个lambda函数:

results_rdd.foreach(lambda x: RedisStorageAdapter.save(x, self.source_action_name))
Run Code Online (Sandbox Code Playgroud)