我有一个带有巨大可解析元数据的DF作为Dataframe中的单个字符串列,我们称之为DFA,使用ColmnA.
我想打破这一列,将ColmnA分成多个列,通过一个函数,ClassXYZ = Func1(ColmnA).此函数返回一个具有多个变量的类ClassXYZ,现在每个变量都必须映射到新列,例如ColmnA1,ColmnA2等.
如何通过调用此Func1一次,使用这些附加列从一个Dataframe到另一个Data转换,而不必重复它来创建所有列.
如果我每次都要调用这个巨大的函数添加一个新列,它很容易解决,但这是我希望避免的.
请使用工作或伪代码建议.
谢谢
桑杰
scala user-defined-functions dataframe apache-spark apache-spark-sql
题
我想将UDF的返回值添加到单独列中的现有数据框中.我如何以足智多谋的方式实现这一目标?
这是我到目前为止所拥有的一个例子.
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
df = spark.createDataFrame([("Alive",4)],["Name","Number"])
df.show(1)
+-----+------+
| Name|Number|
+-----+------+
|Alive| 4|
+-----+------+
def example(n):
return [[n+2], [n-2]]
# schema = StructType([
# StructField("Out1", ArrayType(IntegerType()), False),
# StructField("Out2", ArrayType(IntegerType()), False)])
example_udf = udf(example)
Run Code Online (Sandbox Code Playgroud)
现在我可以按如下方式向数据框添加一列
newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF.show(1)
+-----+------+----------+
| Name|Number|Output |
+-----+------+----------+
|Alive| 4|[[6], [2]]|
+-----+------+----------+
Run Code Online (Sandbox Code Playgroud)
但是我不希望这两个值在同一列中,而是在单独的列中.
理想情况下,我现在要分割输出列,以避免调用示例函数两次(每次返回值一次),如此处和此处所述,但在我的情况下,我得到一个数组数组,我看不到拆分如何在那里工作(请注意每个数组将包含多个值,用","分隔.
结果应该如何
我最终想要的是这个
+-----+------+----+----+
| Name|Number|Out1|Out2|
+-----+------+----+----+
|Alive| 4| 6| 2|
+-----+------+----+----+
Run Code Online (Sandbox Code Playgroud)
请注意,StructType返回类型的使用是可选的,不一定必须是解决方案的一部分.
编辑:我注释掉了StructType的使用(并编辑了udf赋值),因为它不是示例函数的返回类型所必需的.但是,如果返回值类似,则必须使用它
return …Run Code Online (Sandbox Code Playgroud) 我正在尝试为以下 sql 查询编写 pyspark 代码:
Create table table1 as
Select a.ip_address,a.ip_number,b.ip_start_int,b.ip_end_int,b.post_code_id,b.city,b.region_name,b.two_letter_country
from nk_ip_address_check a
join
ip_additional_pulse b
on a.ip_number between b.ip_start_int and b.ip_end_int
Run Code Online (Sandbox Code Playgroud)
上面的查询在两个表之间进行连接,并使用“ Between ”子句和“ on ”子句。我写了一个UDF,它的作用相同,但看起来很慢。有什么方法可以在 pyspark 代码中编写上述查询,这会给我更好的性能。
下面是我正在使用的代码
def ip_mapping(ip_int):
ip_qry = "select country_code,region_code,city_code,postal_code from de_pulse_ip_pqt where ip_start_int < {} and ip_end_int > {}".format(ip_int,ip_int)
result = spark.sql(ip_qry)
country_code = result.rdd.map(lambda x: x['country_code']).first()
return country_code
ip_mapped = udf(ip_mapping, IntegerType())
df_final = df.withColumn("country_code", ip_mapped("ip_int"))
Run Code Online (Sandbox Code Playgroud)
这是非常低效的。此外,如果我有region_code,我必须通过更改函数ip_mapping的返回值来调用。
df_final = df.withColumn("region_code", ip_mapped("ip_int"))
Run Code Online (Sandbox Code Playgroud) Spark Dataframes具有一种withColumn可以一次添加一个新列的方法。要添加多个列,需要一个链withColumn。这是最佳做法吗?
我觉得使用mapPartitions具有更多优势。假设我有一个由3组成withColumn的链,然后有一个过滤器Row根据某些条件将s 去除。这是四个不同的操作(不过,我不确定其中是否有宽泛的转换)。但是如果我做得到,我可以一口气做完mapPartitions。如果我有一个数据库连接,而我希望每个RDD分区打开一次,这也有帮助。
我的问题分为两个部分。
第一部分,这是我对mapPartitions的实现。这种方法是否有无法预料的问题?有没有更优雅的方式做到这一点?
df2 = df.rdd.mapPartitions(add_new_cols).toDF()
def add_new_cols(rows):
db = open_db_connection()
new_rows = []
new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
i = 0
for each_row in rows:
i += 1
# conditionally omit rows
if i % 3 == 0:
continue
db_result = db.get_some_result(each_row.existing_col_2)
new_col_1 = ''.join([db_result, "_NEW"])
new_col_2 = db_result
new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
new_rows.append(new_f_row)
db.close()
return iter(new_rows)
Run Code Online (Sandbox Code Playgroud)
第二部分,在和mapPartitions链上使用时要权衡些什么?withColumn …