我想编写Spark UDAF,其中列的类型可以是任何在其上定义了Scala Numeric的列.我已经搜查了互联网,但发现只有具体类型,如例子DoubleType,LongType.这不可能吗?但是如何将UDAF与其他数值一起使用呢?
scala aggregate-functions user-defined-functions apache-spark apache-spark-sql
我有默认参数和使用Play Json Read的问题.这是我的代码:
case class Test(action: String, storeResult: Option[Boolean] = Some(true), returndata: Option[Boolean] = Some(true))
val json =
"""
{"action": "Test"}"""
implicit val testReads: Reads[Test] =
(
(JsPath \\ "action").read[String](minLength[String](1)) and
(JsPath \\ "store_result").readNullable[Boolean] and
(JsPath \\ "returndata").readNullable[Boolean]
) (Test.apply _)
val js = Json.parse(json)
js.validate[Test] match {
case JsSuccess(a, _) => println(a)
case JsError(errors) =>
println("Here")
println(errors)
}
Run Code Online (Sandbox Code Playgroud)
我最终希望得到的是
Test("Test", Some(true), Some(true))
Run Code Online (Sandbox Code Playgroud)
但我得到了:
Test("Test",None,None)
Run Code Online (Sandbox Code Playgroud)
为什么会这样?如果我没有在json中提供参数,为什么它没有得到默认值?如何实现我想要的?
我正在寻找一种方法来增加for循环中的两个变量,就像可以在Java中那样:
for (int j = column + 1, i = row + 1; j < size && i < size; j++, i++)
Run Code Online (Sandbox Code Playgroud)
如果我写在斯卡拉
for(j <- start to end; i <- start2 to end2)
Run Code Online (Sandbox Code Playgroud)
这将转换为嵌套循环,但是我需要它在原始循环中运行,并在第一个增量结束时停止。
我想知道这是一个糟糕的代码风格,用return替换map并继续尝试可读性?假设我在里面尝试了一些变量然后我需要对它做任何事情.
val myData: Try[String]
Run Code Online (Sandbox Code Playgroud)
我可以:
myData.flatMap{
some long code
}
Run Code Online (Sandbox Code Playgroud)
或者我可以这样做:
if (myData.isFailure) return myData
val myString = myData.get
some long code that use myString
Run Code Online (Sandbox Code Playgroud) 使用此代码 println 将仅针对指定的异常执行。我想知道是否可以否定该行以使其对未指定的所有其他异常执行。我知道可以使用 2 个案例,但我想知道是否可以使用一个案例来完成。
val myHandler: PartialFunction[Throwable, Unit] = {
case e @ (_: MappingException | _: ParseException | _: SomeOtherException) =>
println("Got it")
}
Run Code Online (Sandbox Code Playgroud) 我正在使用以下代码创建数据框:
val data = List(
List(444.1235D),
List(67.5335D),
List(69.5335D),
List(677.5335D),
List(47.5335D),
List(null)
)
val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
val schema = StructType(Array(
StructField("value", DataTypes.DoubleType, true)
))
val df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)
然后我将我的udf应用于它:
val multip: Dataset[Double] = df.select(doubleUdf(df("value"))).as[Double]
Run Code Online (Sandbox Code Playgroud)
然后我尝试在此数据集上使用reduce:
val multipl = multip.reduce(_ * _)
Run Code Online (Sandbox Code Playgroud)
在这里我得到0.0的结果.我也尝试过滤掉空值
val multipl = multip.filter(_ != null).reduce(_ * _)
Run Code Online (Sandbox Code Playgroud)
结果相同.如果我从数据中删除null值,一切正常.如何使用空值减少工作量?
我的udf定义如下:
val doubleUdf: UserDefinedFunction = udf((v: Any) => Try(v.toString.toDouble).toOption)
Run Code Online (Sandbox Code Playgroud) 我试图了解如何将 an 传递Arc<Mutex<T>>给另一个函数。这是我的代码,为了清楚起见,我删除了其中的一部分:
pub struct Pool {
inner: Arc<Mutex<PostgresDb>>,
}
fn main() {
let postgres: Arc<Mutex<PostgresDb>>; //removed creation of the instance
setup(&postgres)
}
fn setup(postgres: &Arc<Mutex<PostgresDb>>) -> () {
let pool = Arc::new(Pool::new(*postgres));
}
Run Code Online (Sandbox Code Playgroud)
我收到错误:
pub struct Pool {
inner: Arc<Mutex<PostgresDb>>,
}
fn main() {
let postgres: Arc<Mutex<PostgresDb>>; //removed creation of the instance
setup(&postgres)
}
fn setup(postgres: &Arc<Mutex<PostgresDb>>) -> () {
let pool = Arc::new(Pool::new(*postgres));
}
Run Code Online (Sandbox Code Playgroud)
我如何正确通过postgres?