An efficient way to add a UUID column in PySpark

bco*_*a12 · 5 votes · Tags: python-3.x, apache-spark, pyspark

I have a DataFrame, and I want to add a column of distinct uuid4() values, one per row. My code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

from uuid import uuid4

spark_session = SparkSession.builder.getOrCreate()

df = spark_session.createDataFrame([
        [1, 1, 'teste'],
        [2, 2, 'teste'],
        [3, 0, 'teste'],
        [4, 5, 'teste'],
    ],
    list('abc'))


df = df.withColumn("_tmp", f.lit(1))

uuids = [str(uuid4()) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, StringType())
df1 = df_1.withColumn("_tmp", f.lit(1))


df2 = df.join(df_1, "_tmp", "inner").drop("_tmp")
df2.show()

But I get this error:

Py4JJavaError: An error occurred while calling o1571.showString.
: org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
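The join on the constant _tmp column matches every left-hand row with every right-hand row, which is why the analyzer flags an implicit cartesian product. In Spark 2.x the product can be forced through explicitly, but that would still be wrong here, since it yields count(df) × count(df1) rows rather than one UUID per row. A sketch, for illustration only:

# Allowing the cartesian product silences the error, but produces
# count(df) * count(df1) rows instead of pairing rows one-to-one.
spark_session.conf.set("spark.sql.crossJoin.enabled", "true")
df.join(df1, "_tmp", "inner").show()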

I have already tried using aliases, and using monotonically_increasing_id as the join column, but I saw here that I cannot rely on monotonically_increasing_id as a merge key: its values are guaranteed to be increasing and unique, but not consecutive. What I expect:

+---+---+-----+------+
|  a|  b|    c| value|
+---+---+-----+------+
|  1|  1|teste| uuid4|
|  2|  2|teste| uuid4|
|  3|  0|teste| uuid4|
|  4|  5|teste| uuid4|
+---+---+-----+------+

What is the correct approach in this case?

bco*_*a12 · 8 votes

The approach that worked for me registers the DataFrame as a temporary view and uses Spark SQL's built-in uuid() function, which generates one UUID per row:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from uuid import uuid4
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

from pyspark.sql.functions import row_number


spark_session = SparkSession.builder.getOrCreate()

df = spark_session.createDataFrame([
        [1, 1, 'teste'],
        [2, 2, 'teste'],
        [3, 0, 'teste'],
        [4, 5, 'teste'],
    ],
    list('abc'))

df = df.alias("_tmp")
df.registerTempTable("_tmp")

df2 = self.spark_session.sql("select *, uuid() as uuid from _tmp")

df2.show()
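For reference, the same built-in uuid() SQL function can also be reached from the DataFrame API through f.expr, which avoids registering a temporary view; a minimal sketch, assuming the same spark_session and df as above:

from pyspark.sql import functions as f

# expr() lets the DataFrame API call the SQL built-in uuid() directly
df2 = df.withColumn("uuid", f.expr("uuid()"))
df2.show()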

Another approach, following @Tetlanesh's suggestion, uses row_number over a window; I had to create an _id column to ensure that row_number counts every row of the window. It is not as efficient as the first one: the driver materializes one UUID per row, and a window with no partitionBy moves all rows to a single partition:


df = df.withColumn("_id", f.lit(1))
df = df.withColumn("_tmp", row_number().over(Window.orderBy('_id')))

uuids = [(str(uuid4()), 1) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, ['uuid', '_id'])
df1 = df1.withColumn("_tmp", row_number().over(Window.orderBy('_id')))


df2 = df.join(df1, "_tmp", "inner").drop('_id')

df2.show()

Both approaches output:

+---+---+-----+------+
|  a|  b|    c|  uuid|
+---+---+-----+------+
|  1|  1|teste| uuid4|
|  2|  2|teste| uuid4|
|  3|  0|teste| uuid4|
|  4|  5|teste| uuid4|
+---+---+-----+------+
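A further alternative, not used in either snippet above: wrap uuid4() in a Python UDF and mark it non-deterministic, so the optimizer does not assume repeated calls return the same value. A minimal sketch:

from uuid import uuid4
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

# One uuid4() per row, generated on the executors; asNondeterministic()
# keeps the optimizer from collapsing or reordering the call.
uuid_udf = f.udf(lambda: str(uuid4()), StringType()).asNondeterministic()
df2 = df.withColumn("uuid", uuid_udf())
df2.show()

Note that UDF-generated values are regenerated if a stage is recomputed, so persist or checkpoint the result if the IDs must stay stable.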