Apache Beam&Dataflow的数据存储性能不佳

tpx*_*tpx 5 java google-cloud-datastore google-cloud-platform google-cloud-dataflow apache-beam

我在数据存储写入速度方面遇到了巨大的性能问题.大多数情况下它保持在100个元素/秒以下.

当使用数据存储客户端(com.google.cloud:google-cloud-datastore)在本地计算机上标记写入速度并且并行运行批量写入时,我能够实现大约2600个元素/秒的速度.

我使用Java API建立了一个简单的Apache Beam管道.这是图表:

全管道

在没有数据存储节点的情况下运行时的速度:

管线没有的数据存储写

这种方式要快得多.这一切都指向DatastoreV1.Write成为这个管道中的瓶颈 - 通过没有写节点的管道速度和DatastoreV1.Write与其他节点的挂起时间相比的挂起时间来判断.


方法我试图解决这个问题:

•增加初始工人的数量(尝试过1和10,没有明显的差异).数据存储在一段时间后(可能在前2个节点完成处理之后)将写入次数减少到1.基于此我怀疑DatastoreIO.v1().write()不会并行运行其worker.为什么呢?

管道日志工人

•确保所有内容都在同一位置运行:GCP项目,数据流管道工作者和元数据,存储 - 所有都设置为us-central.这个建议在这里

•尝试使用不同的实体密钥生成策略(根据本文).目前使用这种方法:Key.Builder keyBuilder = DatastoreHelper.makeKey("someKind", UUID.randomUUID().toString());.我不完全确定这会产生足够均匀分布的密钥,但我猜即使它不是性能也不应该这么低?


请注意,我无法使用提供的Apache Beam和Google库而没有解决方法:由于依赖性问题,我不得不强迫google-api-client版本为1.22.0&Guava为23.0(例如参见https) ://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/607).

查看DatastoreV1.Write节点日志:

数据存储日志写

它每5周左右就推动500个实体的批量生产,这不是很快.

总的来说,它看起来像DatastoreIO.v1().write()速度很慢,而且它的工作者并没有并行运行.任何想法如何解决这个或可能是什么原因?

tpx*_*tpx 5

我不应该回答这个问题.

在接触到GCP支持后,我得到了一个建议,原因可能是从压缩(gzip)文件读取的TextIO.Read节点.显然这是一个不可并行化的操作.实际上,在切换到源的未压缩文件后,性能得到改善.

建议的另一个解决方案是在从源读取后对管道进行手动重新定位.这意味着向管道中的项添加任意键,按任意键分组,然后删除任意键.它也有效.这种方法归结为以下代码:

管道代码:

pipeline.apply(TextIO.read().from("foo").withCompression(Compression.GZIP)  
        .apply(ParDo.of(new PipelineRepartitioner.AddArbitraryKey<>()))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new PipelineRepartitioner.RemoveArbitraryKey<>()))
        /* further transforms */ 
Run Code Online (Sandbox Code Playgroud)

助手类:

public class PipelineRepartitioner<T> {
    public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
    }

    public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            for (T s : c.element().getValue()) {
                c.output(s);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我已经在Apache Beam Jira上看到了与该问题相关的门票,所以这可以在将来修复.