How do I write an equivalent of arrays_zip in Spark 2.3?
This is the source code from Spark 2.4:
def arrays_zip(*cols):
    """
    Collection function: Returns a merged array of structs in which the N-th struct contains all
    N-th values of input arrays.

    :param cols: columns of arrays to be merged.

    >>> from pyspark.sql.functions import arrays_zip
    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
    >>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
    [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))
How can I achieve the same thing in PySpark on Spark 2.3?
You can use a UDF to get the same functionality as arrays_zip. Note that the element types of the input columns must all be the same for this to work (IntegerType in this example). If the column types differ, cast them to a common type before applying the UDF (a casting sketch follows the definition below).
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Zip the input arrays element-wise; each resulting tuple becomes an inner array.
def zip_func(*args):
    return list(zip(*args))

zip_udf = F.udf(zip_func, T.ArrayType(T.ArrayType(T.IntegerType())))
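For example, if one input column is an array of longs, you can cast it to match the UDF's declared element type first. A minimal sketch, assuming a hypothetical DataFrame df whose vals2 column is array<bigint>:

# Hypothetical cast: bring array<bigint> down to array<int> so both inputs
# match the element type declared in zip_udf's return schema.
df = df.withColumn('vals2', F.col('vals2').cast(T.ArrayType(T.IntegerType())))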
It is used the same way as arrays_zip, for example:
df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
df.select(zip_udf(df.vals1, df.vals2).alias('zipped')).collect()
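Since the declared return type is an array of arrays rather than the array of structs that arrays_zip produces, this should collect to [Row(zipped=[[1, 2], [2, 3], [3, 4]])]. Also note that Python's zip truncates to the shortest input, whereas arrays_zip pads missing values of shorter arrays with nulls.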
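If you need the output schema to match arrays_zip exactly (an array of structs), you can declare a struct return type instead. A minimal sketch, reusing the F and T imports from above; the field names vals1/vals2 are assumptions chosen to mirror the built-in's output:

# Return type array<struct<vals1:int, vals2:int>>; the tuples produced by
# zip() map onto the struct fields positionally (field names are assumptions).
struct_schema = T.ArrayType(T.StructType([
    T.StructField('vals1', T.IntegerType()),
    T.StructField('vals2', T.IntegerType()),
]))

zip_struct_udf = F.udf(lambda *args: list(zip(*args)), struct_schema)

df.select(zip_struct_udf(df.vals1, df.vals2).alias('zipped')).collect()
# [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]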