TypeError:得到一个意外的关键字参数

Nir*_*mal 8 python user-defined-functions apache-spark apache-spark-sql pyspark

下面看似简单的代码会引发以下错误:

Traceback (most recent call last):
  File "/home/nirmal/process.py", line 165, in <module>
    'time_diff': f.last(adf['time_diff']).over(window_device_rows)
TypeError: __call__() got an unexpected keyword argument 'this_campaign'
Run Code Online (Sandbox Code Playgroud)

码:

# Function to flag network timeouts
def flag_network_timeout(**kwargs):
    if kwargs['this_network'] != kwargs['last_network'] \
            or kwargs['this_campaign'] != kwargs['last_campaign'] \
            or kwargs['this_adgroup'] != kwargs['last_adgroup'] \
            or kwargs['this_creative'] != kwargs['last_creative'] \
            or kwargs['time_diff'] > network_timeout:
        return 1
    else:
        return 0
flag_network_timeout = f.udf(flag_network_timeout, IntegerType())

# Column spec to go over the device events and flag network resets
network_timeout_flag = flag_network_timeout(**{
    'last_network': f.first(adf['network']).over(window_device_rows),
    'last_campaign': f.first(adf['campaign']).over(window_device_rows),
    'last_adgroup': f.first(adf['adgroup']).over(window_device_rows),
    'last_creative': f.first(adf['creative']).over(window_device_rows),
    'this_network': f.last(adf['network']).over(window_device_rows),
    'this_campaign': f.last(adf['campaign']).over(window_device_rows),
    'this_adgroup': f.last(adf['adgroup']).over(window_device_rows),
    'this_creative': f.last(adf['creative']).over(window_device_rows),
    'time_diff': f.last(adf['time_diff']).over(window_device_rows)
})

# Update dataframe with the new columns
adf = adf.select('*', network_timeout_flag.alias('network_timeout'))
Run Code Online (Sandbox Code Playgroud)

我做错了什么?谢谢.

zer*_*323 12

你得到一个例外,因为UserDefinedFunction.__call__只支持varargs而不支持关键字args.

def __call__(self, *cols):
    sc = SparkContext._active_spark_context
    jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))
    return Column(jc)
Run Code Online (Sandbox Code Playgroud)

在更基本的层次上,UDF只能接收Column参数,这些参数将在运行时扩展为相应的值,而不是标准的Python对象.

就个人而言,我根本不会使用**kwargs它,但忽略了你可以通过编写SQL表达式来实现你想要的东西:

def flag_network_timeout_(**kwargs):

    cond = (
        (kwargs['this_network'] != kwargs['last_network']) |
        (kwargs['this_campaign'] != kwargs['last_campaign']) |
        (kwargs['this_adgroup'] != kwargs['last_adgroup']) |
        (kwargs['this_creative'] != kwargs['last_creative']) |
        (kwargs['time_diff'] > network_timeout))

    return f.when(cond, 1).otherwise(0)
Run Code Online (Sandbox Code Playgroud)

  • 老实说,我没有得到第二部分。它与UDF有何不同? (2认同)