Sha*_*kar 1 python dictionary scala apache-spark pyspark-sql
我想转换DataFrame
为快译通使用collectAsMap()
函数RDD
。
代码:
dict = df.rdd.collectAsMap()
Run Code Online (Sandbox Code Playgroud)
错误日志:
ValueError: dictionary update sequence element #0 has length 8; 2 is required
Run Code Online (Sandbox Code Playgroud)
更新:
DF 有 8 个字段,是否意味着collectAsMap()
只能使用具有两个字段的 DF?
下面是在 pyspark 中做同样事情的解释。我同意 Ram 的解释。collectAsMap 仅适用于 pairedrdd,因此您需要先将数据帧转换为 pair rdd,然后才能使用 collectAsMap 函数将其转换为某些字典。
例如,我有一个以下数据框:
df = spark.sql("""select emp_id,emp_city from udb.temptable_1 order by emp_id""");
+------+--------+
|emp_id|emp_city|
+------+--------+
| 1|NOIDA |
| 2|GURGAON |
| 3|DWARKA |
| 4|SAKET |
| 5|USA |
| 6|UP |
| 7|NOIDA |
| 8|SAKET |
| 9|GURGAON |
+------+--------+
Run Code Online (Sandbox Code Playgroud)
newrdd = df.rdd.map(lambda x : (x[0],x))
>>> type(newrdd)
<class 'pyspark.rdd.PipelinedRDD'>
[(1, Row(emp_id=1, emp_city=u'NOIDA ')),
(2, Row(emp_id=2, emp_city=u'GURGAON ')),
(3, Row(emp_id=3, emp_city=u'DWARKA ')),
(4, Row(emp_id=4, emp_city=u'SAKET ')),
(5, Row(emp_id=5, emp_city=u'USA ')),
(6, Row(emp_id=6, emp_city=u'UP ')),
(7, Row(emp_id=7, emp_city=u'NOIDA ')),
(8, Row(emp_id=8, emp_city=u'SAKET ')),
(9, Row(emp_id=9, emp_city=u'GURGAON '))]
Run Code Online (Sandbox Code Playgroud)
最后,您可以使用 collectAsMap 将您的键值对 rdd 转换为字典
dict = newrdd.collectAsMap()
{1: Row(emp_id=1, emp_city=u'NOIDA '),
2: Row(emp_id=2, emp_city=u'GURGAON '),
3: Row(emp_id=3, emp_city=u'DWARKA '),
4: Row(emp_id=4, emp_city=u'SAKET '),
5: Row(emp_id=5, emp_city=u'USA '),
6: Row(emp_id=6, emp_city=u'UP '),
7: Row(emp_id=7, emp_city=u'NOIDA '),
8: Row(emp_id=8, emp_city=u'SAKET '),
9: Row(emp_id=9, emp_city=u'GURGAON ')}
>>> dict.keys()
[1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> dict.get(2)
Row(emp_id=2, emp_city=u'GURGAON ')
Run Code Online (Sandbox Code Playgroud)