如何遍历pyspark中的每一行dataFrame

Art*_*rde 40 for-loop dataframe apache-spark apache-spark-sql pyspark

例如

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()
Run Code Online (Sandbox Code Playgroud)

上面的语句在终端上打印整个表,但我想使用for或while访问该表中的每一行以执行进一步的计算.

Dav*_*vid 38

您将定义自定义函数并使用映射.

def customFunction(row):

   return (row.name, row.age, row.city)

sample2 = sample.rdd.map(customFunction)
Run Code Online (Sandbox Code Playgroud)

要么

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))
Run Code Online (Sandbox Code Playgroud)

然后,自定义函数将应用于数据帧的每一行.请注意,sample2将是a RDD,而不是数据帧.

如果要执行更复杂的计算,则需要映射.如果您只需要添加派生列,则可以使用withColumn,返回数据帧.

sample3 = sample.withColumn('age2', sample.age + 2)
Run Code Online (Sandbox Code Playgroud)


zer*_*323 34

你根本做不到.DataFrames与其他分布式数据结构相同,不可迭代,只能使用专用的高阶函数和/或SQL方法进行访问.

你当然可以 collect

for row in df.rdd.collect():
    do_something(row)
Run Code Online (Sandbox Code Playgroud)

或转换 toLocalIterator

for row in df.rdd.toLocalIterator():
    do_something(row)
Run Code Online (Sandbox Code Playgroud)

并如上所示在本地迭代,但它胜过使用Spark的所有目的.

  • 有时候有必要......打破规则.谢谢你的解决方案.它对我有用. (4认同)
  • “它超越了使用 Spark 的所有目的”是相当强烈且主观的语言。“collect()”方法的存在是有原因的,并且它有许多有效的用例。Spark 处理完数据后,迭代最终结果可能是与外部 API 或遗留系统集成/写入的唯一方法。 (3认同)
  • 做了一些阅读,看起来用“where()”形成一个新的数据帧将是正确执行此操作的 Spark 方式。 (2认同)

aar*_*ers 8

使用python中的列表推导,您只需使用两行就可以将整列值收集到一个列表中:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]
Run Code Online (Sandbox Code Playgroud)

在上面的例子中,我们返回数据库'default'中的表列表,但是可以通过替换sql()中使用的查询来调整相同的表.

或者更简略:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]
Run Code Online (Sandbox Code Playgroud)

对于三列的示例,我们可以创建一个字典列表,然后在for循环中迭代它们.

sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
             for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
    print("{} is a {} year old from {}".format(
        row["name"],
        row["age"],
        row["city"]))
Run Code Online (Sandbox Code Playgroud)


小智 8

这可能不是最佳实践,但您可以简单地使用 定位特定列collect(),将其导出为行列表,然后循环访问该列表。

假设这是你的 df:

+----------+----------+-------------------+-----------+-----------+------------------+ 
|      Date|  New_Date|      New_Timestamp|date_sub_10|date_add_10|time_diff_from_now|
+----------+----------+-------------------+-----------+-----------+------------------+ 
|2020-09-23|2020-09-23|2020-09-23 00:00:00| 2020-09-13| 2020-10-03| 51148            | 
|2020-09-24|2020-09-24|2020-09-24 00:00:00| 2020-09-14| 2020-10-04| -35252           |
|2020-01-25|2020-01-25|2020-01-25 00:00:00| 2020-01-15| 2020-02-04| 20963548         |
|2020-01-11|2020-01-11|2020-01-11 00:00:00| 2020-01-01| 2020-01-21| 22173148         |
+----------+----------+-------------------+-----------+-----------+------------------+
Run Code Online (Sandbox Code Playgroud)

循环遍历日期列中的行:

rows = df3.select('Date').collect()

final_list = []
for i in rows:
    final_list.append(i[0])

print(final_list)
Run Code Online (Sandbox Code Playgroud)


小智 6

像这样试一试

    result = spark.createDataFrame([('SpeciesId','int'), ('SpeciesName','string')],["col_name", "data_type"]); 
    for f in result.collect(): 
        print (f.col_name)
Run Code Online (Sandbox Code Playgroud)