Ran*_*nty 5 scala apache-spark
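(Using the spark-2.1.0-bin-hadoop2.7 build from the official website, on my local machine.)
When I run a simple Spark command in spark-shell, it prints thousands of lines of code before throwing an error. What is this "code"?
I am running Spark on my local machine. The command I ran was simply df.count, where df is a DataFrame.
See the screenshot below (the code scrolled past so fast that taking a screenshot was the only way to see what was happening). More details follow the image.
More details:
I created the DataFrame df with: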
val df: DataFrame = spark.createDataFrame(rows, schema)
// rows: RDD[Row]
// schema: StructType
// There were about 3000 columns and 700 rows (testing set) of data in df.
// The following line ran successfully and returned the correct value
rows.count
// The following line threw exception after printing out tons of codes as shown in the screenshot above
df.count
The exception thrown after the "code" is:
...
/* 181897 */ apply_81(i);
/* 181898 */ result.setTotalSize(holder.totalSize());
/* 181899 */ return result;
/* 181900 */ }
/* 181901 */ }
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:889)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:938)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959)
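Edit: as @TzachZohar pointed out, this looks like one of the known bugs (https://issues.apache.org/jira/browse/SPARK-16845) that has been fixed but not yet released by the Spark project.
I pulled Spark master, built it from source, and retried my example. Now a new exception follows the generated code: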
/* 308608 */ apply_1560(i);
/* 308609 */ apply_1561(i);
/* 308610 */ result.setTotalSize(holder.totalSize());
/* 308611 */ return result;
/* 308612 */ }
/* 308613 */ }
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:998)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:995)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF
at org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499)
There appears to be a pull request addressing the second problem: https://github.com/apache/spark/pull/16648
Jak*_*ke 1
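This is a bug. It is related to the code that Spark generates at runtime on the JVM, so it seems hard for the Spark developers to fix (there is a lot of discussion on JIRA).
The error occurred for me when doing row operations. Even df.head() on a 700-row DataFrame causes the exception.
My workaround was to convert the DataFrame into an RDD of sparse data (i.e. RDD[LabeledPoint]) and run the row operations on the RDD. It is faster and more memory-efficient. However, it only works with numeric data: categorical variables (factors, the target, etc.) need to be converted to Double.
That said, I am new to Scala myself, so my code may be a bit amateurish. But it works.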
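Before the Spark code, here is a minimal plain-Scala sketch of the sparse encoding idea on its own: keep only the positive entries of a row, together with their positions and the full row length. `SparseRow` and `toSparse` are hypothetical names used only for this illustration and are not part of Spark.

```scala
// Illustrative only: a sparse row stores the full length plus the
// positions and values of the entries that were kept.
final case class SparseRow(size: Int, indices: Array[Int], values: Array[Double])

// Keep strictly positive entries, remembering where they were in the dense row.
def toSparse(dense: Seq[Long]): SparseRow = {
  val kept = dense.zipWithIndex.collect {
    case (v, i) if v > 0 => (i, v.toDouble)
  }
  SparseRow(dense.size, kept.map(_._1).toArray, kept.map(_._2).toArray)
}

val sparse = toSparse(Seq(0L, 3L, 0L, 0L, 7L))
// sparse.size is 5, sparse.indices holds 1 and 4, sparse.values holds 3.0 and 7.0
```

Note that the size is the length of the dense row, not the number of entries kept; the positions must all be smaller than it.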
Creating the rows:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.DataType
import scala.collection.mutable.ArrayBuffer

// Converts one Row into a sparse LabeledPoint, keeping only strictly positive values.
// Feature positions follow the alphabetical order of the field names.
@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label: Int): LabeledPoint = {
  // Read the values as Any and go through Number, so that both
  // IntegerType and LongType columns are handled
  val values: Map[String, Any] = rowIn.getValuesMap[Any](fieldNameSeq)
  val sortedValues: Seq[Any] = values.toSeq.sortBy(_._1).map(_._2)
  val positionsArray = ArrayBuffer[Int]()
  val valuesArray = ArrayBuffer[Double]()
  sortedValues.zipWithIndex.foreach {
    case (n: Number, position) if n.doubleValue > 0 =>
      valuesArray += n.doubleValue
      positionsArray += position
    case _ => // zero, negative, or null: left out of the sparse vector
  }
  // The vector size is the total number of features, not the number of entries kept
  new LabeledPoint(label, Vectors.sparse(sortedValues.size, positionsArray.toArray, valuesArray.toArray))
}

// Returns df with column cn cast to the given type
private def castColumnTo(df: DataFrame, cn: String, tpe: DataType): DataFrame =
  df.withColumn(cn, df(cn).cast(tpe))
Takes a DataFrame and returns an RDD[LabeledPoint]:
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, LongType}

@throws(classOf[Exception])
def convertToLibSvm(spark: SparkSession, mDF: DataFrame, targetColumnName: String): RDD[LabeledPoint] = {
  val indexedName = targetColumnName + "_Indexed"
  // Only integer-valued columns are used as features. The target column is excluded
  // because it is dropped below and replaced by its indexed version.
  val fieldNameSeq: Seq[String] = mDF.schema.fields.toSeq
    .filter(f => f.dataType == IntegerType || f.dataType == LongType)
    .map(_.name)
    .filterNot(_ == targetColumnName)
  val indexer = new StringIndexer()
    .setInputCol(targetColumnName)
    .setOutputCol(indexedName)
  val mDFTypedIndexed = indexer.fit(mDF).transform(mDF).drop(targetColumnName)
  val mDFFinal = castColumnTo(mDFTypedIndexed, indexedName, IntegerType)
  // collect() brings every row to the driver, so this only suits data that fits in driver memory
  val points = mDFFinal.collect().map { row =>
    convertRowToLabeledPoint(row, fieldNameSeq, row.getAs[Int](indexedName))
  }
  spark.sparkContext.parallelize(points.toSeq)
}
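The label indexing step above relies on StringIndexer, which by default assigns index 0 to the most frequent label. As a rough plain-Scala sketch of that idea (not Spark's implementation), the hypothetical helper `indexLabels` below maps each distinct label to a Double by descending frequency. Breaking ties alphabetically is an assumption of this sketch; Spark's own tie-breaking may differ.

```scala
// Illustrative only: map each distinct label to a Double index,
// most frequent label first.
// Assumption: ties are broken alphabetically.
def indexLabels(labels: Seq[String]): Map[String, Double] =
  labels
    .groupBy(identity)
    .map { case (label, occurrences) => (label, occurrences.size) }
    .toSeq
    .sortBy { case (label, count) => (-count, label) }
    .zipWithIndex
    .map { case ((label, _), index) => label -> index.toDouble }
    .toMap

// "cat" occurs three times, "dog" twice, "bird" once
val labelIndex = indexLabels(Seq("cat", "dog", "cat", "bird", "dog", "cat"))
```

Here labelIndex("cat") is 0.0, labelIndex("dog") is 1.0 and labelIndex("bird") is 2.0, which is the kind of numeric label a LabeledPoint expects.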