相关疑难解决方法(0)

从Spark DataFrame中的单个列派生多个列

我有一个带有巨大可解析元数据的DF作为Dataframe中的单个字符串列,我们称之为DFA,使用ColmnA.

我想打破这一列,将ColmnA分成多个列,通过一个函数,ClassXYZ = Func1(ColmnA).此函数返回一个具有多个变量的类ClassXYZ,现在每个变量都必须映射到新列,例如ColmnA1,ColmnA2等.

如何通过调用此Func1一次,使用这些附加列从一个Dataframe到另一个Data转换,而不必重复它来创建所有列.

如果我每次都要调用这个巨大的函数添加一个新列,它很容易解决,但这是我希望避免的.

请使用工作或伪代码建议.

谢谢

桑杰

scala user-defined-functions dataframe apache-spark apache-spark-sql

48
推荐指数
3
解决办法
5万
查看次数

如何使用UDF添加多个列?

我想将UDF的返回值添加到单独列中的现有数据框中.我如何以足智多谋的方式实现这一目标?

这是我到目前为止所拥有的一个例子.

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType  

df = spark.createDataFrame([("Alive",4)],["Name","Number"])
df.show(1)

+-----+------+
| Name|Number|
+-----+------+
|Alive|     4|
+-----+------+

def example(n):
        return [[n+2], [n-2]]

#  schema = StructType([
#          StructField("Out1", ArrayType(IntegerType()), False),
#          StructField("Out2", ArrayType(IntegerType()), False)])

example_udf = udf(example)
Run Code Online (Sandbox Code Playgroud)

现在我可以按如下方式向数据框添加一列

newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF.show(1)
+-----+------+----------+
| Name|Number|Output    |
+-----+------+----------+
|Alive|     4|[[6], [2]]|
+-----+------+----------+
Run Code Online (Sandbox Code Playgroud)

但是我不希望这两个值在同一列中,而是在单独的列中.

理想情况下,我现在要分割输出列,以避免调用示例函数两次(每次返回值一次),如此此处所述,但在我的情况下,我得到一个数组数组,我看不到拆分如何在那里工作(请注意每个数组将包含多个值,用","分隔.

结果应该如何

我最终想要的是这个

+-----+------+----+----+
| Name|Number|Out1|Out2|
+-----+------+----+----+
|Alive|     4|   6|   2|
+-----+------+----+----+
Run Code Online (Sandbox Code Playgroud)

请注意,StructType返回类型的使用是可选的,不一定必须是解决方案的一部分.

编辑:我注释掉了StructType的使用(并编辑了udf赋值),因为它不是示例函数的返回类型所必需的.但是,如果返回值类似,则必须使用它

return …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

10
推荐指数
2
解决办法
9303
查看次数

使用 Between 子句连接两个 pyspark 数据帧以从一系列 Ip 中查找 ip 详细信息

我正在尝试为以下 sql 查询编写 pyspark 代码:

Create table table1 as
Select a.ip_address,a.ip_number,b.ip_start_int,b.ip_end_int,b.post_code_id,b.city,b.region_name,b.two_letter_country
from nk_ip_address_check a 
join 
ip_additional_pulse b
on a.ip_number between b.ip_start_int and b.ip_end_int
Run Code Online (Sandbox Code Playgroud)

上面的查询在两个表之间进行连接,并使用“ Between ”子句和“ on ”子句。我写了一个UDF,它的作用相同,但看起来很慢。有什么方法可以在 pyspark 代码中编写上述查询,这会给我更好的性能。

下面是我正在使用的代码

def ip_mapping(ip_int):
    ip_qry = "select country_code,region_code,city_code,postal_code from de_pulse_ip_pqt where ip_start_int < {} and ip_end_int > {}".format(ip_int,ip_int)
    result = spark.sql(ip_qry)
    country_code = result.rdd.map(lambda x: x['country_code']).first()
    return country_code

ip_mapped = udf(ip_mapping, IntegerType())  
df_final = df.withColumn("country_code", ip_mapped("ip_int"))
Run Code Online (Sandbox Code Playgroud)

这是非常低效的。此外,如果我有region_code,我必须通过更改函数ip_mapping的返回值来调用。

df_final = df.withColumn("region_code", ip_mapped("ip_int"))
Run Code Online (Sandbox Code Playgroud)

join apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
7268
查看次数

PySpark:向DataFrame添加更多列的最佳实践

Spark Dataframes具有一种withColumn可以一次添加一个新列的方法。要添加多个列,需要一个链withColumn。这是最佳做法吗?

我觉得使用mapPartitions具有更多优势。假设我有一个由3组成withColumn的链,然后有一个过滤器Row根据某些条件将s 去除。这是四个不同的操作(不过,我不确定其中是否有宽泛的转换)。但是如果我做得到,我可以一口气做完mapPartitions。如果我有一个数据库连接,而我希望每个RDD分区打开一次,这也有帮助。

我的问题分为两个部分。

第一部分,这是我对mapPartitions的实现。这种方法是否有无法预料的问题?有没有更优雅的方式做到这一点?

df2 = df.rdd.mapPartitions(add_new_cols).toDF()

def add_new_cols(rows):
    db = open_db_connection()
    new_rows = []
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
        new_rows.append(new_f_row)

    db.close()
    return iter(new_rows)
Run Code Online (Sandbox Code Playgroud)

第二部分,在和mapPartitions链上使用时要权衡些什么?withColumn …

apache-spark apache-spark-sql pyspark pyspark-sql

4
推荐指数
1
解决办法
1066
查看次数