use*_*916 4 python apache-spark apache-spark-sql pyspark pyspark-sql
我试图从Python列表中删除一个元素:
+---------------+
| sources|
+---------------+
| [62]|
| [7, 32]|
| [62]|
| [18, 36, 62]|
|[7, 31, 36, 62]|
| [7, 32, 62]|
Run Code Online (Sandbox Code Playgroud)
我希望能够rm从上面列表中的每个列表中删除元素.我写了一个函数,可以为列表列表做到这一点:
def asdf(df, rm):
temp = df
for n in range(len(df)):
temp[n] = [x for x in df[n] if x != rm]
return(temp)
Run Code Online (Sandbox Code Playgroud)
哪个删除rm = 1:
a = [[1,2,3],[1,2,3,4],[1,2,3,4,5]]
In: asdf(a,1)
Out: [[2, 3], [2, 3, 4], [2, 3, 4, 5]]
Run Code Online (Sandbox Code Playgroud)
但我不能让它适用于DataFrame:
asdfUDF = udf(asdf, ArrayType(IntegerType()))
In: df.withColumn("src_ex", asdfUDF("sources", 32))
Out: Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
Run Code Online (Sandbox Code Playgroud)
期望的行为:
In: df.withColumn("src_ex", asdfUDF("sources", 32))
Out:
+---------------+
| src_ex|
+---------------+
| [62]|
| [7]|
| [62]|
| [18, 36, 62]|
|[7, 31, 36, 62]|
| [7, 62]|
Run Code Online (Sandbox Code Playgroud)
(除了将上面的新列附加到PySpark DataFrame之外df)
有什么建议或想法吗?
use*_*411 11
Spark> = 2.4
你可以使用array_remove:
from pyspark.sql.functions import array_remove
df.withColumn("src_ex", array_remove("sources", 32)).show()
Run Code Online (Sandbox Code Playgroud)
+---------------+---------------+
| sources| src_ex|
+---------------+---------------+
| [62]| [62]|
| [7, 32]| [7]|
| [62]| [62]|
| [18, 36, 62]| [18, 36, 62]|
|[7, 31, 36, 62]|[7, 31, 36, 62]|
| [7, 32, 62]| [7, 62]|
+---------------+---------------+
Run Code Online (Sandbox Code Playgroud)
或者filter:
from pyspark.sql.functions import expr
df.withColumn("src_ex", expr("filter(sources, x -> not(x <=> 32))")).show()
Run Code Online (Sandbox Code Playgroud)
+---------------+---------------+
| sources| src_ex|
+---------------+---------------+
| [62]| [62]|
| [7, 32]| [7]|
| [62]| [62]|
| [18, 36, 62]| [18, 36, 62]|
|[7, 31, 36, 62]|[7, 31, 36, 62]|
| [7, 32, 62]| [7, 62]|
+---------------+---------------+
Run Code Online (Sandbox Code Playgroud)
Spark <2.4
很多事情:
DataFrame不是列表列表.在实践中,它甚至不是普通的Python对象,它没有len,也没有Iterable.array类型.DataFrame(或UDF中的任何其他分布式数据结构).str(列名)或Column对象.传递文字使用lit功能.唯一剩下的就是列表理解:
from pyspark.sql.functions import lit, udf
def drop_from_array_(arr, item):
return [x for x in arr if x != item]
drop_from_array = udf(drop_from_array_, ArrayType(IntegerType()))
Run Code Online (Sandbox Code Playgroud)
用法示例:
df = sc.parallelize([
[62], [7, 32], [62], [18, 36, 62], [7, 31, 36, 62], [7, 32, 62]
]).map(lambda x: (x, )).toDF(["sources"])
df.withColumn("src_ex", drop_from_array("sources", lit(32)))
Run Code Online (Sandbox Code Playgroud)
结果:
+---------------+---------------+
| sources| src_ex|
+---------------+---------------+
| [62]| [62]|
| [7, 32]| [7]|
| [62]| [62]|
| [18, 36, 62]| [18, 36, 62]|
|[7, 31, 36, 62]|[7, 31, 36, 62]|
| [7, 32, 62]| [7, 62]|
+---------------+---------------+
Run Code Online (Sandbox Code Playgroud)