使用 Jupyter Notebook 中的 PySpark 从 AWS EMR 集群读取存储在 AWS S3 中的解压缩 Shapefile

adi*_*ari 5 gis amazon-s3 shapefile python-3.x pyspark

我对 AWS EMR 和 apache Spark 完全陌生。我正在尝试使用 shapefile 将 GeoID 分配给住宅物业。我无法从我的 s3 存储桶中读取 shapefile。请帮助我了解发生了什么,因为我在互联网上找不到任何解释确切问题的答案。

<!-- language: python 3.4 -->

import shapefile
import pandas as pd

def read_shapefile(shp_path):

"""
Read a shapefile into a Pandas dataframe with a 'coords' column holding
the geometry information. This uses the pyshp package
"""
    #read file, parse out the records and shapes
    sf = shapefile.Reader(shp_path)
    fields = [x[0] for x in sf.fields][1:]
    records = sf.records()
    shps = [s.points for s in sf.shapes()]
    center = [shape(s).centroid.coords[0] for s in sf.shapes()]

    #write into a dataframe
    df = pd.DataFrame(columns=fields, data=records)
    df = df.assign(coords=shps, centroid=center)

    return df

read_shapefile("s3a://uim-raw-datasets/census-bureau/tabblock-2010/tabblock-by-fips/tl_2010_01001_tabblock10")
Run Code Online (Sandbox Code Playgroud)

我想阅读的文件

我从存储桶中读取数据时遇到的错误

我真的很想在 AWS EMR 集群中读取这些 shapefile,因为我不可能在本地单独处理它们。任何形式的帮助表示赞赏。

adi*_*ari 3

我能够从 s3 存储桶中将形状文件作为二进制对象读取,然后围绕它构建一个包装函数,最后将各个文件对象解析为 .dbf、.shp 、.shx 格式的 shapefile.reader() 方法分别地。

发生这种情况是因为 PySpark 无法读取 SparkContext 中未提供的格式。发现此链接很有帮助Using pyshp to read a file-like object from a zipped archive

我的解决方案

def read_shapefile(shp_path):

    import io
    import shapefile

    blocks = sc.binaryFiles(shp_path)
    block_dict = dict(blocks.collect())

    sf = shapefile.Reader(shp=io.BytesIO(block_dict[[i for i in block_dict.keys() if i.endswith(".shp")][0]]),
                              shx=io.BytesIO(block_dict[[i for i in block_dict.keys() if i.endswith(".shx")][0]]),
                              dbf=io.BytesIO(block_dict[[i for i in block_dict.keys() if i.endswith(".dbf")][0]]))

    fields = [x[0] for x in sf.fields][1:]
    records = sf.records()
    shps = [s.points for s in sf.shapes()]
    center = [shape(s).centroid.coords[0] for s in sf.shapes()]

    #write into a dataframe
    df = pd.DataFrame(columns=fields, data=records)
    df = df.assign(coords=shps, centroid=center)

    return df
block_shapes = read_shapefile("s3a://uim-raw-datasets/census-bureau/tabblock-2010/tabblock-by-fips/tl_2010_01001_tabblock10*")
Run Code Online (Sandbox Code Playgroud)

这工作正常,不会破坏。