Spark SQL性能 - 在BETWEEN最小值和最大值之间加入值

Afp*_*Afp 4 python apache-spark apache-spark-sql pyspark

我有两个存储的文件:

  1. IP范围 - 国家/地区查询
  2. 来自不同IP的请求列表

IP存储为整数(使用inet_aton()).

我尝试通过将两个文件加载到数据框并将它们注册为临时表来使用Spark SQL来连接这些数据.

GeoLocTable - ipstart, ipend, ...additional Geo location data
Recordstable - INET_ATON, ...3 more fields
Run Code Online (Sandbox Code Playgroud)

我尝试使用Spark SQL来使用像这样的SQL语句来连接这些数据 -

"select a.*, b.* from Recordstable a left join GeoLocTable b on a.INET_ATON between b.ipstart and b.ipend"
Run Code Online (Sandbox Code Playgroud)

RecordsTable中有大约850K记录,GeoLocTable中有大约250M记录.存在的连接大约有20个执行器运行大约2个小时.

我尝试过缓存和播放GeoLocTable,但它似乎并没有帮助.我已经碰到了spark.sql.autoBroadcastJoinThreshold = 300000000和spark.sql.shuffle.partitions = 600.

Spark UI显示正在执行的BroadcastNestedLoopJoin.这是我应该期待的最好的吗?我尝试搜索将执行此类连接的条件,但文档似乎很少.

PS - 我正在使用PySpark与Spark合作.

zer*_*323 8

问题的根源非常简单.当你执行join和join条件不是基于相同的时候Spark现在唯一可以做的就是将它扩展为Cartesian产品,然后过滤内部发生的几乎是什么BroadcastNestedLoopJoin.所以逻辑上你有这个巨大的嵌套循环来测试所有850K*2.5M记录.

这种方法显然效率极低.由于查找表似乎适合内存,因此最简单的改进是使用本地排序数据结构而不是Spark DataFrame.假设您的数据如下所示:

geo_loc_table = sc.parallelize([
    (1, 10, "foo"), (11, 36, "bar"), (37, 59, "baz"),
]).toDF(["ipstart", "ipend", "loc"])

records_table = sc.parallelize([
    (1,  11), (2, 38), (3, 50)
]).toDF(["id", "inet"])
Run Code Online (Sandbox Code Playgroud)

我们可以按参数数据进行投影和排序,ipstart并创建广播变量:

geo_start_bd = sc.broadcast(geo_loc_table
  .select("ipstart")
  .orderBy("ipstart") 
  .flatMap(lambda x: x)
  .collect())
Run Code Online (Sandbox Code Playgroud)

接下来,我们将使用UDF和bisect模块进行扩充 records_table

from bisect import bisect_right
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# https://docs.python.org/3/library/bisect.html#searching-sorted-lists
def find_le(x):
    'Find rightmost value less than or equal to x'
    i = bisect_right(geo_start_bd.value, x)
    if i:
        return geo_start_bd.value[i-1]
    return None

records_table_with_ipstart = records_table.withColumn(
    "ipstart", udf(find_le, LongType())("inet")
)
Run Code Online (Sandbox Code Playgroud)

最后加入两个数据集:

 records_table_with_ipstart.join(geo_loc_table, ["ipstart"], "left")
Run Code Online (Sandbox Code Playgroud)