PySpark:在 UDF 中使用列名称并根据逻辑进行列名称的串联

NDS*_*NDS 2 python dataframe apache-spark apache-spark-sql pyspark

我有一个像这样的 Spark 数据框......

ID A C D
编号1 1 0 0 2
编号2 0 3 0 1
id3 1 2 5 0
id4 4 0 0 1

我想要一个基于这个逻辑的新数据框......

  1. 接受任何具有正值的列
  2. 连接他们的名字

那么结果就会是这样的……

ID 新专栏
编号1 广告
编号2 乙、丁
id3 甲、乙、丙
id4 广告

我的努力:

A)对于第一步,我想我会将整数转换为列的名称......所以它看起来像这样......

ID A C D
编号1 A 0 0 D
编号2 0 0 D
id3 A C 0
id4 A 0 0 D

我尝试使用 UDF 但没有成功...

def CountSelect(colname, x):
  if x>0 :
    return colname
  else:
    return ""

countUDF = UserDefinedFunction(CountSelect, T.StringType())

cols = inoutDF.columns
cols.remove("ID")

intermediateDF = inputDF.select("ID", *(countDF(c, col(c)).alias(c) for c in cols))
Run Code Online (Sandbox Code Playgroud)

但它不起作用......

你们中有人可以帮忙吗?

B)然后我将在所有列上使用字符串连接函数

这部分应该更容易,但如果您可以将这两种逻辑组合成更简单的工作代码,我将非常感谢您。

多谢

Vae*_*hav 6

这个想法是跨列标记行positive并返回相应列的值。

您可以使用reduce来标记列并创建一个新的列DataFrame,最后使用concat_ws来形成所需的值

@anky 提供的更简洁的解决方案

简洁的解决方案 -

sparkDF.withColumn("GreaterThanZero",F.concat_ws(",",*[F.when(F.col(col)>0,col) for col in to_concat]))\
.select("id","GreaterThanZero").show()

+---+---------------+
| id|GreaterThanZero|
+---+---------------+
|id1|            A,D|
|id2|            B,D|
|id3|          A,B,C|
|id4|            A,D|
+---+---------------+
Run Code Online (Sandbox Code Playgroud)

数据准备

input_str = """
id1 1   0   0   2
id2 0   3   0   1
id3 1   2   5   0
id4 4   0   0   1
""".split()

input_values = list(map(lambda x: x.strip() if x.strip() != 'null' else None, input_str))

cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "ID   A   B   C   D".split()))
            
n = len(input_values)
n_cols = 5

input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]

sparkDF = sql.createDataFrame(input_list, cols)

sparkDF.show()

+---+---+---+---+---+
| ID|  A|  B|  C|  D|
+---+---+---+---+---+
|id1|  1|  0|  0|  2|
|id2|  0|  3|  0|  1|
|id3|  1|  2|  5|  0|
|id4|  4|  0|  0|  1|
+---+---+---+---+---+

Run Code Online (Sandbox Code Playgroud)

减少

to_check = ['id','A','B','C','D']

sparkDF_marked = reduce(lambda df
                , x: df.withColumn(x,F.when(F.col(x) > 0 ,x).otherwise(None))\
                        if x != 'id' else df.withColumn(x,F.col(x)) \
                ,to_check, sparkDF
            )
                      
sparkDF_marked.show()

+---+----+----+----+----+
| id|   A|   B|   C|   D|
+---+----+----+----+----+
|id1|   A|null|null|   D|
|id2|null|   B|null|   D|
|id3|   A|   B|   C|null|
|id4|   A|null|null|   D|
+---+----+----+----+----+
Run Code Online (Sandbox Code Playgroud)

康卡特

to_concat = ['A','B','C','D']

sparkDF_marked.select(['id',F.concat_ws(',',*to_concat).alias('GreaterThanZero')]).show()

+---+---------------+
| id|GreaterThanZero|
+---+---------------+
|id1|            A,D|
|id2|            B,D|
|id3|          A,B,C|
|id4|            A,D|
+---+---------------+
Run Code Online (Sandbox Code Playgroud)

该解决方案虽然有效,但有一些微妙的细微差别,您需要小心,特别是reduce 代码片段to_checkto_concat

to_check可以轻松地替换为 -sparkDF.columns对于实际数据,但请让我知道更大数据集的性能。