小编Yas*_*ash的帖子

scala.collection.mutable.WrappedArray $ ofRef无法强制转换为Integer

我对Spark和Scala很新.我试图将一个函数称为Spark UDF,但我遇到了这个我似乎无法解决的错误.

据我所知,在Scala中,Array和Seq不一样.WrappedArray是Seq的子类型,WrappedArray和Array之间存在隐式转换,但我不确定为什么在UDF的情况下不会发生这种情况.

任何指示,以帮助我理解和解决这一点非常感谢.

这是代码的片段

def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}

val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))

case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF

mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>] …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

16
推荐指数
1
解决办法
1万
查看次数

从 Spark 调用休息服务

我试图找出从 Spark 调用 Rest 端点的最佳方法。

我目前的方法(解决方案 [1])看起来像这样 -

val df = ... // some dataframe

val repartitionedDf = df.repartition(numberPartitions)

lazy val restEndPoint = new restEndPointCaller() // lazy evaluation of the object which creates the connection to REST. lazy vals are also initialized once per JVM (executor)

val enrichedDf = repartitionedDf 
.map(rec => restEndPoint.getResponse(rec)) // calls the rest endpoint for every record
.toDF
Run Code Online (Sandbox Code Playgroud)

我知道我可以使用 .mapPartitions() 而不是 .map(),但是查看 DAG,看起来 spark 优化了重新分区 -> 无论如何映射到 mapPartition。

在第二种方法(解决方案 [2])中,为每个分区创建一次连接,并为分区内的所有记录重用。

  val newDs = myDs.mapPartitions(partition => { …
Run Code Online (Sandbox Code Playgroud)

rest scala apache-spark restapi

8
推荐指数
1
解决办法
9364
查看次数

在 Spark 2.0.0 中使用 stat.bloomFilter 过滤另一个数据帧

我有两个大型数据框 [a] 一个,其中包含由 id [b] 标识的所有事件。我想使用 spark 2.0.0 中的 stat.bloomFilter 实现基于 [b] 中的 id 过滤 [a]

但是我在数据集 API 中没有看到任何将布隆过滤器连接到数据框的操作 [a]

val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val df1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")

val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val df2 = in2.map(x => (x)).toDF("c1")

val expectedNumItems: Long = 1000
val fpp: Double = 0.005

val sbf = df.stat.bloomFilter($"c1", expectedNumItems, fpp)
val sbf2 = df2.stat.bloomFilter($"c1", expectedNumItems, fpp)
Run Code Online (Sandbox Code Playgroud)

根据 df2 中的值过滤“df1”的最佳方法是什么?

谢谢!

scala bloom-filter apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
3704
查看次数

将案例类传递给 Spark UDF

我有一个 scala-2.11 函数,它根据提供的类类型从 Map 创建一个案例类。

def createCaseClass[T: TypeTag, A](someMap: Map[String, A]): T = {

    val rMirror = runtimeMirror(getClass.getClassLoader)
    val myClass = typeOf[T].typeSymbol.asClass
    val cMirror = rMirror.reflectClass(myClass)

    // The primary constructor is the first one
    val ctor = typeOf[T].decl(termNames.CONSTRUCTOR).asTerm.alternatives.head.asMethod
    val argList = ctor.paramLists.flatten.map(param => someMap(param.name.toString))

    cMirror.reflectConstructor(ctor)(argList: _*).asInstanceOf[T]
  }
Run Code Online (Sandbox Code Playgroud)

我试图在 Spark 数据帧的上下文中使用它作为 UDF。但是,我不确定通过案例课程的最佳方法是什么。下面的方法似乎不起作用。

def myUDF[T: TypeTag] = udf { (inMap: Map[String, Long]) =>
    createCaseClass[T](inMap)
  }
Run Code Online (Sandbox Code Playgroud)

我正在寻找这样的东西 -

case class MyType(c1: String, c2: Long)

val myUDF = udf{(MyType, inMap) => createCaseClass[MyType](inMap)}
Run Code Online (Sandbox Code Playgroud)

感谢解决此问题的想法和建议。

scala user-defined-functions case-class apache-spark

5
推荐指数
1
解决办法
4536
查看次数

不支持类型为Any的架构

我正在尝试创建一个Spark UDF来从用户定义的case类中提取(key,value)对的Map.

scala函数似乎工作正常,但是当我尝试将它转换为spark2.0中的UDF时,我遇到了"Schema for type Any is not supported"错误.

case class myType(c1: String, c2: Int)
def getCaseClassParams(cc: Product): Map[String, Any] = {

    cc
      .getClass
      .getDeclaredFields // all field names
      .map(_.getName)
      .zip(cc.productIterator.to) // zipped with all values
      .toMap

  }
Run Code Online (Sandbox Code Playgroud)

但是当我尝试将函数值实例化为UDF时,会导致以下错误 -

val ccUDF = udf{(cc: Product, i: String) => getCaseClassParams(cc).get(i)}

java.lang.UnsupportedOperationException: Schema for type Any is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:668)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:654)
  at org.apache.spark.sql.functions$.udf(functions.scala:2841)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql udf apache-spark-2.0

3
推荐指数
1
解决办法
5215
查看次数