如何在Apache Beam项目中直接使用google-cloud-storage

Iva*_*vin 4 java google-cloud-storage google-cloud-dataflow apache-beam

我们正在开发Apache Beam项目(版本2.4.0),我们还希望通过google-cloud-storage API直接使用存储桶.但是,将一些波束依赖性与云存储相结合,会导致难以解决的依赖性问题.

我们看到Beam 2.4.0依赖于云存储1.22.0,所以这就是我们在下面使用它的原因.我们遇到了与1.27.0相同的问题.以下pom.xml指定了我们在项目中使用的四个梁依赖关系,其中最后两个导致问题.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bol</groupId>
    <artifactId>beam-plus-storage</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <beam.version>2.4.0</beam.version>
    </properties>

    <dependencies>
        <!-- These first two dependencies do not clash -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-extensions-join-library</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!-- This one leads to java.lang.ClassNotFoundException: com.google.api.gax.rpc.HeaderProvider -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!-- This one leads to java.lang.NoSuchMethodError: com.google.api.services.storage.Storage$Objects$List.setUserProject(...) -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-storage</artifactId>
            <version>1.22.0</version>
        </dependency>

    </dependencies>
</project>
Run Code Online (Sandbox Code Playgroud)

以下是存储API的最小工作/损坏用法,列出了公共存储桶中的文件.

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CloudStorageReader {

    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Page<Blob> list = storage.list("gcp-public-data-landsat", Storage.BlobListOption.currentDirectory(), Storage.BlobListOption.prefix("LC08/PRE/044/034/LC80440342016259LGN00/"));
        for (Blob blob : list.getValues()) {
            System.out.println(blob);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

删除最后两个依赖项时,列出存储桶的内容可以正常工作.使用java-io beam依赖项时,找不到HeaderProvider类.由于数据流依赖性,找不到setUserProject方法.请参阅pom中的注释以获取完整的错误消息.

我们花了很多时间来修复HeaderProvider错误,这是导入所有四个波束依赖关系时出现的错误.我们为冲突依赖项添加了显式导入,并在梁导入中添加了排除.每次我们添加显式依赖项时,都会弹出另一个相关问题.我们尝试了maven阴影,由于我们项目的包装,这不是那么实用,所以从来没有让它起作用.

最后,我们为云存储交互创建了一个单独的子模块+ jar,为我们的打包/运行带来了更多的复杂性.

最后一点,我们在尝试使用BigQuery API时遇到了同样的问题,但通过重用包私有光束代码解决了这个问题.

如果某人确实有(相对简单的)方法让这些库协同工作,或者确认这确实是一个具有挑战性的依赖性问题,可能需要在Apache Beam中解决,那将是非常棒的.

Rya*_*ell 6

您可以利用Beam包含的FileSystems API列出存储桶中的存储桶,读/写文件和删除对象,而不是包含单独的云存储依赖项.下面是一个示例,其中列出了存储桶下的所有文件,然后将其中一个文件读入字符串.

// Set the default pipeline options so the various filesystems are
// loaded into the registry. This shouldn't be necessary if used
// within a pipeline.
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

// List Bucket
MatchResult listResult = FileSystems.match("gs://filesystems-demo/**/*");
listResult
    .metadata()
    .forEach(
        metadata -> {
          ResourceId resourceId = metadata.resourceId();
          System.out.println(resourceId.toString());
        });


// Read file
ResourceId existingFileResourceId = FileSystems
    .matchSingleFileSpec("gs://filesystems-demo/test-file1.csv")
    .resourceId();

try (ByteArrayOutputStream out = new ByteArrayOutputStream();
    ReadableByteChannel readerChannel = FileSystems.open(existingFileResourceId);
    WritableByteChannel writerChannel = Channels.newChannel(out)) {
  ByteStreams.copy(readerChannel, writerChannel);

  System.out.println("File contents: \n" + out.toString());
}


// Write file
String contentToWrite = "Laces out Dan!";

ResourceId newFileResourceId = FileSystems
    .matchNewResource("gs://filesystems-demo/new-file.txt", false);

try (ByteArrayInputStream in = new ByteArrayInputStream(contentToWrite.getBytes());
    ReadableByteChannel readerChannel = Channels.newChannel(in);
    WritableByteChannel writerChannel = FileSystems.create(newFileResourceId, MimeTypes.TEXT)) {

  ByteStreams.copy(readerChannel, writerChannel);
}
Run Code Online (Sandbox Code Playgroud)

  • 虽然此解决方法适用于读取和列出文件,但不适用于创建文件。这是由于 [GcsResourceId.fromGcsPath()](https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/ apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java#L37) 是包私有的,并且 [构造函数](https://github.com/apache/beam/blob/master/sdks/java/extensions/ google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java#L42) 是私有的。最好的解决方案是解决依赖问题。 (2认同)
  • 我添加了一个使用 FileSystems API 写入 GCS 的示例。:) 您不需要直接使用 GcsResourceId 或 GcsFileSystem,Beam 将使用基于 uri 方案的正确实现(例如 gs://)。这更灵活,因为您可以在测试中使用本地文件,而无需将一堆模拟组合在一起。如果您确实需要读取、写入、删除、复制和重命名之外的内容,我只建议包含其他依赖项。否则,SDK 已经拥有您需要的一切。 (2认同)