我对Spark和Scala很新.我试图将一个函数称为Spark UDF,但我遇到了这个我似乎无法解决的错误.
据我所知,在Scala中,Array和Seq不一样.WrappedArray是Seq的子类型,WrappedArray和Array之间存在隐式转换,但我不确定为什么在UDF的情况下不会发生这种情况.
任何指示,以帮助我理解和解决这一点非常感谢.
这是代码的片段
def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}
val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))
case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF
mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>] …
Run Code Online (Sandbox Code Playgroud) 我试图找出从 Spark 调用 Rest 端点的最佳方法。
我目前的方法(解决方案 [1])看起来像这样 -
val df = ... // some dataframe
val repartitionedDf = df.repartition(numberPartitions)
lazy val restEndPoint = new restEndPointCaller() // lazy evaluation of the object which creates the connection to REST. lazy vals are also initialized once per JVM (executor)
val enrichedDf = repartitionedDf
.map(rec => restEndPoint.getResponse(rec)) // calls the rest endpoint for every record
.toDF
Run Code Online (Sandbox Code Playgroud)
我知道我可以使用 .mapPartitions() 而不是 .map(),但是查看 DAG,看起来 spark 优化了重新分区 -> 无论如何映射到 mapPartition。
在第二种方法(解决方案 [2])中,为每个分区创建一次连接,并为分区内的所有记录重用。
val newDs = myDs.mapPartitions(partition => { …
Run Code Online (Sandbox Code Playgroud) 我有两个大型数据框 [a] 一个,其中包含由 id [b] 标识的所有事件。我想使用 spark 2.0.0 中的 stat.bloomFilter 实现基于 [b] 中的 id 过滤 [a]
但是我在数据集 API 中没有看到任何将布隆过滤器连接到数据框的操作 [a]
val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val df1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")
val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val df2 = in2.map(x => (x)).toDF("c1")
val expectedNumItems: Long = 1000
val fpp: Double = 0.005
val sbf = df.stat.bloomFilter($"c1", expectedNumItems, fpp)
val sbf2 = df2.stat.bloomFilter($"c1", expectedNumItems, fpp)
Run Code Online (Sandbox Code Playgroud)
根据 df2 中的值过滤“df1”的最佳方法是什么?
谢谢!
scala bloom-filter apache-spark apache-spark-sql apache-spark-dataset
我有一个 scala-2.11 函数,它根据提供的类类型从 Map 创建一个案例类。
def createCaseClass[T: TypeTag, A](someMap: Map[String, A]): T = {
val rMirror = runtimeMirror(getClass.getClassLoader)
val myClass = typeOf[T].typeSymbol.asClass
val cMirror = rMirror.reflectClass(myClass)
// The primary constructor is the first one
val ctor = typeOf[T].decl(termNames.CONSTRUCTOR).asTerm.alternatives.head.asMethod
val argList = ctor.paramLists.flatten.map(param => someMap(param.name.toString))
cMirror.reflectConstructor(ctor)(argList: _*).asInstanceOf[T]
}
Run Code Online (Sandbox Code Playgroud)
我试图在 Spark 数据帧的上下文中使用它作为 UDF。但是,我不确定通过案例课程的最佳方法是什么。下面的方法似乎不起作用。
def myUDF[T: TypeTag] = udf { (inMap: Map[String, Long]) =>
createCaseClass[T](inMap)
}
Run Code Online (Sandbox Code Playgroud)
我正在寻找这样的东西 -
case class MyType(c1: String, c2: Long)
val myUDF = udf{(MyType, inMap) => createCaseClass[MyType](inMap)}
Run Code Online (Sandbox Code Playgroud)
感谢解决此问题的想法和建议。
我正在尝试创建一个Spark UDF来从用户定义的case类中提取(key,value)对的Map.
scala函数似乎工作正常,但是当我尝试将它转换为spark2.0中的UDF时,我遇到了"Schema for type Any is not supported"错误.
case class myType(c1: String, c2: Int)
def getCaseClassParams(cc: Product): Map[String, Any] = {
cc
.getClass
.getDeclaredFields // all field names
.map(_.getName)
.zip(cc.productIterator.to) // zipped with all values
.toMap
}
Run Code Online (Sandbox Code Playgroud)
但是当我尝试将函数值实例化为UDF时,会导致以下错误 -
val ccUDF = udf{(cc: Product, i: String) => getCaseClassParams(cc).get(i)}
java.lang.UnsupportedOperationException: Schema for type Any is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:668)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:654)
at org.apache.spark.sql.functions$.udf(functions.scala:2841)
Run Code Online (Sandbox Code Playgroud)