小编Bor*_*ris的帖子

Spark中的Access Array列

Spark DataFrame包含Array [Double]类型的列.当我尝试在map()函数中将其返回时,它会抛出一个ClassCastException异常.以下Scala代码生成异常.

case class Dummy( x:Array[Double] )
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3))))
val s = df.map( r => {
   val arr:Array[Double] = r.getAs[Array[Double]]("x")
   arr.sum
})
s.foreach(println)
Run Code Online (Sandbox Code Playgroud)

例外是

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

Cam有人解释我为什么不起作用?我该怎么做呢?我使用Spark 1.5.1和scala 2.10.6

谢谢

arrays scala classcastexception apache-spark apache-spark-sql

10
推荐指数
1
解决办法
6296
查看次数

与aws-java-sdk链接时,在读取json文件时发生Spark崩溃

让我们config.json成为一个小的json文件:

{
    "toto": 1
}
Run Code Online (Sandbox Code Playgroud)

我做了一个简单的代码,用于读取json文件sc.textFile(因为该文件可以在S3,本地或HDFS上,因此textFile很方便)

import org.apache.spark.{SparkContext, SparkConf}

object testAwsSdk {
  def main( args:Array[String] ):Unit = {
    val sparkConf = new SparkConf().setAppName("test-aws-sdk").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val json = sc.textFile("config.json") 
    println(json.collect().mkString("\n"))
  }
}
Run Code Online (Sandbox Code Playgroud)

SBT文件仅拉取spark-core

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)
Run Code Online (Sandbox Code Playgroud)

程序按预期工作,在标准输出上写入config.json的内容.

现在我想链接aws-java-sdk,亚马逊的sdk来访问S3.

libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile",
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)
Run Code Online (Sandbox Code Playgroud)

执行相同的代码,spark抛出以下异常.

Exception in thread "main" com.fasterxml.jackson.databind.JsonMappingException: Could …
Run Code Online (Sandbox Code Playgroud)

jackson apache-spark aws-java-sdk

8
推荐指数
1
解决办法
3585
查看次数

在bigquery中加载geojson

在 Google Big Query 中加载以下 geojson 文件的最佳方式是什么?

http://storage.googleapis.com/velibs/stations/test.json

我在 Google Storage 上有很多这样的 json 文件(更大),我无法下载/修改/上传它们(这将需要很长时间)。请注意,该文件不是换行符分隔的,所以我猜它需要在线修改。

谢谢大家。

json load geojson google-bigquery

7
推荐指数
1
解决办法
3847
查看次数

在冰川中部分地触发读取S3中的分区数据

我在S3中的镶木地板中有一个数据集,该数据集按日期(dt)进行了划分,并且最早的日期存储在AWS Glacier中,以节省一些资金。例如,我们有...

s3://my-bucket/my-dataset/dt=2017-07-01/    [in glacier]
...
s3://my-bucket/my-dataset/dt=2017-07-09/    [in glacier]
s3://my-bucket/my-dataset/dt=2017-07-10/    [not in glacier]
...
s3://my-bucket/my-dataset/dt=2017-07-24/    [not in glacier]
Run Code Online (Sandbox Code Playgroud)

我想读取此数据集,但只读取尚未在冰川中的一部分日期,例如:

val from = "2017-07-15"
val to = "2017-08-24"
val path = "s3://my-bucket/my-dataset/"
val X = spark.read.parquet(path).where(col("dt").between(from, to))
Run Code Online (Sandbox Code Playgroud)

不幸的是,我有例外

java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The operation is not valid for the object's storage class (Service: Amazon S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: C444D508B6042138)
Run Code Online (Sandbox Code Playgroud)

我似乎在Glacier中有一些分区时,spark不喜欢分区数据集。我总是可以特别地阅读每个日期,在当前日期和reduce(_ union _)末尾添加该列,但是它看起来很丑陋,它不是必须的。

有什么技巧可以读取数据存储中的可用数据,即使冰川中有旧数据也是如此?

partitioning amazon-s3 amazon-glacier apache-spark

5
推荐指数
2
解决办法
1232
查看次数

如何在spark-ml CrossValidatorModel中获得最佳逻辑回归的系数?

我使用逻辑回归和spark-ml管道训练一个简单的CrossValidatorModel.我可以预测新数据,但我想超越黑盒子并对系数进行一些分析

 val lr = new LogisticRegression().
  setFitIntercept(true).
  setMaxIter(maxIter).
  setElasticNetParam(alpha).
  setStandardization(true).
  setFamily("binomial").
  setWeightCol("weight").
  setFeaturesCol("features").
  setLabelCol("response")

val assembler = new VectorAssembler().
  setInputCols(Array("feat1", "feat2")).
  setOutputCol("features")

val modelPipeline = new Pipeline().
  setStages(Array(assembler,lr))

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("response")
Run Code Online (Sandbox Code Playgroud)

然后我定义了一个参数网格,我在网格上训练以获得最佳模型和AUC

val paramGrid = new ParamGridBuilder().
  addGrid(lr.regParam, lambdas).
  build()

val pipeline = new CrossValidator().
  setEstimator(modelPipeline).
  setEvaluator(evaluator).
  setEstimatorParamMaps(paramGrid).
  setNumFolds(nfolds)

val cvModel = pipeline.fit(train)
Run Code Online (Sandbox Code Playgroud)

如何获得最佳逻辑回归模型的系数(beta)?

scala cross-validation logistic-regression apache-spark apache-spark-ml

4
推荐指数
1
解决办法
2186
查看次数

将无效数据设置为 Spark DataFrames 中的缺失数据

让 x 是定义为(在 Scala 中)的两列字符串的数据框

case class Pair(X: String, Y: String)

val x = sqlContext.createDataFrame(Seq(
   Pair("u1", "1"), 
   Pair("u2", "wrong value"), 
   Pair("u3", "5"), 
   Pair("u4", "2")
))
Run Code Online (Sandbox Code Playgroud)

我想清理这个数据框,使第二列的每个值都是

  1. 如果可能,转换为 Int
  2. 替换为 null、Na 或任何表示“缺失值”的符号(不是 NaN,这是不同的)

我在考虑使用 udf 函数

val stringToInt = udf[Int, String](x => try {
     x.toInt
   } catch {
     case e: Exception => null
   })

x.withColumn("Y", stringToInt(x("Y")))
Run Code Online (Sandbox Code Playgroud)

...但 null 不是字符串,编译器拒绝它。请问有什么解决办法?只要我可以清理我的数据框,完全不同的方法也可以

scala user-defined-functions missing-data dataframe apache-spark

2
推荐指数
1
解决办法
1825
查看次数