Cross-account GCS access using Spark on Dataproc

Sha*_*kar 5 google-cloud-storage google-bigquery apache-spark google-cloud-platform google-cloud-dataproc

I am trying to use Spark running on Dataproc in account B to ingest data from GCS in account A into BigQuery in account B.

I tried setting GOOGLE_APPLICATION_CREDENTIALS to a service account key file that grants access to the necessary buckets in account A. However, when I start spark-shell, I get the following error.

Exception in thread "main" java.io.IOException: Error accessing Bucket dataproc-40222d04-2c40-42f9-a5de-413a123f949d-asia-south1

As I understand it, setting that environment variable simply switches access from account B to account A.
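
For reference, the environment variable was set roughly like this before launching spark-shell (the key file path is a placeholder):

$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/account-a-key.json
$ spark-shell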

Is there a way to have both sets of permissions in Spark, i.e. the default access to account B plus additional access to account A?

Update: I tried running spark-shell with the configuration from Igor's answer, but the error persists. Here is the command I tried and the resulting stack trace.

$ spark-shell --conf spark.hadoop.fs.gs.auth.service.account.json.keyfile=/home/shasank/watchful-origin-299914-fa29998bad08.json --jars gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar
Exception in thread "main" java.io.IOException: Error accessing Bucket dataproc-40999d04-2b99-99f9-a5de-999ad23f949d-asia-south1
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getBucket(GoogleCloudStorageImpl.java:1895)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:1846)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1125)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfo(GoogleCloudStorageFileSystem.java:1116)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.exists(GoogleCloudStorageFileSystem.java:440)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configureBuckets(GoogleHadoopFileSystemBase.java:1738)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.configureBuckets(GoogleHadoopFileSystem.java:76)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1659)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:683)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:646)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3242)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3291)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3259)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:470)
  at org.apache.spark.deploy.DependencyUtils$.org$apache$spark$deploy$DependencyUtils$$resolveGlobPath(DependencyUtils.scala:165)
  at org.apache.spark.deploy.DependencyUtils$$anonfun$resolveGlobPaths$2.apply(DependencyUtils.scala:146)
  at org.apache.spark.deploy.DependencyUtils$$anonfun$resolveGlobPaths$2.apply(DependencyUtils.scala:144)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at org.apache.spark.deploy.DependencyUtils$.resolveGlobPaths(DependencyUtils.scala:144)
  at org.apache.spark.deploy.SparkSubmit$$anonfun$doPrepareSubmitEnvironment$3.apply(SparkSubmit.scala:403)
  at org.apache.spark.deploy.SparkSubmit$$anonfun$doPrepareSubmitEnvironment$3.apply(SparkSubmit.scala:403)
  at scala.Option.map(Option.scala:146)
  at org.apache.spark.deploy.SparkSubmit$.doPrepareSubmitEnvironment(SparkSubmit.scala:403)
  at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:250)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:171)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by:
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException:
  403 Forbidden {
    "code" : 403,
    "errors" : [ {
      "domain" : "global",
      "message" : "ingestor@watchful-origin-299914.iam.gserviceaccount.com does not have storage.buckets.get access to dataproc-40999d04-2b99-99f9-a5de-999ad23f949d-asia-south1.",
      "reason" : "forbidden" } ],
    "message" : "ingestor@watchful-origin-299914.iam.gserviceaccount.com does not have storage.buckets.get access to  dataproc-40999d04-2b99-99f9-a5de-999ad23f949d-asia-south1." }
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:150)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:401)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1097)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:499)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:549)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getBucket(GoogleCloudStorageImpl.java:1889)
  ... 32 more

Igo*_*hak 3

To do this, you need to reconfigure the GCS and BQ connectors to authenticate with different service accounts; by default they both use the GCE VM service account.

For this, see Method 2 in the GCS connector configuration manual.

The same configuration works for the Hadoop BQ connector, except that you replace the fs.gs. prefix in the property names with the bq.mapred. prefix:

spark.hadoop.fs.gs.auth.service.account.json.keyfile=/path/to/local/gcs/key/file.json
spark.hadoop.bq.mapred.auth.service.account.json.keyfile=/path/to/local/bq/key/file.json
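
For example, these properties can be passed to spark-shell on the command line; the key file paths below are placeholders, with account A's key used for GCS and account B's key used for BQ to match the scenario in the question:

$ spark-shell \
    --conf spark.hadoop.fs.gs.auth.service.account.json.keyfile=/path/to/account-a-key.json \
    --conf spark.hadoop.bq.mapred.auth.service.account.json.keyfile=/path/to/account-b-key.json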

Update:

To disable the Dataproc staging bucket check during GCS connector initialization, you need to use the latest GCS connector version (currently 1.9.17) and set the GCS connector system bucket property to an empty string:

spark.hadoop.fs.gs.system.bucket=
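
A minimal sketch combining this with the key file configuration above (the key file path is a placeholder; the jar URL is taken from the question):

$ spark-shell \
    --conf spark.hadoop.fs.gs.system.bucket= \
    --conf spark.hadoop.fs.gs.auth.service.account.json.keyfile=/path/to/account-a-key.json \
    --jars gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar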

Note that this system bucket functionality is removed in the upcoming GCS connector 2.0, so this will no longer be an issue.

  • Also, setting "GOOGLE_APPLICATION_CREDENTIALS" can have unintended consequences, because it applies not only to the GCS and BQ connectors but to all Google API client libraries. (2 upvotes)