I have a DataFrame with several columns that I want to use as inputs to a function that produces multiple outputs per row, each of which should go into a new column.
For example, I have a function that takes address values and parses them into more granular parts:
def parser(address1: str, city: str, state: str) -> Dict[str, str]:
...
Example output:
{'STREETNUMPREFIX': None,
'STREETNUMBER': '123',
'STREETNUMSUFFIX': None,
'STREETNAME': 'Elm',
'STREETTYPE': 'Ave.'}
Suppose I have a DataFrame with the columns address1, city, and state. I want to apply the parser function above to every row, using the values of those three columns as inputs, and store each row's output as new columns whose names match the keys of the returned dictionary.
Here is what I have tried so far:
def parser(address1: str, city: str, state: str) -> Dict[str, str]:
...
The above only produced a cryptic NullPointerException that gave me no hint about how to fix the problem:
SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 4 times, most recent failure: Lost task 0.3 in stage 9.0 (TID 86, 172.18.237.92, executor 1): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$11(EvalPythonExec.scala:134)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:657)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:660)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Here is a very simple example of how my UDF is used.
from typing import List
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def cal(a: int, b: int) -> List[int]:
    return [a + b, a * b]

# Declare the element type to match what the function actually returns.
cal_udf = udf(cal, ArrayType(IntegerType()))

df.select('A', 'B', *[cal_udf('A', 'B')[i] for i in range(0, 2)]) \
  .toDF('A', 'B', 'Add', 'Multiple').show()
+---+---+---+--------+
|  A|  B|Add|Multiple|
+---+---+---+--------+
|  1|  2|  3|       2|
|  2|  4|  6|       8|
|  3|  6|  9|      18|
+---+---+---+--------+
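The same idea maps more directly onto the original question if the UDF returns a struct instead of an array, since the struct fields can then be expanded into one column per dictionary key. Below is a minimal sketch of that approach, assuming the parser returns the dictionary shown in the question; parse_address, its placeholder body, and the field list are illustrative, not the asker's actual code.

from typing import Dict, Optional
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType

# Field names taken from the example output in the question.
FIELDS = ['STREETNUMPREFIX', 'STREETNUMBER', 'STREETNUMSUFFIX',
          'STREETNAME', 'STREETTYPE']

# nullable=True so that keys the parser returns as None are accepted.
schema = StructType([StructField(name, StringType(), True) for name in FIELDS])

@udf(returnType=schema)
def parse_address(address1: str, city: str, state: str) -> Dict[str, Optional[str]]:
    # Placeholder standing in for the real parsing logic.
    return {'STREETNUMPREFIX': None,
            'STREETNUMBER': '123',
            'STREETNUMSUFFIX': None,
            'STREETNAME': 'Elm',
            'STREETTYPE': 'Ave.'}

# Attach the struct once, then expand its fields into separate columns.
result = (df
          .withColumn('parsed', parse_address('address1', 'city', 'state'))
          .select('address1', 'city', 'state', 'parsed.*'))

Expanding the struct with 'parsed.*' keeps the mapping between dictionary keys and output column names explicit, instead of indexing an array result column by column.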
I checked your code and found this:
def get_schema(columns: [str]) -> StructType:
return StructType([StructField(col_name, StringType(), False) for col_name in columns])
You don't allow null values in any of the columns, but nulls do occur in your data, and that is where the error comes from. I suggest you change the nullable flag from False to True, and then it should work.
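As a concrete illustration of that suggestion, a minimal sketch of the adjusted helper (the typing import is added here only for the annotation; everything else mirrors the snippet above):

from typing import List
from pyspark.sql.types import StructType, StructField, StringType

def get_schema(columns: List[str]) -> StructType:
    # nullable=True: fields may legitimately come back as None from the parser.
    return StructType([StructField(col_name, StringType(), True) for col_name in columns])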