使用 Between 子句连接两个 pyspark 数据帧以从一系列 Ip 中查找 ip 详细信息

bra*_*raj 5 join apache-spark apache-spark-sql pyspark

我正在尝试为以下 sql 查询编写 pyspark 代码:

Create table table1 as
Select a.ip_address,a.ip_number,b.ip_start_int,b.ip_end_int,b.post_code_id,b.city,b.region_name,b.two_letter_country
from nk_ip_address_check a 
join 
ip_additional_pulse b
on a.ip_number between b.ip_start_int and b.ip_end_int
Run Code Online (Sandbox Code Playgroud)

上面的查询在两个表之间进行连接,并使用“ Between ”子句和“ on ”子句。我写了一个UDF,它的作用相同,但看起来很慢。有什么方法可以在 pyspark 代码中编写上述查询,这会给我更好的性能。

下面是我正在使用的代码

def ip_mapping(ip_int):
    ip_qry = "select country_code,region_code,city_code,postal_code from de_pulse_ip_pqt where ip_start_int < {} and ip_end_int > {}".format(ip_int,ip_int)
    result = spark.sql(ip_qry)
    country_code = result.rdd.map(lambda x: x['country_code']).first()
    return country_code

ip_mapped = udf(ip_mapping, IntegerType())  
df_final = df.withColumn("country_code", ip_mapped("ip_int"))
Run Code Online (Sandbox Code Playgroud)

这是非常低效的。此外,如果我有region_code,我必须通过更改函数ip_mapping的返回值来调用。

df_final = df.withColumn("region_code", ip_mapped("ip_int"))
Run Code Online (Sandbox Code Playgroud)

Yar*_*ron 0

那么,对于 DF 中的每个 IP,您在 IP->GeoIP 丰富的另一个 DF 中执行搜索吗?

简单的解决方案 -> 考虑使用 MaxMind DB - https://github.com/maxmind/GeoIP2-python https://www.maxmind.com/en/home

无论如何,您应该对每个 IP 执行一次操作,并返回特定 IP 的所有 GeoIP 数据。

您的 ip_mapping 函数应返回一个项目列表(例如:(国家/地区代码、城市代码、地区代码))

您的 UDF 应使用 array 模式,并且 UDF 的结果将是多列输出(有关详细信息,请参阅/sf/answers/2472657631/ )