kam*_*nga 4 python apache-spark pyspark
在Scala中,我们会像这样写一个RDD给Redis:
datardd.foreachPartition(iter => {
val r = new RedisClient("hosturl", 6379)
iter.foreach(i => {
val (str, it) = i
val map = it.toMap
r.hmset(str, map)
})
})
Run Code Online (Sandbox Code Playgroud)
我尝试在PySpark中这样做:datardd.foreachPartition(storeToRedis),其中function storeToRedis定义为:
def storeToRedis(x):
r = redis.StrictRedis(host = 'hosturl', port = 6379)
for i in x:
r.set(i[0], dict(i[1]))
Run Code Online (Sandbox Code Playgroud)
它给了我这个:
ImportError:('没有名为redis的模块',函数subimport在0x47879b0,('redis',))
当然,我已经进口了redis.
PySpark的SparkContext有一个addPyFile专门用于此事的方法.将redis模块设为zip文件(如下所示)并调用此方法:
sc = SparkContext(appName = "analyze")
sc.addPyFile("/path/to/redis.zip")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4001 次 |
| 最近记录: |