boto3 无法在 pyspark worker 上创建客户端?

Emm*_*day 4 python boto3 pyspark

我正在尝试使用 boto3 与 AWS 对话,将数据从 Pyspark RDD 的工作人员发送到 SQS 队列。我需要直接从分区发送数据,而不是收集 RDD 并从驱动程序发送数据。

我可以通过本地的 boto3 和 Spark 驱动程序向 SQS 发送消息;此外,我可以导入 boto3 并在分区上创建 boto3 会话。但是,当我尝试从分区创建客户端或资源时,我收到错误消息。我相信 boto3 没有正确创建客户端,但我不完全确定这一点。我的代码如下所示:

def get_client(x):   #the x is required to use pyspark's mapPartitions
    import boto3
    client = boto3.client('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
    return x

rdd_with_client = rdd.mapPartitions(get_client)
Run Code Online (Sandbox Code Playgroud)

错误:

DataNotFoundError: Unable to load data for: endpoints
Run Code Online (Sandbox Code Playgroud)

更长的回溯:

File "<stdin>", line 4, in get_client
  File "./rebuilt.zip/boto3/session.py", line 250, in client
    aws_session_token=aws_session_token, config=config)
  File "./rebuilt.zip/botocore/session.py", line 810, in create_client
    endpoint_resolver = self.get_component('endpoint_resolver')
  File "./rebuilt.zip/botocore/session.py", line 691, in get_component
    return self._components.get_component(name)
  File "./rebuilt.zip/botocore/session.py", line 872, in get_component
    self._components[name] = factory()
  File "./rebuilt.zip/botocore/session.py", line 184, in create_default_resolver
    endpoints = loader.load_data('endpoints')
  File "./rebuilt.zip/botocore/loaders.py", line 123, in _wrapper
    data = func(self, *args, **kwargs)
  File "./rebuilt.zip/botocore/loaders.py", line 382, in load_data
    raise DataNotFoundError(data_path=name)
DataNotFoundError: Unable to load data for: endpoints
Run Code Online (Sandbox Code Playgroud)

我还尝试修改我的函数以创建资源而不是显式客户端,以查看它是否可以找到并使用默认客户端设置。在这种情况下,我的代码是:

def get_resource(x):
    import boto3
    sqs = boto3.resource('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
    return x

rdd_with_client = rdd.mapPartitions(get_resource)
Run Code Online (Sandbox Code Playgroud)

我收到一个指向 has_low_level_client 参数的错误,这是因为客户端不存在而触发的;回溯说:

File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 689, in func
  File "<stdin>", line 4, in session_resource
  File "./rebuilt.zip/boto3/session.py", line 329, in resource
    has_low_level_client)
ResourceNotExistsError: The 'sqs' resource does not exist.
The available resources are:
   -
Run Code Online (Sandbox Code Playgroud)

没有可用的资源,因为我认为没有客户可以容纳它们。

几天来我一直在用头撞这个。任何帮助表示赞赏!

Tom*_*ang 7

这是因为您将 boto3 捆绑包作为 zip 文件。

“./rebuilt.zip/boto3”

boto3 初始化的作用是下载一堆文件并将其保存在分发文件夹中。因为您的 boto3 位于一个 zip 包中,所以显然这些文件将无法到达那里。

解决方案是,与其将 boto3 分发到 zip 文件中,不如将 boto3 安装在 Spark 环境中。在这里要小心,您可能希望在主节点和工作节点上都安装 boto3,这取决于您实现应用程序的方式。安全的赌注是安装在两者上。

如果您使用的是 EMR,则可以使用 bootstrap 步骤来完成。