小编wdz*_*wdz的帖子

Spark应用程序 - Java.lang.OutOfMemoryError:Java堆空间

我使用的是Spark Standalone单机,128G内存和32个内核.以下是我认为与我的问题相关的设置:

spark.storage.memoryFraction     0.35
spark.default.parallelism        50
spark.sql.shuffle.partitions     50
Run Code Online (Sandbox Code Playgroud)

我有一个Spark应用程序,其中有一个1000个设备的循环.对于每个循环(设备),它准备特征向量,然后调用MLLib的k-Means.在循环的第25到第30次迭代(处理第25到第30个设备)时,它遇到"Java.lang.OutOfMemoryError:Java堆空间"的错误.

我尝试将memoryFraction从0.7增加到0.35,但它没有帮助.我也尝试并行/分区到200没有运气.JVM选项为"-Xms25G -Xmx25G -XX:MaxPermSize = 512m".我的数据大小只有2G左右.

这是堆栈跟踪:

java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
  at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1841)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1533)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
  at scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:138)
  at scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:136)
  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
  at scala.collection.mutable.HashTable$class.serializeTo(HashTable.scala:125)
  at scala.collection.mutable.HashMap.serializeTo(HashMap.scala:40)
  at scala.collection.mutable.HashMap.writeObject(HashMap.scala:136)
  at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) …
Run Code Online (Sandbox Code Playgroud)

java jvm heap-memory out-of-memory apache-spark

7
推荐指数
2
解决办法
2万
查看次数

如何在Spark中并行读写多个表?

在我的Spark应用程序中,我尝试从RDBMS读取多个表,进行一些数据处理,然后将多个表写入另一个RDBMS,如下所示(在Scala中):

val reading1 = sqlContext.load("jdbc", Map("url" -> myurl1, "dbtable" -> mytable1))
val reading2 = sqlContext.load("jdbc", Map("url" -> myurl1, "dbtable" -> mytable2))
val reading3 = sqlContext.load("jdbc", Map("url" -> myurl1, "dbtable" -> mytable3))

// data processing
// ..............

myDF1.write.mode("append").jdbc(myurl2, outtable1, new java.util.Properties)
myDF2.write.mode("append").jdbc(myurl2, outtable2, new java.util.Properties)
myDF3.write.mode("append").jdbc(myurl2, outtable3, new java.util.Properties)
Run Code Online (Sandbox Code Playgroud)

我了解可以使用分区并行读取一个表。但是,read1,read2,read3的读操作似乎是顺序的,myDF1,myDF2,myDF3的写操作也是如此。

如何并行读取多个表(mytable1,mytable2,mytable3)?并且还并行写入多个表(我认为逻辑相同)?

parallel-processing scala apache-spark apache-spark-sql

6
推荐指数
1
解决办法
4021
查看次数

如何在 Scala 中使用 Flink 的 KafkaSource?

我正在尝试使用 Flink 的 KafkaSource 运行一个简单的测试程序。我正在使用以下内容:

  • 弗林克 0.9
  • 斯卡拉 2.10.4
  • 卡夫卡 0.8.2.1

我按照此处此处所述的文档来测试 KafkaSource(添加了依赖项,将 Kafka 连接器 flink-connector-kafka 捆绑在插件中)。

下面是我的简单测试程序:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}
Run Code Online (Sandbox Code Playgroud)

但是,编译总是抱怨找不到 KafkaSource:

[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR]     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
Run Code Online (Sandbox Code Playgroud)

我在这里想念什么?

scala apache-kafka apache-flink

5
推荐指数
1
解决办法
2055
查看次数

Spark程序性能——GC&任务反序列化&并发执行

我有一个4台机器的集群,1台master,3台worker,每台128G内存,64个核心。我在独立模式下使用 Spark 1.5.0。我的程序使用 JDBC 从 Oracle 表中读取数据,然后执行 ETL、操作数据,并执行 k-means 等机器学习任务。

我有一个 DataFrame (myDF.cache()),它是与其他两个 DataFrame 的连接结果,并进行缓存。DataFrame包含2700万行,数据大小约为1.5G。我需要过滤数据并计算24直方图,如下:

val h1 = myDF.filter("pmod(idx, 24) = 0").select("col1").histogram(arrBucket) 
val h2 = myDF.filter("pmod(idx, 24) = 1").select("col1").histogram(arrBucket) 
// ...... 
val h24 = myDF.filter("pmod(idx, 24) = 23").select("col1").histogram(arrBucket) 
Run Code Online (Sandbox Code Playgroud)

问题:

  1. 由于我的 DataFrame 已被缓存,因此我希望过滤器、选择和直方图非常快。但每次计算的实际时间约为7秒,这是不可接受的。从 UI 来看,GC 时间需要 5 秒,任务反序列化时间需要 4 秒。我尝试了不同的 JVM 参数,但无法进一步改进。现在我正在使用

    -Xms25G -Xmx25G -XX:MaxPermSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 \
    -XX:ParallelGCThreads=32 \
    -XX:ConcGCThreads=8 -XX:InitiatingHeapOccupancyPercent=70 
    
    Run Code Online (Sandbox Code Playgroud)

让我困惑的是,数据的大小与可用内存相比根本不算什么。为什么每次运行过滤器/选择/直方图时都会启动 GC?有没有办法减少GC时间和任务反序列化时间?

  1. 我必须对 h[1-24] 进行并行计算,而不是顺序计算。我尝试了未来,类似:

    -Xms25G -Xmx25G -XX:MaxPermSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 \
    -XX:ParallelGCThreads=32 \
    -XX:ConcGCThreads=8 -XX:InitiatingHeapOccupancyPercent=70 
    
    Run Code Online (Sandbox Code Playgroud)

问题是,这里的 Future 仅意味着作业几乎同时提交给调度程序,而不是它们最终被调度并同时运行。这里使用 Future …

garbage-collection scala concurrent-programming apache-spark apache-spark-sql

5
推荐指数
1
解决办法
2165
查看次数

任何人都可以在Scala中共享Flink Kafka示例吗?

任何人都可以在Scala中分享Flink Kafka(主要是从Kafka接收消息)的工作示例吗?我知道Spark中有一个KafkaWordCount示例.我只需要在Flink打印出Kafka消息.这真的很有帮助.

scala apache-kafka apache-flink

4
推荐指数
1
解决办法
2956
查看次数

Spark 性能 - 如何并行化大循环?

我有一个包含 8000 个循环的 Spark 应用程序,它在 5 个节点的集群上运行。每个节点有 125GB 内存和 32 个内核。相关代码如下所示:

for (m <- 0 until deviceArray.size) { // there are 1000 device 
  var id = deviceArray(m)

  for (t <- 1 to timePatterns) { // there are 8 time patterns
     var hrpvData = get24HoursPVF(dataDF, id, t).cache()

  var hrpvDataZI = hrpvData.zipWithIndex

  var clustersLSD = runKMeans(hrpvData, numClusters, numIterations)

  var clusterPVPred = hrpvData.map(x => clustersLSD.predict(x))
  var clusterPVMap = hrpvDataZI.zip(clusterPVPred)

  var pvhgmRDD = clusterPVMap.map{r => (r._2, r._1._2)}.groupByKey

  var arrHGinfo = pvhgmRDD.collect 

  // Post process …
Run Code Online (Sandbox Code Playgroud)

parallel-processing performance scala apache-spark

2
推荐指数
1
解决办法
2840
查看次数