我们想要构建一个Cloud Dataflow Streaming流水线,该流水线从Pubsub接收事件,并对每个事件执行多个类似ETL的操作。这些操作之一是每个事件都有一个设备ID,需要将其转换为不同的值(我们称其为mapping -id),设备ID-> mapped-id的映射由外部服务通过REST API。同一设备ID可能会在多个事件中重复-因此,这些设备ID->映射的ID映射可以被缓存和重新使用。由于我们可能每秒通过管道最多处理每秒3M事件,因此需要尽可能减少对REST API的调用,并在实际需要调用时对其进行优化。
考虑到此设置,我有以下问题。
为了优化REST API调用,Dataflow是否提供任何内置的优化(如连接池),或者如果我们决定使用自己的此类机制,是否需要牢记任何限制/限制?
我们正在研究一些内存中缓存选项,以本地缓存映射,其中一些也由本地文件支持。因此,在不影响工作程序中常规Dataflow操作的情况下,这些缓存可以使用多少内存(占实例内存的一部分)是否有任何限制?如果我们使用文件支持的缓存,那么每个工作程序上是否都有可以由应用程序本身安全地用于构建这些文件的路径?
唯一设备ID的数量可能约为数百万个-因此并非所有这些映射都可以存储在每个实例中。因此,为了能够更好地利用本地缓存,我们需要在device-id和处理它们的工人之间获得某种亲和力。在进行此转换之前,我可以根据device-id进行分组。如果这样做,是否可以保证相同的设备ID或多或少由同一工作人员处理?如果有某种合理的保证,那么除了第一次调用就可以了,我大多数时候都不必使用外部REST API。还是有更好的方法来确保ID与工作人员之间的这种亲和力。
谢谢
我的 Spark 驱动程序在运行大约 10 小时后内存不足,并出现错误Exception in thread "dispatcher-event-loop-17" java.lang.OutOfMemoryError: GC overhead limit exceeded。为了进一步调试,我启用了 G1GC 模式以及 GC 日志选项spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp
,但看起来它没有对驱动程序生效。
10 小时后,该作业再次卡在驱动程序上,并且我在驱动程序节点的 stdout 下没有看到任何 GC 日志/var/log/hadoop-yar/userlogs/[application-id]/[container-id]/stdout- 所以不知道还能在哪里查看。根据Spark GC 调优文档,这些设置似乎只发生在工作节点上(在本例中我可以看到,在我使用了相同的配置后,工作节点在 stdout 中有 GC 日志spark.executor.extraJavaOptions)。是否有办法从驱动程序启用/获取 GC 日志?在 Spark UI -> 环境下,我看到下面列出了这些选项,spark.driver.extraJavaOptions这就是我认为它可以工作的原因。
环境:
集群在 Google Dataproc 上运行,我使用/usr/bin/spark-submit --master yarn --deploy-mode cluster ...master 来提交作业。
编辑在命令
期间为驱动程序设置相同的选项spark-submit,我可以在驱动程序的标准输出上看到 GC 日志。只是由于某种原因,通过 SparkConf …