Pra*_*nks 2 apache-spark apache-spark-sql pyspark
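I'm trying to compute the difference between the values of two timestamp columns, and I'm trying the different approaches Spark offers to get the same result. I can get it with Spark SQL and with a plain Python function. However, when I register that function as a UDF, it starts throwing errors.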
Data:
id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD
Using Spark SQL: works fine!
data.createOrReplaceTempView("data_tbl")
query = """SELECT id, end_date, start_date,
           datediff(end_date, start_date) AS dtdiff
           FROM data_tbl"""
spark.sql(query).show()
Using a Python function: works fine!
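from pyspark.sql.functions import datediff

def get_diff(x, y):
    # x and y are column names; datediff returns a Column expression
    # that Spark evaluates on the JVM, so no Python code runs per row
    result = datediff(x, y)
    return result

data.withColumn('differ', get_diff('end_date', 'start_date')).show()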
Result in both cases:
+---+-------------------+-------------------+--------+------+
| id| end_date| start_date|location|differ|
+---+-------------------+-------------------+--------+------+
| 1|2015-10-14 00:00:00|2015-09-14 00:00:00| CA-SF| 30|
| 2|2015-10-15 01:00:20|2015-08-14 00:00:00| CA-SD| 62|
| 3|2015-10-16 02:30:00|2015-01-14 00:00:00| NY-NY| 275|
| 4|2015-10-17 03:00:20|2015-02-14 00:00:00| NY-NY| 245|
| 5|2015-10-18 04:30:00|2014-04-14 00:00:00| CA-SD| 552|
+---+-------------------+-------------------+--------+------+
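The `differ` values follow from how `datediff` works: it counts whole calendar days between the date parts of the two timestamps, so the time of day (for example `01:00:20` in row 2) does not change the result. As an illustrative check, a plain-Python recomputation of the table might look like this (the helper `calendar_days` is hypothetical, not part of Spark):

```python
from datetime import datetime

# Sample rows copied from the question: (id, end_date, start_date)
rows = [
    (1, "2015-10-14 00:00:00", "2015-09-14 00:00:00"),
    (2, "2015-10-15 01:00:20", "2015-08-14 00:00:00"),
    (3, "2015-10-16 02:30:00", "2015-01-14 00:00:00"),
    (4, "2015-10-17 03:00:20", "2015-02-14 00:00:00"),
    (5, "2015-10-18 04:30:00", "2014-04-14 00:00:00"),
]

FMT = "%Y-%m-%d %H:%M:%S"


def calendar_days(end: str, start: str) -> int:
    """Days between the date parts of two timestamps; the time of day is dropped."""
    end_day = datetime.strptime(end, FMT).date()
    start_day = datetime.strptime(start, FMT).date()
    return (end_day - start_day).days


diffs = [calendar_days(end, start) for _, end, start in rows]
print(diffs)  # [30, 62, 275, 245, 552]
```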
Registering the function as a UDF: does not work!
from pyspark.sql.functions import udf, datediff
get_diff_udf = udf(lambda x, y: datediff(x,y))
data.withColumn('differ',get_diff_udf('end_date','start_date')).show()
Error:
Py4JJavaError: An error occurred while calling o934.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 18, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
On macOS, you need to disable fork safety by setting the OBJC_DISABLE_INITIALIZE_FORK_SAFETY environment variable to YES. That fixed the same problem for me.
You can set it from within your script, before the SparkSession is created:
import os
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
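Ordering matters here: the JVM that PySpark launches, and the Python workers it starts later, copy the environment when they start, so the variable has to be in `os.environ` before the session exists. In a notebook where `spark` is already running, that means restarting the kernel. A minimal sketch, assuming you build the session yourself; the `build_session` helper and the app name are hypothetical, and restricting the setting to macOS is a design choice rather than a requirement:

```python
import os
import sys

# OBJC_DISABLE_INITIALIZE_FORK_SAFETY only affects the Objective-C runtime on macOS,
# so this sketch sets it only there.
if sys.platform == "darwin":
    os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"


def build_session(app_name: str = "datediff-demo"):
    """Create the SparkSession only after the variable is set.

    The JVM launched here, and the Python workers it starts later, copy the
    environment at start-up, so changing os.environ afterwards has no effect.
    """
    from pyspark.sql import SparkSession  # imported lazily; requires pyspark

    return SparkSession.builder.appName(app_name).getOrCreate()
```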
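For more on fork safety, and why this environment variable needs to be set, see:
"Multiprocessing causes Python to crash and gives an error may have been in progress in another thread when fork() was called"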
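Even once the worker connects, the UDF from the question would still fail as written. `pyspark.sql.functions.datediff` builds a JVM-side Column expression, but a Python UDF receives plain Python values for each row (`datetime.datetime` for `TimestampType` columns), so the two cannot be mixed. A minimal sketch of a UDF that does the arithmetic in Python instead, assuming the columns are either timestamps/dates or strings in the `YYYY-MM-DD HH:MM:SS` layout of the sample data; `days_between`, `_to_date`, and `make_days_between_udf` are hypothetical names:

```python
import datetime
from typing import Optional, Union

# A single row value as a Python UDF may receive it (assumption: the columns are
# TimestampType/DateType, or strings laid out like the sample data).
Value = Union[datetime.datetime, datetime.date, str, None]

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # layout of the sample data; adjust if yours differs


def _to_date(value: Value) -> Optional[datetime.date]:
    """Reduce a timestamp, date, or timestamp string to its calendar date."""
    if value is None:
        return None
    if isinstance(value, datetime.datetime):  # check first: datetime subclasses date
        return value.date()
    if isinstance(value, datetime.date):
        return value
    return datetime.datetime.strptime(value, TS_FORMAT).date()


def days_between(end: Value, start: Value) -> Optional[int]:
    """Whole days from start to end, ignoring time of day; None if either is null."""
    end_day, start_day = _to_date(end), _to_date(start)
    if end_day is None or start_day is None:
        return None  # mirrors SQL NULL propagation
    return (end_day - start_day).days


def make_days_between_udf():
    """Wrap days_between as a Spark UDF with an integer return type."""
    # Imported lazily so days_between stays usable without pyspark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    return udf(days_between, IntegerType())
```

Usage would then look like `data.withColumn('differ', make_days_between_udf()('end_date', 'start_date')).show()`. Passing `IntegerType()` matters because `udf` defaults to `StringType()`. Where the built-in `datediff` is available, it remains the better choice, since it avoids shipping every row to a Python worker.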