如何选择最后一行以及如何通过索引访问PySpark数据帧?

Sat*_*tya 9 python apache-spark apache-spark-sql pyspark pyspark-sql

来自PySpark的SQL数据帧

name age city
abc   20  A
def   30  B
Run Code Online (Sandbox Code Playgroud)

如何获取最后一行.(如df.limit(1)我可以将第一行数据帧放入新数据帧中).

我如何通过index.like row no访问数据帧行.12或200.

在熊猫我能做到

df.tail(1) # for last row
df.ix[rowno or index] # by index
df.loc[] or by df.iloc[]
Run Code Online (Sandbox Code Playgroud)

我只是好奇如何以这种方式或替代方式访问pyspark数据帧.

谢谢

zer*_*323 5

如何获得最后一行.

假设所有列都可以订阅的冗长而丑陋的方式:

from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id
)

last_row = (df
    .withColumn("_id", monotonically_increasing_id())
    .select(max(struct("_id", *df.columns))
    .alias("tmp")).select(col("tmp.*"))
    .drop("_id"))
Run Code Online (Sandbox Code Playgroud)

如果不是所有列都可以订购,您可以尝试:

with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]

with_id.where(col("_id") == i).drop("_id")
Run Code Online (Sandbox Code Playgroud)

注意.lastpyspark.sql.functions/`oassql.functions中有函数但是考虑到相应表达式的描述,这里不是一个好的选择.

如何通过index.like访问数据帧行

你不能.Spark DataFrame并可通过索引访问.您可以zipWithIndex稍后使用和过滤添加索引.请记住这个O(N)操作.


Dan*_*kyy 5

如何获得最后一行。

如果您有可用于对数据框进行排序的列,例如“索引”,那么一种获取最后一条记录的简单方法是使用SQL:1)按降序对表进行排序,以及2)从该顺序中获取第一个值

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()
Run Code Online (Sandbox Code Playgroud)

以及如何按索引访问数据框行。12或200。

您可以在任何行中获取记录的类似方式

row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (select * from table_df ORDER BY index ASC limit {0}) ord_lim ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number))
latest_rec.show()
Run Code Online (Sandbox Code Playgroud)

如果没有“索引”列,则可以使用

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("index", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)


Hen*_*cio 5

from pyspark.sql import functions as F

expr = [F.last(col).alias(col) for col in df.columns]

df.agg(*expr)
Run Code Online (Sandbox Code Playgroud)

只是一个提示:看起来您仍然有与熊猫或R合作的人的心态。Spark是我们处理数据的方式的另一范式。您不再需要访问单个单元格中的数据,现在您可以处理整个数据块。如果您像以前一样继续收集东西并采取行动,您将失去火花提供的整个并行性概念。看一下Spark中的转换与动作的概念。

  • @yeliabsalohcin Spark 是一个处理时间序列的好工具。但是,在这个问题中,用户想知道一种使用类似于 pandas 的索引来访问数据的方法,我的意思是,您应该注意 Spark 的工作方式与 pandas 或 R 不同。 (2认同)