Ste*_*ven 1 python pandas apache-spark apache-spark-sql pyspark
我正在尝试将一些熊猫代码转换为Spark以进行缩放。myfunc是复杂API的包装,该API接受一个字符串并返回一个新字符串(这意味着我不能使用向量化函数)。
def myfunc(ds):
for attribute, value in ds.items():
value = api_function(attribute, value)
ds[attribute] = value
return ds
df = df.apply(myfunc, axis='columns')
Run Code Online (Sandbox Code Playgroud)
myfunc接收一个DataSeries,将其分解为单个单元格,为每个单元格调用API,并使用相同的列名构建一个新的DataSeries。这有效地修改了DataFrame中的所有单元格。
我是Spark的新手,我想使用来翻译此逻辑pyspark。我已经将熊猫DataFrame转换为Spark:
spark = SparkSession.builder.appName('My app').getOrCreate()
spark_schema = StructType([StructField(c, StringType(), True) for c in df.columns])
spark_df = spark.createDataFrame(df, schema=spark_schema)
Run Code Online (Sandbox Code Playgroud)
这是我迷路的地方。我需要UDF一个pandas_udf吗?如何遍历所有单元格,并为每个单元格返回一个新字符串myfunc?spark_df.foreach()不返回任何东西,也没有map()函数。
我可以修改myfunc从DataSeries- > DataSeries到string> - string如果需要的话。
最简单的方法是重写您的函数以将字符串作为参数(因此它是string-> string)并使用UDF。有一个很好的例子在这里。一次只能在一列上运行。因此,如果您DataFrame的列数合理,则可以一次将UDF应用于每一列:
from pyspark.sql.functions import col
new_df = df.select(udf(col("col1")), udf(col("col2")), ...)
Run Code Online (Sandbox Code Playgroud)
df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
df.show()
+----+----+
|col1|col2|
+----+----+
| 1| 4|
| 2| 5|
| 3| 6|
+----+----+
def plus1_udf(x):
return x + 1
plus1 = spark.udf.register("plus1", plus1_udf)
new_df = df.select(plus1(col("col1")), plus1(col("col2")))
new_df.show()
+-----------+-----------+
|plus1(col1)|plus1(col2)|
+-----------+-----------+
| 2| 5|
| 3| 6|
| 4| 7|
+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)
map可用于Scala DataFrame,但目前不适用于PySpark。较低级的RDD API map在PySpark中确实具有功能。因此,如果一次要转换的列太多,则可以DataFrame像这样对每个单元进行操作:
def map_fn(row):
return [api_function(x) for (column, x) in row.asDict().items()
column_names = df.columns
new_df = df.rdd.map(map_fn).toDF(df.columns)
Run Code Online (Sandbox Code Playgroud)
df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
def map_fn(row):
return [value + 1 for (_, value) in row.asDict().items()]
columns = df.columns
new_df = df.rdd.map(map_fn).toDF(columns)
new_df.show()
+----+----+
|col1|col2|
+----+----+
| 2| 5|
| 3| 6|
| 4| 7|
+----+----+
Run Code Online (Sandbox Code Playgroud)
该文件中foreach只给出了印刷的例子,但我们可以验证看代码,它确实不返回任何东西。
您可以pandas_udf在这篇文章中阅读有关内容,但似乎它最适合矢量化函数,正如您所指出的,由于,您不能使用它api_function。
解决办法是:
udf_func = udf(func, StringType())
for col_name in spark_df.columns:
spark_df = spark_df.withColumn(col_name, udf_func(lit(col_name), col_name))
return spark_df.toPandas()
Run Code Online (Sandbox Code Playgroud)
有 3 个关键见解帮助我解决了这个问题:
withColumn与现有列的名称 ( col_name) 一起使用,Spark 将“覆盖”/隐藏原始列。这基本上提供了直接编辑列的外观,就好像它是可变的一样。spark_df用相同的 DataFrame 变量,我使用相同的原理来模拟可变的 DataFrame,创建一系列逐列转换,每次“覆盖”一列(每 #1 - 见下文)UDFs期望所有参数都是Column类型,这意味着它尝试解析每个参数的列值。因为api_function的第一个参数是一个文字值,对于向量中的所有行都相同,所以您必须使用该lit()函数。简单地将 col_name 传递给函数将尝试提取该列的列值。据我所知,通过col_name等同于通过col(col_name)。假设有 3 列“a”、“b”和“c”,展开这个概念看起来像这样:
spark_df = spark_df.withColumn('a', udf_func(lit('a'), 'a')
.withColumn('b', udf_func(lit('b'), 'b')
.withColumn('c', udf_func(lit('c'), 'c')
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3251 次 |
| 最近记录: |