大数据集缓存中的"GC开销限制超出"到火花存储器中(通过sparklyr和RStudio)

ren*_*key 7 r cassandra apache-spark sparklyr

我对我正在尝试使用的大数据技术非常陌生,但到目前为止,我已设法在RStudio中设置sparklyr以连接到独立的Spark集群.数据存储在Cassandra中,我可以成功地将大型数据集引入Spark内存(缓存)以对其进行进一步分析.

然而,最近我在将一个特别大的数据集引入Spark内存时遇到了很多麻烦,即使集群应该有足够的资源(60个内核,200GB RAM)来处理其大小的数据集.

我认为,通过限制数据被缓存到的利益,我可以克服这个问题(使用我以前的查询答案代码短短选择列在这里),但事实并非如此.发生的事情是我的本地机器上的jar进程加速占用所有本地RAM和CPU资源,整个进程冻结,并且群集执行程序不断被删除并重新添加.奇怪的是,即使我只选择1行进行缓存(这应该使这个数据集比我没有问题缓存到Spark内存中的其他数据集小得多)也会发生这种情况.

我已经查看了日志,这些似乎是过程中早期唯一的信息性错误/警告:

17/03/06 11:40:27 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 33813 because its task set is gone (this is likely the result of receiving duplicate task finished status updates) or its executor has been marked as failed.
17/03/06 11:40:27 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 8167), so marking it as still running
...
17/03/06 11:46:59 WARN TaskSetManager: Lost task 3927.3 in stage 0.0 (TID 54882, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 3863), so marking it as still running
17/03/06 11:46:59 WARN TaskSetManager: Lost task 4300.3 in stage 0.0 (TID 54667, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 14069), so marking it as still running
Run Code Online (Sandbox Code Playgroud)

然后在20分钟左右后,整个工作崩溃了:

java.lang.OutOfMemoryError: GC overhead limit exceeded
Run Code Online (Sandbox Code Playgroud)

我已经更改了我的连接配置以增加心跳间隔(spark.executor.heartbeatInterval: '180s'),并且已经了解了如何通过更改纱线群集(使用spark.yarn.executor.memoryOverhead)上的设置来增加memoryOverhead ,而不是在独立群集上.

在我的配置文件中,我通过一次添加以下每个设置进行了实验(其中没有一个有效):

spark.memory.fraction: 0.3
spark.executor.extraJavaOptions: '-Xmx24g'
spark.driver.memory: "64G"
spark.driver.extraJavaOptions: '-XX:MaxHeapSize=1024m'
spark.driver.extraJavaOptions: '-XX:+UseG1GC'
Run Code Online (Sandbox Code Playgroud)

更新:我的完整当前yml配置文件如下:

default:
# local settings
  sparklyr.sanitize.column.names: TRUE
  sparklyr.cores.local: 3
  sparklyr.shell.driver-memory: "8G"

# remote core/memory settings
  spark.executor.memory: "32G"
  spark.executor.cores: 5
  spark.executor.heartbeatInterval: '180s'
  spark.ext.h2o.nthreads: 10
  spark.cores.max: 30
  spark.memory.storageFraction: 0.6
  spark.memory.fraction: 0.3
  spark.network.timeout: 300
  spark.driver.extraJavaOptions: '-XX:+UseG1GC'

# other configs for spark
  spark.serializer: org.apache.spark.serializer.KryoSerializer
  spark.executor.extraClassPath: /var/lib/cassandra/jar/guava-18.0.jar

# cassandra settings
  spark.cassandra.connection.host: <cassandra_ip>
  spark.cassandra.auth.username: <cassandra_login>
  spark.cassandra.auth.password: <cassandra_pass>
  spark.cassandra.connection.keep_alive_ms: 60000

# spark packages to load
  sparklyr.defaultPackages: 
  - "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M1"
  - "com.databricks:spark-csv_2.11:1.3.0"
  - "com.datastax.cassandra:cassandra-driver-core:3.0.2"
  - "com.amazonaws:aws-java-sdk-pom:1.10.34"
Run Code Online (Sandbox Code Playgroud)

所以我的问题是:

  1. 有没有人对在这个例子中做什么有任何想法?
  2. 是否有配置设置我可以更改以帮助解决此问题?
  3. 或者,有没有办法以RStudio/sparklyr作为驱动程序批量导入cassandra数据?
  4. 或者另外,有没有办法在数据进入缓存时进行munge /过滤/编辑数据,以便生成的表更小(类似于使用SQL查询,但使用更复杂的dplyr语法)?

ren*_*key 1

好的,我终于成功了!

我最初尝试了 @user6910411 的建议来减少 cassandra 输入分割大小,但这以同样的方式失败了。在尝试了很多其他事情之后,今天我尝试朝相反的方向更改该设置:

spark.cassandra.input.split.size_in_mb: 254 
Run Code Online (Sandbox Code Playgroud)

通过增加分割大小,可以减少 Spark 任务,从而减少开销并减少对 GC 的调用。有效!