在极坐标数据框中每行应用 Python UDF 函数会引发意外异常“预期元组,得到列表”

Nik*_*kSp 4 python user-defined-functions dataframe python-polars

我在 Python 中有以下极坐标 DF

df = pl.DataFrame({
    "user_movies": [[7064, 7153, 78009], [6, 7, 1042], [99, 110, 3927], [2, 11, 152081], [260, 318, 195627]],
    "user_ratings": [[5.0, 5.0, 5.0], [4.0, 2.0, 4.0], [4.0, 4.0, 3.0], [3.5, 3.0, 4.0], [1.0, 4.5, 0.5]],
    "common_movies": [[7064, 7153], [7], [110, 3927], [2], [260, 195627]]
})
print(df.head())
Run Code Online (Sandbox Code Playgroud)

我想创建一个名为“common_movie_ ratings”的新列,该列将从每个评级列表中仅获取常见电影中评级的电影的索引。例如,对于第一行,我应该仅返回电影的评分 [7064, 7153,],对于第二行,我应该返回电影的评分 [7],依此类推。

为此,我创建了以下函数:

def get_common_movie_ratings(row): #Each row is a tuple of arrays.
    common_movies = row[2] #the index of the tuple denotes the 3rd array, which represents the common_movies column.
    user_ratings = row[1]
    ratings_for_common_movies= [user_ratings[list(row[0]).index(movie)] for movie in common_movies]
    return ratings_for_common_movies
Run Code Online (Sandbox Code Playgroud)

最后,我在数据帧上应用 UDF 函数,例如

df["common_movie_ratings"] = df.apply(get_common_movie_ratings, return_dtype=pl.List(pl.Float64))
Run Code Online (Sandbox Code Playgroud)

每次我应用该函数时,在第三次迭代/行时我都会收到以下错误

预期元组,得到列表

每第三次迭代后的输出

我还尝试了一种不同的 UDF 函数方法,例如

def get_common_movie_ratings(row):
   common_movies = row[2]
   user_ratings = row[1]
   ratings = [user_ratings[i] for i, movie in enumerate(row[0]) if movie in common_movies]
   return ratings
Run Code Online (Sandbox Code Playgroud)

但在第三次迭代时,我再次收到相同的错误。

