Pyspark - TypeError:使用reduceByKey计算平均值时“float”对象不可下标

fit*_*ida 1 python apache-spark pyspark

我的“asdasd.csv”文件具有以下结构。

 Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand
Run Code Online (Sandbox Code Playgroud)

好的,我得到以下 {key,value} 元组来对其进行操作。

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
#           part A (key)               part B (value) 
Run Code Online (Sandbox Code Playgroud)

我计算平均值的代码如下,我必须计算每个键的每列 X、YZ 的平均值。

rdd_ori = sc.textFile("asdasd.csv") \
        .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
            .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\
            .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
Run Code Online (Sandbox Code Playgroud)

我的问题是我尝试了该代码,它在其他 PC 上运行良好,并且与我用于开发它的相同 MV (PySpark Py3)

这是一个例子,这段代码是正确的:

在此输入图像描述

但我不知道为什么会收到此错误,重要的部分在Strong中。

-------------------------------------------------- ------------------------- Py4JJavaError Traceback (最近一次调用) in () 9 #sum_1 = count_.reduceByKey(lambda x, y: ( x[0][0]+y[0][0],x 0 +y 0 ,x[0][2]+y[0][2])) 10 ---> 11 print(meanRDD.take (1))

/opt/spark/current/python/pyspark/rdd.py 在 take(self, num) 1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry,totalParts)) -> 1343 res = self.context.runJob(self , takeUpToNumLeft, p) 1344 1345 项 += res

runJob 中的/opt/spark/current/python/pyspark/context.py(self、rdd、partitionFunc、partitions、allowLocal) 990 # SparkContext#runJob。第991章 992、第993章994

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py调用(self,*args) 1131 答案 = self.gateway_client.send_command(命令) 1132 return_value = get_return_value( -> 1133 答案,self.gateway_client,self.target_id,self.name) 1134 1135 对于 temp_args 中的 temp_arg:

/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 尝试: ---> 63 return f(* a, **kw) 64 除了 py4j.protocol.Py4JJavaError 为 e: 65 s = e.java_exception.toString()

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "调用 {0 时发生错误}{1}{2}.\n"。 --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError(

Py4JJavaError:调用 z:org.apache.spark.api.python.PythonRDD.runJob 时发生错误。:org.apache.spark.SparkException:由于阶段失败而中止作业:阶段127.0中的任务0失败1次,最近一次失败:阶段127.0中丢失任务0.0(TID 102,本地主机,执行器驱动程序):org.apache.spark .api.python.PythonException:回溯(最近一次调用):文件“/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py”,第177行,在主进程()文件“/ opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py”,第 172 行,进程中 serializer.dump_stream(func(split_index, iterator), outfile) 文件“/opt/spark/current/python/ pyspark/rdd.py”,第 2423 行,在 pipeline_func return func(split, prev_func(split, iterator)) 文件“/opt/spark/current/python/pyspark/rdd.py”,第 2423 行,在 pipeline_func return func( split, prev_func(split, iterator)) 文件“/opt/spark/current/python/pyspark/rdd.py”,第 346 行,在 func 返回 f(iterator) 文件“/opt/spark/current/python/pyspark/ rdd.py”,第 1842 行,在 mergeLocally merge.mergeValues(iterator) 文件“/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py”,第 238 行,在 mergeValues d[k] = Comb(d[k], v) if k in d else Creator(v) File "", line 3, in TypeError: 'float' 对象不可下标

Ram*_*jan 6

工作原理如下reduceByKey。我以您的示例为例进行说明,即使用您传递给的以下数据reduceByKey

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))]
#           part A (key)               part B (value)       counter
Run Code Online (Sandbox Code Playgroud)

让我一步一步走

执行以下mapValues功能后

rdd_ori.mapValues(lambda x: (x,1))
Run Code Online (Sandbox Code Playgroud)

rdd数据将如下所示

((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))
Run Code Online (Sandbox Code Playgroud)

所以什么时候reduceByKey被调用为

.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))
Run Code Online (Sandbox Code Playgroud)

并且具有相同键的所有行都被分组,并且值被传递给lambda的函数reducyByKey

由于在您的情况下,所有键都是相同的,因此值将在以下迭代中传递给a和变量。b

在第一次迭代中,ais((-5.9427185, 0.6761626999999999, 8.128204), 1)bis((-5.958191, 0.6880646, 8.135345), 1)所以计算部分(a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1])是正确的并且通过。

在第二次迭代中,a其输出(a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1])(-11.910430999999999, 1.3582764, 16.271881, 2)

因此,如果您查看数据的格式,就会a[0][0]发现a. 你可以得到a[0], a[1].. 等等。这就是问题所在。这也是错误消息所暗示的内容

类型错误:“float”对象不可下标

解决方案是格式化数据,以便您可以访问,a如果a[0][0]您格式化reduceByKey以下格式,则可以完成此操作。

.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))
Run Code Online (Sandbox Code Playgroud)

但这会给你的最后一个mapValues功能带来麻烦

.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
Run Code Online (Sandbox Code Playgroud)

因为你的值,a在 lambda 函数中,都是((-23.848236199999995, 2.6879882999999998, 32.604461), 4)如此的a[0]手段(-23.848236199999995, 2.6879882999999998, 32.604461)a[1]手段4,并且没有更多的,所以你会遇到

IndexError:元组索引超出范围

所以你的最后一个mapValues应该是

.mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
Run Code Online (Sandbox Code Playgroud)

所以总的来说,以下代码应该适合您

rdd_ori = sc.textFile("asdasd.csv") \
    .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))\
    .mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
Run Code Online (Sandbox Code Playgroud)

我希望我已经解释得足够好了。