PySpark错误:AttributeError:'NoneType'对象没有属性'_jvm'

Rev*_*van 9 ipython apache-spark apache-spark-sql pyspark pyspark-sql

我有时间戳数据集,格式为

我在pyspark中编写了一个udf来处理这个数据集并返回键值的Map.但我收到以下错误消息.

数据集:df_ts_list

+--------------------+
|             ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)

Pyspark UDF:

>>> def on_time(ts_list):
...     import sys
...     import os
...     sys.path.append('/usr/lib/python2.7/dist-packages')
...     os.system("sudo apt-get install python-numpy -y")
...     import numpy as np
...     import datetime
...     import time
...     from datetime import timedelta
...     ts = np.array(ts_list)
...     if ts.size == 0:
...             count = 0
...             duration = 0
...             st = time.mktime(datetime.now())
...             ymd = str(datetime.fromtimestamp(st).date())
...     else:
...             ts.sort()
...             one_tag = []
...             start = float(ts[0])
...             for i in range(len(ts)):
...                     if i == (len(ts)) - 1:
...                             end = float(ts[i])
...                             a_round = [start, end]
...                             one_tag.append(a_round)
...                     else:
...                             diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
...                             if abs(diff.total_seconds()) > 3600:
...                                     end = float(ts[i])
...                                     a_round = [start, end]
...                                     one_tag.append(a_round)
...                                     start = float(ts[i+1])
...             one_tag = [u for u in one_tag if u[1] - u[0] > 300]
...             count = int(len(one_tag))
...             duration = int(np.diff(one_tag).sum())
...             ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
...     return {'count':count,'duration':duration, 'ymd':ymd}
Run Code Online (Sandbox Code Playgroud)

Pyspark代码:

>>> on_time=udf(on_time, MapType(StringType(),StringType()))
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()
Run Code Online (Sandbox Code Playgroud)

错误:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'
Run Code Online (Sandbox Code Playgroud)

任何帮助,将不胜感激!

fms*_*msf 20

Mariusz的回答并没有真正帮助我.所以,如果你喜欢我,我发现这是因为它是谷歌上唯一的结果,你是pyspark的新手(一般都是火花),这对我有用.

在我的情况下,我得到了这个错误,因为我在设置pyspark环境之前尝试执行pyspark代码.

确保pyspark可用并在进行呼叫之前设置依赖于pyspark.sql.functions为我修复问题.

  • 或者,对于像我一样愚蠢的人,如果您在“pandas_udf”(应该接收 pandas 代码...)内编写 pyspark 代码,您可能会遇到此错误 (6认同)
  • 作为其他人的额外...我没有设置我的火花会话时我遇到了这个错误,并且我使用装饰器定义了一个pyspark UDF来添加模式.我通常在我的main中设置spark会话,但在这种情况下,当传递一个复杂的模式需要在脚本的顶部设置它时.谢谢你的快速提示! (5认同)
  • @Mari 我能建议的是,在 Spark 上下文初始化之前,您不能使用 pyspark 函数。就我而言,我使用它们作为默认参数值,但它们是在导入时而不是运行时评估的,因此 Spark 上下文未初始化。所以我只是将其更改为 None 并在函数内部进行检查。 (2认同)

Mar*_*usz 16

错误消息说在udf的第27行你调用了一些pyspark sql函数.这是行,abs()所以我想在你上面的某个地方调用from pyspark.sql.functions import *它会覆盖python的abs()功能.

  • @mufmuf 当然,您可以使用`__builtin__.abs` 作为指向python 函数的指针 (3认同)
  • 或者您可以将 pyspark.sql.functions 导入为 F 并使用 F.function_name 调用 pyspark 函数 (3认同)

SAR*_*ose 5

需要明确的是,很多人遇到的问题是源于一种糟糕的编程风格。那是from blah import *

当你们这样做

from pyspark.sql.functions import *
Run Code Online (Sandbox Code Playgroud)

你覆盖了很多python 内置函数。我强烈建议导入功能,如

import pyspark.sql.functions as f
# or 
import pyspark.sql.functions as pyf
Run Code Online (Sandbox Code Playgroud)

  • 这个建议帮助我纠正了导入时使用“*”的坏习惯。希望其他人也能纠正这个问题 (3认同)