小编Zha*_*Xin的帖子

pyspark.sql.types.Row 列出

我的初始数据集是:

{'ID': [Row(userid=17562323, gross_merchandise_value=6072210944, country=u'ID'), Row(userid=29989283, gross_merchandise_value=4931252224, country=u'ID')]
Run Code Online (Sandbox Code Playgroud)

dict 值的类型是 pyspark.sql.types.Row

如何将dict转换为userid列表?像下面这样:

[17562323, 29989283],
Run Code Online (Sandbox Code Playgroud)

只需获取用户 ID 列表。

python pyspark

7
推荐指数
1
解决办法
1万
查看次数

如何将字符串变量分配给数据框名称

我遇到了一个问题,这是一个 for 循环程序。如下所示:

list = [1,2,3,4]

for index in list:
    new_df_name = "user_" + index
    new_df_name = origin_df1.join(origin_df2,'id','left')
Run Code Online (Sandbox Code Playgroud)

但“ new_df_name ”只是一个变量和字符串类型。

如何实现这些?

python dataframe apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
7583
查看次数

加入后如何重命名重复的列?

我想将join与3个数据框一起使用,但是有些列我们不需要,或者与其他数据框有重复的名称,因此我想删除一些列,如下所示:

result_df = (aa_df.join(bb_df, 'id', 'left')
  .join(cc_df, 'id', 'left')
  .withColumnRenamed(bb_df.status, 'user_status'))
Run Code Online (Sandbox Code Playgroud)

请注意,该status列位于两个数据帧中,即aa_dfbb_df

上面的方法不起作用。我也尝试使用withColumn,但是创建了新列,而旧列仍然存在。

apache-spark apache-spark-sql pyspark

5
推荐指数
2
解决办法
5447
查看次数

如何在pyspark使用的一行中修改一列值

我想在userid = 22650984时更新值。如何在pyspark平台中执行此操作?感谢您的帮助。

>>>xxDF.select('userid','registration_time').filter('userid="22650984"').show(truncate=False)
18/04/08 10:57:00 WARN TaskSetManager: Lost task 0.1 in stage 57.0 (TID 874, shopee-hadoop-slave89, executor 9): TaskKilled (killed intentionally)
18/04/08 10:57:00 WARN TaskSetManager: Lost task 11.1 in stage 57.0 (TID 875, shopee-hadoop-slave97, executor 16): TaskKilled (killed intentionally)
+--------+----------------------------+
|userid  |registration_time           |
+--------+----------------------------+
|22650984|270972-04-26 13:14:46.345152|
+--------+----------------------------+
Run Code Online (Sandbox Code Playgroud)

pyspark

4
推荐指数
2
解决办法
6255
查看次数

如何在气流中操作,以便任务重新运行并继续下游任务

我在气流中设置了一个工作流程,其中一个工作失败了,在我修复了问题之后,我想重新运行失败的任务并继续工作流程.详情如下:

在此输入图像描述

如上所述,我准备运行任务"20_validation",我按下"运行"按钮如下:

在此输入图像描述

但问题是当任务'20_validation'完成时,这些下游任务不会继续启动.我应该怎么做?

scheduled-tasks airflow

3
推荐指数
1
解决办法
604
查看次数

如何使用数据框创建地图

我有一个数据框df.show()像这样:

+-----------+-------------------+
|id|                        name|
+-----------+-------------------+
|       1231|                aa |
|       1232|                bb |
|       1233|                cc |
|       1234|                dd |
|       1235|                 dd|
|       1236|                 cc|
+-----------+-------------------+
Run Code Online (Sandbox Code Playgroud)

“id”列是唯一的,现在我要创建一个键为“id”,值为“name”的映射,如何通过scala实现它?假设数据帧名称是 df。

val mapResult = df.map(...)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark rdd

1
推荐指数
1
解决办法
5810
查看次数