Apache Spark的主键

Nho*_*hor 25 database postgresql hadoop apache-spark

我正在与Apache Spark和PostgreSQL建立JDBC连接,我想在我的数据库中插入一些数据.当我使用append模式时,我需要id为每个模式指定DataFrame.Row.Spark有什么方法可以创建主键吗?

zer*_*323 39

斯卡拉:

如果您只需要唯一的数字,则可以使用zipWithUniqueId并重新创建DataFrame.首先是一些导入和虚拟数据:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Run Code Online (Sandbox Code Playgroud)

提取架构以供进一步使用:

val schema = df.schema
Run Code Online (Sandbox Code Playgroud)

添加id字段:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Run Code Online (Sandbox Code Playgroud)

创建DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))
Run Code Online (Sandbox Code Playgroud)

Python中的相同内容:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Run Code Online (Sandbox Code Playgroud)

如果你喜欢连续编号,你可以替换它zipWithUniqueId,zipWithIndex但它有点贵.

直接使用DataFrameAPI:

(通用Scala,Python,Java,R语法几乎相同)

以前我错过了monotonicallyIncreasingId只要你不需要连续数字就能正常工作的功能:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+
Run Code Online (Sandbox Code Playgroud)

虽然有用monotonicallyIncreasingId是非确定性的.不仅id可能与执行不同,但是当后续操作包含过滤器时,不能使用额外的技巧来识别行.

注意:

也可以使用rowNumber窗口功能:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Run Code Online (Sandbox Code Playgroud)

不幸:

WARN窗口:没有为窗口操作定义的分区!将所有数据移动到单个分区,这可能会导致严重的性能下降.

因此,除非您有一种自然的方式对数据进行分区,并确保此时唯一性不是特别有用.


All*_*lyn 9

from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("id", monotonically_increasing_id()).show()
Run Code Online (Sandbox Code Playgroud)

请注意,df.withColumn的第二个参数是monotonically_increasing_id()而不是monotonically_increasing_id.