PySpark DataFrames - 枚举的方式而不转换为Pandas?

Mar*_*iuk 14 python bigdata apache-spark rdd pyspark

我有一个非常大的pyspark.sql.dataframe.DataFrame,名为df.我需要一些枚举记录的方法 - 因此,能够访问具有特定索引的记录.(或选择索引范围的记录组)

在熊猫中,我可以做到

indexes=[2,3,6,7] 
df[indexes]
Run Code Online (Sandbox Code Playgroud)

在这里,我想要类似的东西,(并且不将数据帧转换为pandas)

我能得到的最接近的是:

  • 通过以下方式枚举原始数据框中的所有对象:

    indexes=np.arange(df.count())
    df_indexed=df.withColumn('index', indexes)
    
    Run Code Online (Sandbox Code Playgroud)
    • 使用where()函数搜索我需要的值.

问题:

  1. 为什么它不起作用以及如何使其工作?如何向数据框添加行?
  2. 它会在以后工作,如下所示:

     indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
    
    Run Code Online (Sandbox Code Playgroud)
  3. 有没有更快更简单的方法来处理它?

zer*_*323 12

它不起作用,因为:

  1. 第二个参数withColumn应该是Column不是集合.np.array不会在这里工作
  2. 当您"index in indexes"作为SQL表达式传递到where indexes超出范围并且它未被解析为有效标识符时

PySpark> = 1.4.0

您可以使用相应的窗口函数和查询使用Column.isin方法或正确格式化的查询字符串来添加行号:

from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window

w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))

# Using DSL
indexed.where(col("index").isin(set(indexes)))

# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
Run Code Online (Sandbox Code Playgroud)

它看起来像没有PARTITION BY子句的窗口函数将所有数据移动到单个分区,所以上面可能不是最好的解决方案.

有没有更快更简单的方法来处理它?

并不是的.Spark DataFrames不支持随机行访问.

PairedRDD可以使用lookup相对较快的方法访问,如果数据是使用分区的话HashPartitioner.还有indexed-rdd项目,它支持高效的查找.

编辑:

独立于PySpark版本,您可以尝试这样的事情:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

row = Row("char")
row_with_index = Row("char", "index")

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)

## +----+
## |char|
## +----+
## |   a|
## |   b|
## |   c|
## |   d|
## |   e|
## +----+
## only showing top 5 rows

# This part is not tested but should work and save some work later
schema  = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)])

indexed = (df.rdd # Extract rdd
    .zipWithIndex() # Add index
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
    .toDF(schema)) # It will work without schema but will be more expensive

# inSet in Spark < 1.3
indexed.where(col("index").isin(indexes))
Run Code Online (Sandbox Code Playgroud)


Joe*_*ris 12

如果你想要一个保证不会碰撞但不需要的数字范围.over(partitionBy())你可以使用monotonicallyIncreasingId().

from pyspark.sql.functions import monotonicallyIncreasingId
df.select(monotonicallyIncreasingId().alias("rowId"),"*")
Run Code Online (Sandbox Code Playgroud)

请注意,值不是特别"整洁".每个分区都有一个值范围,输出不是连续的.例如0, 1, 2, 8589934592, 8589934593, 8589934594.

这是在2015年4月28日在Spark上添加到Spark:https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2