Spark DataFrame包含Array [Double]类型的列.当我尝试在map()函数中将其返回时,它会抛出一个ClassCastException异常.以下Scala代码生成异常.
case class Dummy( x:Array[Double] )
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3))))
val s = df.map( r => {
val arr:Array[Double] = r.getAs[Array[Double]]("x")
arr.sum
})
s.foreach(println)
Run Code Online (Sandbox Code Playgroud)
例外是
java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
Cam有人解释我为什么不起作用?我该怎么做呢?我使用Spark 1.5.1和scala 2.10.6
谢谢
arrays scala classcastexception apache-spark apache-spark-sql
让我们config.json成为一个小的json文件:
{
"toto": 1
}
Run Code Online (Sandbox Code Playgroud)
我做了一个简单的代码,用于读取json文件sc.textFile(因为该文件可以在S3,本地或HDFS上,因此textFile很方便)
import org.apache.spark.{SparkContext, SparkConf}
object testAwsSdk {
def main( args:Array[String] ):Unit = {
val sparkConf = new SparkConf().setAppName("test-aws-sdk").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val json = sc.textFile("config.json")
println(json.collect().mkString("\n"))
}
}
Run Code Online (Sandbox Code Playgroud)
SBT文件仅拉取spark-core库
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)
Run Code Online (Sandbox Code Playgroud)
程序按预期工作,在标准输出上写入config.json的内容.
现在我想链接aws-java-sdk,亚马逊的sdk来访问S3.
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile",
"org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)
Run Code Online (Sandbox Code Playgroud)
执行相同的代码,spark抛出以下异常.
Exception in thread "main" com.fasterxml.jackson.databind.JsonMappingException: Could …Run Code Online (Sandbox Code Playgroud) 在 Google Big Query 中加载以下 geojson 文件的最佳方式是什么?
http://storage.googleapis.com/velibs/stations/test.json
我在 Google Storage 上有很多这样的 json 文件(更大),我无法下载/修改/上传它们(这将需要很长时间)。请注意,该文件不是换行符分隔的,所以我猜它需要在线修改。
谢谢大家。
我在S3中的镶木地板中有一个数据集,该数据集按日期(dt)进行了划分,并且最早的日期存储在AWS Glacier中,以节省一些资金。例如,我们有...
s3://my-bucket/my-dataset/dt=2017-07-01/ [in glacier]
...
s3://my-bucket/my-dataset/dt=2017-07-09/ [in glacier]
s3://my-bucket/my-dataset/dt=2017-07-10/ [not in glacier]
...
s3://my-bucket/my-dataset/dt=2017-07-24/ [not in glacier]
Run Code Online (Sandbox Code Playgroud)
我想读取此数据集,但只读取尚未在冰川中的一部分日期,例如:
val from = "2017-07-15"
val to = "2017-08-24"
val path = "s3://my-bucket/my-dataset/"
val X = spark.read.parquet(path).where(col("dt").between(from, to))
Run Code Online (Sandbox Code Playgroud)
不幸的是,我有例外
java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The operation is not valid for the object's storage class (Service: Amazon S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: C444D508B6042138)
Run Code Online (Sandbox Code Playgroud)
我似乎在Glacier中有一些分区时,spark不喜欢分区数据集。我总是可以特别地阅读每个日期,在当前日期和reduce(_ union _)末尾添加该列,但是它看起来很丑陋,它不是必须的。
有什么技巧可以读取数据存储中的可用数据,即使冰川中有旧数据也是如此?
我使用逻辑回归和spark-ml管道训练一个简单的CrossValidatorModel.我可以预测新数据,但我想超越黑盒子并对系数进行一些分析
val lr = new LogisticRegression().
setFitIntercept(true).
setMaxIter(maxIter).
setElasticNetParam(alpha).
setStandardization(true).
setFamily("binomial").
setWeightCol("weight").
setFeaturesCol("features").
setLabelCol("response")
val assembler = new VectorAssembler().
setInputCols(Array("feat1", "feat2")).
setOutputCol("features")
val modelPipeline = new Pipeline().
setStages(Array(assembler,lr))
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("response")
Run Code Online (Sandbox Code Playgroud)
然后我定义了一个参数网格,我在网格上训练以获得最佳模型和AUC
val paramGrid = new ParamGridBuilder().
addGrid(lr.regParam, lambdas).
build()
val pipeline = new CrossValidator().
setEstimator(modelPipeline).
setEvaluator(evaluator).
setEstimatorParamMaps(paramGrid).
setNumFolds(nfolds)
val cvModel = pipeline.fit(train)
Run Code Online (Sandbox Code Playgroud)
如何获得最佳逻辑回归模型的系数(beta)?
scala cross-validation logistic-regression apache-spark apache-spark-ml
让 x 是定义为(在 Scala 中)的两列字符串的数据框
case class Pair(X: String, Y: String)
val x = sqlContext.createDataFrame(Seq(
Pair("u1", "1"),
Pair("u2", "wrong value"),
Pair("u3", "5"),
Pair("u4", "2")
))
Run Code Online (Sandbox Code Playgroud)
我想清理这个数据框,使第二列的每个值都是
我在考虑使用 udf 函数
val stringToInt = udf[Int, String](x => try {
x.toInt
} catch {
case e: Exception => null
})
x.withColumn("Y", stringToInt(x("Y")))
Run Code Online (Sandbox Code Playgroud)
...但 null 不是字符串,编译器拒绝它。请问有什么解决办法?只要我可以清理我的数据框,完全不同的方法也可以
scala user-defined-functions missing-data dataframe apache-spark
apache-spark ×5
scala ×3
amazon-s3 ×1
arrays ×1
aws-java-sdk ×1
dataframe ×1
geojson ×1
jackson ×1
json ×1
load ×1
missing-data ×1
partitioning ×1