NDS*_*NDS 2 python dataframe apache-spark apache-spark-sql pyspark
我有一个像这样的 Spark 数据框......
| ID | A | 乙 | C | D |
|---|---|---|---|---|
| 编号1 | 1 | 0 | 0 | 2 |
| 编号2 | 0 | 3 | 0 | 1 |
| id3 | 1 | 2 | 5 | 0 |
| id4 | 4 | 0 | 0 | 1 |
我想要一个基于这个逻辑的新数据框......
那么结果就会是这样的……
| ID | 新专栏 |
|---|---|
| 编号1 | 广告 |
| 编号2 | 乙、丁 |
| id3 | 甲、乙、丙 |
| id4 | 广告 |
我的努力:
A)对于第一步,我想我会将整数转换为列的名称......所以它看起来像这样......
| ID | A | 乙 | C | D |
|---|---|---|---|---|
| 编号1 | A | 0 | 0 | D |
| 编号2 | 0 | 乙 | 0 | D |
| id3 | A | 乙 | C | 0 |
| id4 | A | 0 | 0 | D |
我尝试使用 UDF 但没有成功...
def CountSelect(colname, x):
if x>0 :
return colname
else:
return ""
countUDF = UserDefinedFunction(CountSelect, T.StringType())
cols = inoutDF.columns
cols.remove("ID")
intermediateDF = inputDF.select("ID", *(countDF(c, col(c)).alias(c) for c in cols))
Run Code Online (Sandbox Code Playgroud)
但它不起作用......
你们中有人可以帮忙吗?
B)然后我将在所有列上使用字符串连接函数
这部分应该更容易,但如果您可以将这两种逻辑组合成更简单的工作代码,我将非常感谢您。
多谢
这个想法是跨列标记行positive并返回相应列的值。
您可以使用reduce来标记列并创建一个新的列DataFrame,最后使用concat_ws来形成所需的值
@anky 提供的更简洁的解决方案
sparkDF.withColumn("GreaterThanZero",F.concat_ws(",",*[F.when(F.col(col)>0,col) for col in to_concat]))\
.select("id","GreaterThanZero").show()
+---+---------------+
| id|GreaterThanZero|
+---+---------------+
|id1| A,D|
|id2| B,D|
|id3| A,B,C|
|id4| A,D|
+---+---------------+
Run Code Online (Sandbox Code Playgroud)
input_str = """
id1 1 0 0 2
id2 0 3 0 1
id3 1 2 5 0
id4 4 0 0 1
""".split()
input_values = list(map(lambda x: x.strip() if x.strip() != 'null' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "ID A B C D".split()))
n = len(input_values)
n_cols = 5
input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]
sparkDF = sql.createDataFrame(input_list, cols)
sparkDF.show()
+---+---+---+---+---+
| ID| A| B| C| D|
+---+---+---+---+---+
|id1| 1| 0| 0| 2|
|id2| 0| 3| 0| 1|
|id3| 1| 2| 5| 0|
|id4| 4| 0| 0| 1|
+---+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
to_check = ['id','A','B','C','D']
sparkDF_marked = reduce(lambda df
, x: df.withColumn(x,F.when(F.col(x) > 0 ,x).otherwise(None))\
if x != 'id' else df.withColumn(x,F.col(x)) \
,to_check, sparkDF
)
sparkDF_marked.show()
+---+----+----+----+----+
| id| A| B| C| D|
+---+----+----+----+----+
|id1| A|null|null| D|
|id2|null| B|null| D|
|id3| A| B| C|null|
|id4| A|null|null| D|
+---+----+----+----+----+
Run Code Online (Sandbox Code Playgroud)
to_concat = ['A','B','C','D']
sparkDF_marked.select(['id',F.concat_ws(',',*to_concat).alias('GreaterThanZero')]).show()
+---+---------------+
| id|GreaterThanZero|
+---+---------------+
|id1| A,D|
|id2| B,D|
|id3| A,B,C|
|id4| A,D|
+---+---------------+
Run Code Online (Sandbox Code Playgroud)
该解决方案虽然有效,但有一些微妙的细微差别,您需要小心,特别是reduce 代码片段和to_check。to_concat
to_check可以轻松地替换为 -sparkDF.columns对于实际数据,但请让我知道更大数据集的性能。