从PySpark向Redis写入数据

kam*_*nga 4 python apache-spark pyspark

在Scala中,我们会像这样写一个RDD给Redis:

datardd.foreachPartition(iter => {
      val r = new RedisClient("hosturl", 6379)
      iter.foreach(i => {
        val (str, it) = i
        val map = it.toMap
        r.hmset(str, map)
      })
    })
Run Code Online (Sandbox Code Playgroud)

我尝试在PySpark中这样做:datardd.foreachPartition(storeToRedis),其中function storeToRedis定义为:

def storeToRedis(x):
    r = redis.StrictRedis(host = 'hosturl', port = 6379)
    for i in x:
        r.set(i[0], dict(i[1]))
Run Code Online (Sandbox Code Playgroud)

它给了我这个:

ImportError:('没有名为redis的模块',函数subimport在0x47879b0,('redis',))

当然,我已经进口了redis.

kam*_*nga 6

PySpark的SparkContext有一个addPyFile专门用于此事的方法.将redis模块设为zip文件(如下所示)并调用此方法:

sc = SparkContext(appName = "analyze")
sc.addPyFile("/path/to/redis.zip")
Run Code Online (Sandbox Code Playgroud)