How do I use DataFrame.explode with a custom UDF to split a string into substrings?

Chr*_*lis 4 scala apache-spark apache-spark-sql

I am using Spark 1.5.

I have a DataFrame A_DF as follows:

+--------------------+--------------------+
|                  id|        interactions|
+--------------------+--------------------+
|        id1         |30439831,30447866...|
|        id2         |37597858,34499875...|
|        id3         |30447866,32896718...|
|        id4         |33029476,31988037...|
|        id5         |37663606,37627579...|
|        id6         |37663606,37627579...|
|        id7         |36922232,37675077...|
|        id8         |37359529,37668820...|
|        id9         |37675077,37707778...|
+--------------------+--------------------+

where interactions is a String. I want to explode it by first splitting the string into a set of comma-separated substrings. I tried the following:

val splitArr = udf { (s: String) => s.split(",").map(_.trim) }

val B_DF = A_DF.explode(splitArr($"interactions"))

But I get the following error:

error: missing arguments for method explode in class DataFrame;
follow this method with `_' if you want to treat it as a partially applied function A_DF.explode(splitArr($"interactions"))

which I don't understand. So I tried something more complicated:

val B_DF = A_DF.explode($"interactions") { case (Row(interactions: String) =>
        interactions.split(",").map(_.trim))
     }

for which I get an inspection warning.

Any ideas?

Jac*_*ski 6

As of Spark 2.0.0, Dataset.explode is deprecated. Stay away from it unless you have a reason to use it. You have been warned.

If you do have a reason to use DataFrame.explode, see its signatures:

explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]): DataFrame

explode[A <: Product](input: Column*)(f: (Row) ⇒ TraversableOnce[A])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[A]): DataFrame

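In either case, explode takes two parameter groups, hence the first error: the call in the question supplies only the first group and omits the function.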
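The effect of the two parameter groups can be reproduced without Spark. The following is a minimal sketch in plain Scala; explodeLike is a hypothetical stand-in for DataFrame.explode (simplified to take a String and return a List), not part of any library.

```scala
object ParameterGroups {
  // Hypothetical stand-in for DataFrame.explode (not a Spark API):
  // the first group names the input, the second group supplies the
  // function that turns one input into many output elements.
  def explodeLike(input: String)(f: String => Seq[String]): List[String] =
    f(input).toList

  // Supplying both groups compiles and runs.
  val parts: List[String] =
    explodeLike("30439831, 30447866")(_.split(",").map(_.trim).toSeq)

  // Supplying only the first group, as the question does, leaves the
  // method unapplied. The Scala 2 compilers used by Spark 1.x and 2.x
  // reject this where a value is expected:
  // val broken = explodeLike("30439831, 30447866")
}
```

Here ParameterGroups.parts holds the two trimmed substrings, while the commented-out line corresponds to the call that triggers the "missing argument" error.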

(This is Spark 2.1.0-SNAPSHOT.)

scala> spark.version
res1: String = 2.1.0-SNAPSHOT

scala> val A_DF = Seq(("id1", "30439831,30447866")).toDF("id", "interactions")
A_DF: org.apache.spark.sql.DataFrame = [id: string, interactions: string]

scala> A_DF.explode(split($"interactions", ","))
<console>:26: error: missing argument list for method explode in class Dataset
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `explode _` or `explode(_)(_)(_)` instead of `explode`.
       A_DF.explode(split($"interactions", ","))
                   ^

You can do it as follows (note the deprecation warning for explode, since I am using 2.1.0-SNAPSHOT):

scala> A_DF.explode[String, String]("interactions", "parts")(_.split(",")).show
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----------------+--------+
| id|     interactions|   parts|
+---+-----------------+--------+
|id1|30439831,30447866|30439831|
|id1|30439831,30447866|30447866|
+---+-----------------+--------+

You can use the other explode as follows:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> case class Interaction(id: String, part: String)
defined class Interaction

scala> A_DF.explode[Interaction]($"id", $"interactions") { case Row(id: String, ins: String) => ins.split(",").map { it => Interaction(id, it) } }.show
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----------------+---+--------+
| id|     interactions| id|    part|
+---+-----------------+---+--------+
|id1|30439831,30447866|id1|30439831|
|id1|30439831,30447866|id1|30447866|
+---+-----------------+---+--------+

Use the explode function instead and you should be fine, as described in the scaladoc (quoted below):

Given that this is deprecated, as an alternative, you can explode columns either using functions.explode():

ds.select(explode(split('words, " ")).as("word"))

or flatMap():

ds.flatMap(_.words.split(" "))

You could then use the explode function as follows:

A_DF.select($"id", explode(split('interactions, ",")) as "part")
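The UDF from the question can also be kept, provided it is passed to the functions.explode column function rather than to the DataFrame.explode method. The following is a minimal sketch, not taken from the answer above: the names ExplodeWithUdf and explodeInteractions are made up for illustration, Spark is assumed to be on the classpath, and the interactions column is assumed to contain no nulls.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{explode, udf}

object ExplodeWithUdf {
  // Same UDF as in the question: split on commas and trim each piece.
  // Assumption: the input string is never null (a null would throw here).
  val splitArr = udf { (s: String) => s.split(",").map(_.trim) }

  // Hypothetical helper: yields one output row per substring, carrying
  // the original id along. explode here is the column function from
  // org.apache.spark.sql.functions, not the deprecated DataFrame method.
  def explodeInteractions(df: DataFrame): DataFrame =
    df.select(df("id"), explode(splitArr(df("interactions"))).as("part"))
}
```

Calling ExplodeWithUdf.explodeInteractions(A_DF) should give a DataFrame with the columns id and part. Because the UDF trims each piece, stray spaces after the commas do not end up in part.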