Spark DataFrame to Dict - 字典更新序列元素错误

Sha*_*kar 1 python dictionary scala apache-spark pyspark-sql

我想转换DataFrame为快译通使用collectAsMap()函数RDD

代码:

dict = df.rdd.collectAsMap()
Run Code Online (Sandbox Code Playgroud)

错误日志:

ValueError: dictionary update sequence element #0 has length 8; 2 is required
Run Code Online (Sandbox Code Playgroud)

更新:

DF 有 8 个字段,是否意味着collectAsMap()只能使用具有两个字段的 DF?

vik*_*ana 5

下面是在 pyspark 中做同样事情的解释。我同意 Ram 的解释。collectAsMap 仅适用于 pairedrdd,因此您需要先将数据帧转换为 pair rdd,然后才能使用 collectAsMap 函数将其转换为某些字典。

例如,我有一个以下数据框:

df = spark.sql("""select emp_id,emp_city from udb.temptable_1 order by emp_id""");
+------+--------+
|emp_id|emp_city|
+------+--------+
|     1|NOIDA   |
|     2|GURGAON |
|     3|DWARKA  |
|     4|SAKET   |
|     5|USA     |
|     6|UP      |
|     7|NOIDA   |
|     8|SAKET   |
|     9|GURGAON |
+------+--------+
Run Code Online (Sandbox Code Playgroud)

将其转换为键值对 rdd

newrdd = df.rdd.map(lambda x : (x[0],x))

>>> type(newrdd)
<class 'pyspark.rdd.PipelinedRDD'>

[(1, Row(emp_id=1, emp_city=u'NOIDA   ')), 
(2, Row(emp_id=2, emp_city=u'GURGAON ')), 
(3, Row(emp_id=3, emp_city=u'DWARKA  ')), 
(4, Row(emp_id=4, emp_city=u'SAKET   ')), 
(5, Row(emp_id=5, emp_city=u'USA     ')), 
(6, Row(emp_id=6, emp_city=u'UP      ')), 
(7, Row(emp_id=7, emp_city=u'NOIDA   ')), 
(8, Row(emp_id=8, emp_city=u'SAKET   ')), 
(9, Row(emp_id=9, emp_city=u'GURGAON '))]
Run Code Online (Sandbox Code Playgroud)

最后,您可以使用 collectAsMap 将您的键值对 rdd 转换为字典

dict = newrdd.collectAsMap()

{1: Row(emp_id=1, emp_city=u'NOIDA   '), 
2: Row(emp_id=2, emp_city=u'GURGAON '), 
3: Row(emp_id=3, emp_city=u'DWARKA  '), 
4: Row(emp_id=4, emp_city=u'SAKET   '), 
5: Row(emp_id=5, emp_city=u'USA     '), 
6: Row(emp_id=6, emp_city=u'UP      '), 
7: Row(emp_id=7, emp_city=u'NOIDA   '), 
8: Row(emp_id=8, emp_city=u'SAKET   '), 
9: Row(emp_id=9, emp_city=u'GURGAON ')}

>>> dict.keys()
[1, 2, 3, 4, 5, 6, 7, 8, 9]

>>> dict.get(2)
Row(emp_id=2, emp_city=u'GURGAON ')
Run Code Online (Sandbox Code Playgroud)