更新 - 数据输入和场景范围(此处

Dea*_*gor 5

你的方法出了什么问题

\n

忽略 python UDF 的性能损失,您的方法中有两件事出了问题。

\n
    \n
  1. apply现在map_rows在您尝试使用它的上下文中,期望输出是一个元组,其中元组的每个元素都是一个输出列。您的函数不输出元组。如果将返回行更改为return (ratings_for_common_movies,)那么它会输出一个元组并且可以工作。

    \n
  2. \n
  3. 您无法使用方括号表示法将列添加到极坐标数据框中。唯一可以位于 左侧的=是 df,绝不能df[\'new_column\']=<something>。如果您使用的旧版本允许这样做,那么您不应该这样做,部分原因是新版本不允许这样做。这意味着你必须做类似的事情df.with_columns(new_column=<some_expression>)

    \n
  4. \n
\n

在使用时向现有 df 添加列的情况下,map_rows您可以使用hstack如下命令:

\n
df=df.hstack(df.map_rows(get_common_movie_ratings)).rename({\'column_0\':\'common_movie_ratings\'})\n
Run Code Online (Sandbox Code Playgroud)\n

上述实际上是一种反模式,因为当本机方法可以工作时使用任何map_rowsmap_elements等都会更慢且效率更低。滚动到底部查看方法map_elements

\n

本机解决方案序言

\n

如果我们假设列表总是 3 长那么你可以这样做......

\n
# this is the length of the user_movies lists\nn_count=3\n\ndf.with_columns(\n    # first gather the items from user_movies based on (yet to be created) \n    # indices list \n    pl.col(\'user_movies\').list.gather(\n        # use walrus operator to create new list which is the indices where \n        # user_movies are in common_movies this works by looping through \n        # each element and checking if it\'s in common_movies. When it is in common_movies\n        # then it stores its place in the loop n variable. The n_count is the list size\n        (indices:=pl.concat_list(\n            pl.when(\n                pl.col(\'user_movies\').list.get(n).is_in(pl.col(\'common_movies\'))\n            )\n            .then(pl.lit(n))\n            for n in range(n_count)\n        ).list.drop_nulls())\n    ),\n    # use the same indicies list to gather the corresponding elements from user_ratings\n    pl.col(\'user_ratings\').list.gather(indices)\n)\n
Run Code Online (Sandbox Code Playgroud)\n

请注意,我们通过循环遍历从 0 到列表长度的范围来生成索引列表,当与第 位置n相关的项目位于时,该项目将被放入索引列表中。不幸的是,在类型列的极坐标中没有类似的方法,因此,在不分解列表的情况下,这是我能想到的创建索引列表的最佳方法。nuser_moviescommon_moviesn.indexlist

\n

本机解决方案答案

\n

Polars 本身无法递归设置,n_count因此我们需要手动进行设置。通过使用惰性求值,这比其他方法更快,因为它可以n_count并行计算每个案例。

\n
(\n    pl.concat([ # this is a list comprehension\n        # From here to the "for n_count..." line is the same as the previous code\n        # snippet except that, here, it\'s called inner_df and it\'s being \n        # made into a lazy frame \n        inner_df.lazy().with_columns(\n        pl.col(\'user_movies\').list.gather(\n            (indices:=pl.concat_list(\n                pl.when(\n                    pl.col(\'user_movies\').list.get(n).is_in(pl.col(\'common_movies\'))\n                )\n                .then(pl.lit(n))\n                for n in range(n_count)\n            ).list.drop_nulls())\n        ),\n        pl.col(\'user_ratings\').list.gather(indices)\n    )\n    # this is the iterating part of the list comprehension\n    # it takes the original df, creates a column which is\n    # a row index, then it creates a column which is the\n    # length of the list, it then partitions up the df into\n    # multiple dfs where each of the inner_dfs only has rows\n    # where the list length is the same. By using as_dict=True\n    # and .items(), it gives a convenient way to unpack the\n    # n_count (length of the list) and the inner_df  \n    for n_count, inner_df in (\n        df\n        .with_row_count(\'i\') # original row position\n        .with_columns(n_count=pl.col(\'user_movies\').list.len())\n        .partition_by(\'n_count\', as_dict=True, include_key=False)\n        .items())\n    ])\n    .sort(\'i\') # sort by original row position\n    .drop(\'i\') # drop the row position column\n    .collect() # run all of the queries in parallel\n    )\nshape: (5, 3)\n\xe2\x94\x8c\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x90\n\xe2\x94\x82 user_movies   \xe2\x94\x86 user_ratings \xe2\x94\x86 common_movies \xe2\x94\x82\n\xe2\x94\x82 ---           \xe2\x94\x86 ---          \xe2\x94\x86 ---           \xe2\x94\x82\n\xe2\x94\x82 list[i64]     \xe2\x94\x86 list[f64]    \xe2\x94\x86 list[i64]     \xe2\x94\x82\n\xe2\x95\x9e\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xaa\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xaa\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xa1\n\xe2\x94\x82 [7064, 7153]  \xe2\x94\x86 [5.0, 5.0]   \xe2\x94\x86 [7064, 7153]  \xe2\x94\x82\n\xe2\x94\x82 [7]           \xe2\x94\x86 [2.0]        \xe2\x94\x86 [7]           \xe2\x94\x82\n\xe2\x94\x82 [110, 3927]   \xe2\x94\x86 [4.0, 3.0]   \xe2\x94\x86 [110, 3927]   \xe2\x94\x82\n\xe2\x94\x82 [2]           \xe2\x94\x86 [3.5]        \xe2\x94\x86 [2]           \xe2\x94\x82\n\xe2\x94\x82 [260, 195627] \xe2\x94\x86 [1.0, 0.5]   \xe2\x94\x86 [260, 195627] \xe2\x94\x82\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x98\n
Run Code Online (Sandbox Code Playgroud)\n

lazy通过在第一部分中转换为,concat它允许并行计算每个帧,其中每个帧是基于列表长度的子集。它还允许 成为indicesCSER,这意味着即使有 2 个引用它,它也只计算一次。

\n

顺便说一句,对于更少的代码但更多的处理/时间,您可以简单地n_counts在序言部分中设置n_count=df.select(n_count=pl.col(\'user_movies\').list.len().max()).item(),然后运行该部分中的其余部分。这种方法会比这种方法慢得多,因为对于每一行,它都会迭代元素直到最大列表长度,这会增加不必要的检查。它也没有得到相同的并行性。换句话说,它用更少的 CPU 核心完成更多的工作。

\n

基准测试

\n

虚假数据创建

\n
n=10_000_000\ndf = (\n    pl.DataFrame({\n        \'user\':np.random.randint(1,int(n/10),size=n),\n        \'user_movies\':np.random.randint(1,50,n),\n        \'user_ratings\':np.random.uniform(1,5, n),\n        \'keep\':np.random.randint(1,100,n)\n    })\n    .group_by(\'user\')\n    .agg(\n        pl.col(\'user_movies\'),\n        pl.col(\'user_ratings\').round(1),\n        common_movies=pl.col(\'user_movies\').filter(pl.col(\'keep\')>75)\n        )\n    .filter(pl.col(\'common_movies\').list.len()>0)\n    .drop(\'user\')\n    )\nprint(df.head(10))\nshape: (5, 3)\n\xe2\x94\x8c\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x90\n\xe2\x94\x82 user_movies    \xe2\x94\x86 user_ratings      \xe2\x94\x86 common_movies \xe2\x94\x82\n\xe2\x94\x82 ---            \xe2\x94\x86 ---               \xe2\x94\x86 ---           \xe2\x94\x82\n\xe2\x94\x82 list[i64]      \xe2\x94\x86 list[f64]         \xe2\x94\x86 list[i64]     \xe2\x94\x82\n\xe2\x95\x9e\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xaa\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xaa\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xa1\n\xe2\x94\x82 [23, 35, \xe2\x80\xa6 22] \xe2\x94\x86 [3.4, 1.6, \xe2\x80\xa6 4.0] \xe2\x94\x86 [35]          \xe2\x94\x82\n\xe2\x94\x82 [30, 18, \xe2\x80\xa6 26] \xe2\x94\x86 [4.9, 1.9, \xe2\x80\xa6 2.3] \xe2\x94\x86 [10]          \xe2\x94\x82\n\xe2\x94\x82 [25, 19, \xe2\x80\xa6 29] \xe2\x94\x86 [1.7, 1.7, \xe2\x80\xa6 1.1] \xe2\x94\x86 [18, 40, 38]  \xe2\x94\x82\n\xe2\x94\x82 [31, 15, \xe2\x80\xa6 42] \xe2\x94\x86 [2.9, 1.8, \xe2\x80\xa6 4.3] \xe2\x94\x86 [31, 4, \xe2\x80\xa6 42] \xe2\x94\x82\n\xe2\x94\x82 [36, 16, \xe2\x80\xa6 49] \xe2\x94\x86 [1.0, 2.0, \xe2\x80\xa6 4.2] \xe2\x94\x86 [36]          \xe2\x94\x82\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x98\n
Run Code Online (Sandbox Code Playgroud)\n

我的方法(16 个线程):1.92 s \xc2\xb1 每个循环 195 毫秒(意味着 \xc2\xb1 标准偏差 7 次运行,每次 1 次循环)

\n

我的方法(8 个线程):每个循环 2.31 s \xc2\xb1 175 ms(意味着 \xc2\xb1 标准偏差 7 次运行,每次 1 次循环)

\n

我的方法(4个线程):每个循环3.14 s \xc2\xb1 221 ms(意味着\xc2\xb1标准偏差7次运行,每次1个循环)

\n

@jquurious:2.73 s \xc2\xb1 每个循环 130 毫秒(意味着 \xc2\xb1 标准偏差 7 次运行,每次 1 次循环)

\n

map_rows:9.12 s \xc2\xb1 每个循环 195 ms(平均 \xc2\xb1 标准偏差 7 次运行,每次 1 次循环)

\n

具有大 n_count 的前导码:9.77 s \xc2\xb1 每个循环 1.61 s(平均 \xc2\xb1 标准偏差 7 次运行,每次 1 次循环)

\n

我的方法和 map_rows 均使用大约 1 GB 的 RAM,但 explody 接近 3 GB。

\n

结构体和映射元素

\n

在我看来,map_rows您可以使用 ,而不是使用 which ,这真的很笨重map_elements。它有其自身的笨重性,因为您经常需要将输入包装在结构中,但您可以更干净地添加列,并且不必依赖列位置。

\n

例如,您可以定义您的函数并按如下方式使用它:

\n
def get_common_movie_ratings(row): #Each row is a tuple of arrays.\n    common_movies = row[\'common_movies\'] #the index of the tuple denotes the 3rd array, which represents the common_movies column.\n    user_ratings = row[\'user_ratings\']\n    ratings_for_common_movies= [user_ratings[list(row[\'user_movies\']).index(movie)] for movie in common_movies]\n    return ratings_for_common_movies\ndf.with_columns(user_ratings=pl.struct(pl.all()).map_elements(get_common_movie_ratings))\n
Run Code Online (Sandbox Code Playgroud)\n

这里发生的情况是map_elements只能从单个列调用,因此如果您的自定义函数需要多个输入,您可以将它们包装在一个结构中。该结构将变成一个字典,其中键具有列的名称。相对于 ,这种方法没有任何固有的性能优势map_rows,在我看来,它只是更好的语法。

\n

最后

\n

正如 @jqurious 在他的答案的评论中提到的,通过将此逻辑与这些列表的形成结合起来,几乎可以肯定可以在语法和性能方面得到简化。换句话说,你有第 1 步:______ 第 2 步:这个问题。虽然我只能猜测步骤 1 中发生的情况,但将这两个步骤结合起来很可能是值得的努力。

\n