如何在 spark 数据框中创建唯一的自动生成的 Id 列

Aya*_*was 7 apache-spark

我有一个数据框,我必须在其中一列中生成一个唯一的 Id。此 id 必须使用偏移量生成。因为,我需要使用自动生成的 id 保留此数据帧,现在如果新数据进入自动生成的 id 不应与现有数据冲突。我检查了单调递增函数,但它不接受任何偏移量。这是我试过的:

df=df.coalesce(1);
df = df.withColumn(inputCol,functions.monotonically_increasing_id());
Run Code Online (Sandbox Code Playgroud)

但是有没有办法让 monotonically_increasing_id() 从起始偏移量开始?

oll*_*ik1 9

您只需添加它即可为 id 提供最小值。请注意,不保证这些值将从最小值开始

.withColumn("id", monotonically_increasing_id + 123)
Run Code Online (Sandbox Code Playgroud)

说明:列+的运算符重载https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L642

  • @AyanBis在Java中`.withColumn("id",functions.monotonically_increasing_id().plus(123))`。至少在当前的实现中,如果只有一个分区,它似乎从 min 开始。如果是这种情况,如果您想自己控制行为,您也可以使用“df.mapPartitions(_.zipWithIndex)...”(对不起,scala)之类的东西 (2认同)

abi*_*sis 6

或者,如果您不想将您的程序限制在一个分区中,df.coalesce(1)您可以使用zipWithIndex以 index = 0 开头的分区作为下一个:

lines = [["a1", "a2", "a3"],
            ["b1", "b2", "b3"],
            ["c1", "c2", "c3"]]

    cols = ["c1", "c2", "c3"]

    df = spark.createDataFrame(lines, cols)

    start_indx = 10
    df = df.rdd.zipWithIndex() \
           .map(lambda (r, indx): (indx + start_indx, r[0], r[1], r[2])) \
           .toDF(["id", "c1", "c2", "c3"])

    df.show(10, False)
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我设置了start_index = 10. 这将是输出:

+---+---+---+---+
|id |c1 |c2 |c3 |
+---+---+---+---+
|10 |a1 |a2 |a3 |
|11 |b1 |b2 |b3 |
|12 |c1 |c2 |c3 |
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)