使用 python 进行 Spark 流:如何添加 UUID 列?

bea*_*bea 9 python uuid apache-spark pyspark

我想在我的数据框中添加一个带有生成 id 的列。我试过了:

uuidUdf = udf(lambda x: str(uuid.uuid4()), StringType())
df = df.withColumn("id", uuidUdf())
Run Code Online (Sandbox Code Playgroud)

但是,当我这样做时,我的输出目录中没有写入任何内容。当我删除这些行时,一切正常,所以一定有一些错误,但我在控制台中没有看到任何内容。

我曾尝试使用 monotonically_increasing_id() 而不是生成 UUID,但在我的测试中,这会产生许多重复项。我需要一个唯一标识符(不必专门是 UUID)。

我怎样才能做到这一点?

Mic*_*ski 13

一个简单的方法:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)
df.withColumn("uuid", f.expr("uuid()")).show(truncate=False)
Run Code Online (Sandbox Code Playgroud)


Ata*_*jee 8

请试试这个:

import uuid
from pyspark.sql.functions import udf

uuidUdf= udf(lambda : str(uuid.uuid4()),StringType())
Df1 = Df.withColumn("id",uuidUdf())
Run Code Online (Sandbox Code Playgroud)

注意:您应该在添加新列后分配给新 DF。( Df1 = Df.withColumn(....)

  • @AlessioG true,如果您迭代结果两次,您将得到不同的结果,因此,如果您必须处理两次,则应该在添加随机列后坚持下去。 (2认同)

Dan*_*rod 7

来自pysparkfunctions.py:

注意:默认情况下,用户定义的函数被认为是确定性的。由于优化,可以消除重复调用,或者甚至可以调用该函数比查询中存在的次数更多的次数。如果您的函数不确定,请调用asNondeterministic用户定义的函数。例如:

from pyspark.sql.types import IntegerType
import random
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
Run Code Online (Sandbox Code Playgroud)

因此对于 UUID 来说,这将是:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import uuid
random_udf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()
Run Code Online (Sandbox Code Playgroud)

和用法:

df = df.withColumn('id', random_udf())
Run Code Online (Sandbox Code Playgroud)