Join tables in PySpark on the condition that a point lies inside a polygon

Jam*_*ash 5 gis geospatial geo pyspark

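I have 2 PySpark dataframes: one with points, df_pnt, and the other with polygons, df_poly. Since I'm not very familiar with PySpark, I'm struggling to join these dataframes correctly on the condition that a point is inside a polygon. I started with this code, which I built from material on this page: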

from shapely import wkt
import numpy as np
from shapely.geometry import Polygon, Point
import pandas as pd
import geopandas as gpd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Create simple data
polygon1 = Polygon([[0, 0], [.5, 0], [0.3, 0.2], [0, 0.2]])
polygon2 = Polygon([[0.6, 0], [0.6, 0.3], [0.6, 0.4], [0.7, 0.2]])
polygon3 = Polygon([[0.6, 0.5], [.5, 0.5], [0.3, 0.7], [0.4, 0.8]])
polygon4 = Polygon([[0, .5], [.2, 0.4], [0.5, 0.3], [0.5, 0.1]])

df_poly = gpd.GeoDataFrame(
    {'id': [0, 1, 2, 3],
     'geometry': [polygon1, polygon2, polygon3, polygon4]},
    geometry='geometry')

df_pnt = gpd.GeoDataFrame(
    {'id': range(0, 15),
     'geometry': [Point(pnt) for pnt in np.random.rand(15, 2)]},
    geometry='geometry')

# Convert each shape to a WKT string; shapely geometries expose it as .wkt
df_poly['wkt'] = df_poly['geometry'].apply(lambda geom: geom.wkt)
df_pnt['wkt'] = df_pnt['geometry'].apply(lambda geom: geom.wkt)

# Now create the PySpark dataframes, keeping the geometry as a string column
df_poly = spark.createDataFrame(pd.DataFrame(df_poly.drop(columns='geometry'))).cache()
df_pnt = spark.createDataFrame(pd.DataFrame(df_pnt.drop(columns='geometry'))).cache()

To plot the first polygon, run:

wkt.loads(df_poly.take(1)[0].wkt)

To check whether a Polygon object contains a Point object, we need the following line:

polygon.contains(point)
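For intuition about what such a containment test computes, here is a minimal pure-Python ray-casting sketch. The function name point_in_polygon is hypothetical; shapely itself delegates to GEOS, which is more robust numerically and treats points exactly on the boundary differently, so treat this only as an illustration of the idea.

```python
def point_in_polygon(x, y, vertices):
    """Ray casting: count how often a ray from (x, y) towards +x crosses the ring."""
    inside = False
    n = len(vertices)
    for i in range(n):
        x1, y1 = vertices[i]
        x2, y2 = vertices[(i + 1) % n]  # wrap around to close the ring
        # Only edges that straddle the horizontal line through y can be crossed
        if (y1 > y) != (y2 > y):
            # x coordinate where this edge meets that horizontal line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside  # each crossing flips inside/outside
    return inside


polygon2 = [(0.6, 0), (0.6, 0.3), (0.6, 0.4), (0.7, 0.2)]
print(point_in_polygon(0.67457, 0.19285, polygon2))  # True  (point 1 of the question)
print(point_in_polygon(0.71186, 0.25128, polygon2))  # False (point 2)
```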

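The question is how to handle this custom condition during the join. df_poly is much smaller than the points dataframe, so I would also like to take advantage of broadcasting.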

UPD: If I were to implement this in geopandas, it would look like this:

df_pnt
    id  geometry
0   0   POINT (0.08834 0.23203)
1   1   POINT (0.67457 0.19285)
2   2   POINT (0.71186 0.25128)
3   3   POINT (0.55621 0.35016)
4   4   POINT (0.79637 0.24668)
5   5   POINT (0.40932 0.37155)
6   6   POINT (0.36124 0.68229)
7   7   POINT (0.13476 0.58242)
8   8   POINT (0.41659 0.46298)
9   9   POINT (0.74878 0.78191)
10  10  POINT (0.82088 0.58064)
11  11  POINT (0.28797 0.24399)
12  12  POINT (0.40502 0.99233)
13  13  POINT (0.68928 0.73251)
14  14  POINT (0.37765 0.71518)

df_poly
    id  geometry
0   0   POLYGON ((0.00000 0.00000, 0.50000 0.00000, 0....
1   1   POLYGON ((0.60000 0.00000, 0.60000 0.30000, 0....
2   2   POLYGON ((0.60000 0.50000, 0.50000 0.50000, 0....
3   3   POLYGON ((0.00000 0.50000, 0.20000 0.40000, 0....

gpd.sjoin(df_pnt, df_poly, how="left", predicate="intersects")  # op="intersects" on geopandas < 0.10

    id_left     geometry    index_right     id_right
0   0   POINT (0.08834 0.23203)     NaN     NaN
1   1   POINT (0.67457 0.19285)     1.0     1.0
2   2   POINT (0.71186 0.25128)     NaN     NaN
3   3   POINT (0.55621 0.35016)     NaN     NaN
4   4   POINT (0.79637 0.24668)     NaN     NaN
5   5   POINT (0.40932 0.37155)     NaN     NaN
6   6   POINT (0.36124 0.68229)     2.0     2.0
7   7   POINT (0.13476 0.58242)     NaN     NaN
8   8   POINT (0.41659 0.46298)     NaN     NaN
9   9   POINT (0.74878 0.78191)     NaN     NaN
10  10  POINT (0.82088 0.58064)     NaN     NaN
11  11  POINT (0.28797 0.24399)     NaN     NaN
12  12  POINT (0.40502 0.99233)     NaN     NaN
13  13  POINT (0.68928 0.73251)     NaN     NaN
14  14  POINT (0.37765 0.71518)     2.0     2.0
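The sjoin above uses the intersects predicate, which also matches points lying exactly on a polygon's edge, whereas contains/within (what the question asks for) exclude the boundary. A small shapely check of the difference, using an illustrative unit square:

```python
from shapely.geometry import Point, Polygon

square = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])
inner = Point(0.5, 0.5)  # strictly inside
edge = Point(1, 0.5)     # on the boundary
outer = Point(2, 2)      # outside

for name, pt in [("inner", inner), ("edge", edge), ("outer", outer)]:
    # contains/within exclude the boundary; intersects includes it
    print(name, square.contains(pt), pt.within(square), square.intersects(pt))
# inner True True True
# edge False False True
# outer False False False
```

For random points the two rarely differ, but passing predicate="within" to gpd.sjoin (points on the left, polygons on the right) matches the "point inside polygon" condition exactly.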


Answer: anonymous user, score 0

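You can wrap Polygon.contains() in a UDF and join the tables on it. Note, however, that a user-defined function is only allowed as a join condition in an inner join.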

Create the sample dataframes:

from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
from shapely.geometry import Polygon, Point

spark_session = SparkSession.builder.getOrCreate()

# schema of a single 2-D point
point = T.StructType(
    [
        T.StructField("x", T.FloatType()),
        T.StructField("y", T.FloatType()),
    ]
)

# point table
df_pnt = spark_session.createDataFrame(
    data=[
        (0, (0.08834, 0.23203)),
        (1, (0.67457, 0.19285)),
        (2, (0.71186, 0.25128)),
    ],
    schema=T.StructType(
        [
            T.StructField("id", T.IntegerType()),
            T.StructField("point", point),
        ]
    ),
)
df_pnt.printSchema()
df_pnt.show()

# create polygon dataframe 
polygon = T.ArrayType(point)
df_plg = spark_session.createDataFrame(
    data=[
        (0, [[0.0, 0.0], [0.5, 0.0], [0.3, 0.2], [0.0, 0.2]]),
        (1, [[0.6, 0.0], [0.6, 0.3], [0.6, 0.4], [0.7, 0.2]]),
        (2, [[0.6, 0.5], [0.5, 0.5], [0.3, 0.7], [0.4, 0.8]]),
        (3, [[0.0, 0.5], [0.2, 0.4], [0.5, 0.3], [0.5, 0.1]]),
    ],
    schema=T.StructType(
        [
            T.StructField("id", T.IntegerType()),
            T.StructField("polygon", polygon),
        ]
    ),
)
df_plg.printSchema()
df_plg.show(truncate=False)

Output:

root
 |-- id: integer (nullable = true)
 |-- point: struct (nullable = true)
 |    |-- x: float (nullable = true)
 |    |-- y: float (nullable = true)

+---+------------------+
| id|             point|
+---+------------------+
|  0|[0.08834, 0.23203]|
|  1|[0.67457, 0.19285]|
|  2|[0.71186, 0.25128]|
+---+------------------+

root
 |-- id: integer (nullable = true)
 |-- polygon: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: float (nullable = true)
 |    |    |-- y: float (nullable = true)

+---+------------------------------------------------+
|id |polygon                                         |
+---+------------------------------------------------+
|0  |[[0.0, 0.0], [0.5, 0.0], [0.3, 0.2], [0.0, 0.2]]|
|1  |[[0.6, 0.0], [0.6, 0.3], [0.6, 0.4], [0.7, 0.2]]|
|2  |[[0.6, 0.5], [0.5, 0.5], [0.3, 0.7], [0.4, 0.8]]|
|3  |[[0.0, 0.5], [0.2, 0.4], [0.5, 0.3], [0.5, 0.1]]|
+---+------------------------------------------------+

Join the tables on the custom join condition:


Output (the inner join, followed by a cross join with an is_in flag for every pair):

+---+------------------+---+------------------------------------------------+
|id |point             |id |polygon                                         |
+---+------------------+---+------------------------------------------------+
|1  |[0.67457, 0.19285]|1  |[[0.6, 0.0], [0.6, 0.3], [0.6, 0.4], [0.7, 0.2]]|
+---+------------------+---+------------------------------------------------+


+---+------------------+---+------------------------------------------------+-----+
|id |point             |id |polygon                                         |is_in|
+---+------------------+---+------------------------------------------------+-----+
|0  |[0.08834, 0.23203]|0  |[[0.0, 0.0], [0.5, 0.0], [0.3, 0.2], [0.0, 0.2]]|false|
|0  |[0.08834, 0.23203]|1  |[[0.6, 0.0], [0.6, 0.3], [0.6, 0.4], [0.7, 0.2]]|false|
|0  |[0.08834, 0.23203]|2  |[[0.6, 0.5], [0.5, 0.5], [0.3, 0.7], [0.4, 0.8]]|false|
|0  |[0.08834, 0.23203]|3  |[[0.0, 0.5], [0.2, 0.4], [0.5, 0.3], [0.5, 0.1]]|false|
|1  |[0.67457, 0.19285]|0  |[[0.0, 0.0], [0.5, 0.0], [0.3, 0.2], [0.0, 0.2]]|false|
|1  |[0.67457, 0.19285]|1  |[[0.6, 0.0], [0.6, 0.3], [0.6, 0.4], [0.7, 0.2]]|true |
|2  |[0.71186, 0.25128]|0  |[[0.0, 0.0], [0.5, 0.0], [0.3, 0.2], [0.0, 0.2]]|false|
|2  |[0.71186, 0.25128]|1  |[[0.6, 0.0], [0.6, 0.3], [0.6, 0.4], [0.7, 0.2]]|false|
|1  |[0.67457, 0.19285]|2  |[[0.6, 0.5], [0.5, 0.5], [0.3, 0.7], [0.4, 0.8]]|false|
|1  |[0.67457, 0.19285]|3  |[[0.0, 0.5], [0.2, 0.4], [0.5, 0.3], [0.5, 0.1]]|false|
|2  |[0.71186, 0.25128]|2  |[[0.6, 0.5], [0.5, 0.5], [0.3, 0.7], [0.4, 0.8]]|false|
|2  |[0.71186, 0.25128]|3  |[[0.0, 0.5], [0.2, 0.4], [0.5, 0.3], [0.5, 0.1]]|false|
+---+------------------+---+------------------------------------------------+-----+