Two Spark workers are running, and the following test code (TestNG) is executed:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.testng.annotations.Test;
public class UdfTest {
    @Test
    public void simpleUdf() {
        SparkConf conf = new SparkConf()
                .set("spark.driver.host", "localhost")
                .setMaster("spark://host1:7077")
                .set("spark.jars", "/home/.../myjar.jar")
                .set("spark.submit.deployMode", "cluster")
                .setAppName("RESTWS ML");
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        List<Row> rows = new ArrayList<>();
        for (long i = 0; i < 10; i++) {
            rows.add(RowFactory.create("cr" + i));
        }
        Dataset<Row> textAsDataset = sparkSession.createDataFrame(rows,
                new StructType(new StructField[] {
                        new StructField("contentRepositoryUUID", DataTypes.StringType, false, Metadata.empty()) }));
        sparkSession.udf().register("myUdf",
                (UDF1<String, String>) (col1) -> myUdf(col1), DataTypes.StringType);
        Dataset<Row> rowDataset = textAsDataset.withColumn("text", functions.callUDF("myUdf",
                textAsDataset.col("contentRepositoryUUID")
        ));
        rowDataset.show();
    }

    private String myUdf(String col1) {
        new Exception().printStackTrace();
        return col1 + " changed";
    }
}
A Dataset is created and I expected the Java function myUdf() to be called from the worker Java processes, but it is called from the driver thread instead; the stack trace originates from the rowDataset.show() line:
java.lang.Exception
at UdfTest.myUdf(UdfTest.java:53)
at UdfTest.lambda$simpleUdf$45ca9450$1(UdfTest.java:44)
at org.apache.spark.sql.UDFRegistration$$anonfun$259.apply(UDFRegistration.scala:759)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:108)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:107)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:152)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:92)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1364)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1359)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:248)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1359)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
at UdfTest.simpleUdf(UdfTest.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:571)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:707)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:979)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.privateRun(TestRunner.java:648)
at org.testng.TestRunner.run(TestRunner.java:505)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
at org.testng.SuiteRunner.run(SuiteRunner.java:364)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1187)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1116)
at org.testng.TestNG.runSuites(TestNG.java:1028)
at org.testng.TestNG.run(TestNG.java:996)
at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
How does Spark decide whether a UDF can be called from the workers?
The strange thing is that this already worked once, but now, when I try to reproduce this "distributed UDF" scenario, something has changed and I cannot. Unfortunately, looking at the Spark DEBUG logs did not help me.
Although the stack trace does originate from the call to show(), the key part is actually here:
...
HERE --> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
...
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
...
You are still in the query optimization phase, which is performed by Catalyst on the driver.
The reason for this is a scarcely documented feature of Spark: Datasets created from local collections with SparkSession.createDataFrame() (SparkSession.createDataset() / Seq.toDF() in Scala) are merely local relations inside the driver and are not truly distributed:
scala> val df = (0 to 5).toDF
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.queryExecution.analyzed
res45: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107]
scala> df.isLocal
res46: Boolean = true
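The same can be checked from the Java side of the question. A minimal sketch, assuming the textAsDataset built in the test above (isLocal() and queryExecution() are public methods of Dataset):
// Sketch: inspect the Dataset that was created from the local List<Row> above.
// Expected to print "true" and an analyzed plan whose root is a LocalRelation node.
System.out.println(textAsDataset.isLocal());
System.out.println(textAsDataset.queryExecution().analyzed());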
Datasets created from local collections differ from Datasets created from an RDD:
scala> val df_from_rdd = sc.parallelize(0 to 5).toDF
df_from_rdd: org.apache.spark.sql.DataFrame = [value: int]
scala> df_from_rdd.queryExecution.analyzed
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#112]
+- ExternalRDD [obj#111]
scala> df_from_rdd.isLocal
res48: Boolean = false
Operations such as Dataset.withColumn() are then actually performed by the driver itself, as part of the lazy evaluation of the optimized query plan, and never reach the execution stage:
scala> val df_foo = df.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]
scala> df_foo.queryExecution.analyzed
res49: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#107, UDF:myUdf(cast(value#107 as string)) AS foo#146]
+- LocalRelation [value#107]
scala> df_foo.queryExecution.optimizedPlan
java.lang.Exception
at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
...
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
...
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at $line143.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)
...
res50: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#132]
// Notice: the projection is gone, merged into the local relation
scala> df_foo.queryExecution.optimizedPlan
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#163]
// Notice: no stack trace this time
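The rule performing this folding is ConvertToLocalRelation, which is also visible in the stack trace above. As a hedged aside: from Spark 2.4 onwards individual optimizer rules can be excluded through the spark.sql.optimizer.excludedRules setting, so a sketch like the following should keep the projection (and with it the UDF call) out of the driver-side folding; whether the UDF then runs on the executors still depends on how the remaining plan gets executed:
// Hedged sketch, assuming Spark 2.4+: exclude the optimizer rule that collapses
// LocalRelation plans, so the Project carrying the UDF survives optimization.
sparkSession.conf().set(
        "spark.sql.optimizer.excludedRules",
        "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation");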
This driver-side evaluation differs from the handling of Datasets created from RDDs:
scala> val df_from_rdd_foo = df_from_rdd.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_from_rdd_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]
scala> df_from_rdd_foo.queryExecution.optimizedPlan
res52: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#112, UDF:myUdf(cast(value#112 as string)) AS foo#135]
+- SerializeFromObject [input[0, int, false] AS value#112]
+- ExternalRDD [obj#111]
This produces no stack trace in the executors' stderr, i.e. the UDF is not called. On the other hand:
scala> df_from_rdd_foo.show()
+-----+---------+
|value| foo|
+-----+---------+
| 0|0 changed|
| 1|1 changed|
| 2|2 changed|
| 3|3 changed|
| 4|4 changed|
| 5|5 changed|
+-----+---------+
produces the following stack trace in an executor's stderr:
java.lang.Exception
at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Spark treats local relations like literals, which can also be seen in the way they are represented in SQL (code adapted from here):
scala> df.queryExecution.analyzed.collect { case r: LocalRelation => r }.head.toSQL("bar")
res55: String = VALUES (0), (1), (2), (3), (4), (5) AS bar(value)
scala> df_foo.queryExecution.optimizedPlan.collect { case r: LocalRelation => r }.head.toSQL("bar")
res56: String = VALUES (0, '0 changed'), (1, '1 changed'), (2, '2 changed'), (3, '3 changed'), (4, '4 changed'), (5, '5 changed') AS bar(value, foo)
Or as code:
scala> df.queryExecution.analyzed.asCode
res57: String = LocalRelation(
List(value#107),
Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
false
)
scala> df_foo.queryExecution.analyzed.asCode
res58: String = Project(
List(value#107, UDF:myUdf(cast(value#107 as string)) AS foo#163),
LocalRelation(
List(value#107),
Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
false
)
)
scala> df_foo.queryExecution.optimizedPlan.asCode
res59: String = LocalRelation(
List(value#107, foo#163),
Vector([0,0 changed], [1,1 changed], [2,2 changed], [3,3 changed], [4,4 changed], [5,5 changed]),
false
)
Think of what happens as analogous to the Java compiler replacing code like int a = 2 * 3; with int a = 6;, with the compiler itself performing the actual computation.
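Applied back to the question: to have myUdf() actually executed on the workers, the Dataset needs to be backed by an RDD instead of a local collection, i.e. the Java equivalent of the sc.parallelize(0 to 5).toDF example above. A minimal sketch, assuming the rows list and schema from the test; the JavaSparkContext wrapper is an added assumption:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch: distribute the local rows first, then build the Dataset from the RDD.
// The resulting plan contains an RDD-backed scan instead of a LocalRelation,
// so the UDF applied via withColumn() is evaluated in executor tasks rather than
// during driver-side optimization.
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
JavaRDD<Row> rowRdd = jsc.parallelize(rows);
Dataset<Row> distributedDataset = sparkSession.createDataFrame(rowRdd,
        new StructType(new StructField[] {
                new StructField("contentRepositoryUUID", DataTypes.StringType, false, Metadata.empty()) }));
With this setup, distributedDataset.isLocal() should return false, and calling show() after withColumn() should produce the UDF's stack trace in the executors' stderr rather than on the driver.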