Lia*_*gpi 12 scala dataframe apache-spark apache-spark-sql
I read data from a CSV file, but it has no index.
I want to add a column with numbers running from 1 to the number of rows.
How can I do this? Thanks. (Scala)
Oma*_*r14 30
With Scala you can use:
import org.apache.spark.sql.functions._
df.withColumn("id", monotonically_increasing_id())
With PySpark you can use:
from pyspark.sql.functions import monotonically_increasing_id
df_index = df.select("*").withColumn("id", monotonically_increasing_id())
小智 29
monotonically_increasing_id - the generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
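For example (a minimal sketch, assuming an existing SparkSession named spark): the partition index is stored in the upper 31 bits of the id and the record number within each partition in the lower 33 bits, which is why the ids jump between partitions.

import org.apache.spark.sql.functions.monotonically_increasing_id

// Force 3 partitions so the gaps between partitions become visible
spark.range(6).toDF("n")
  .repartition(3)
  .withColumn("id", monotonically_increasing_id())
  .show()
// Typical ids: 0, 1, 8589934592 (= 2^33), 8589934593, ...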
"我想在1到行的数字中添加一列."
假设我们有以下DF
+--------+-------------+-------+
| userId | productCode | count |
+--------+-------------+-------+
|     25 |        6001 |     2 |
|     11 |        5001 |     8 |
|     23 |         123 |     5 |
+--------+-------------+-------+
Generate IDs starting from 1:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.orderBy("count")
val result = df.withColumn("index", row_number().over(w))
This adds an index column ordered by increasing count value. Note that a window without a partition clause moves all the data to a single partition, so this can be slow on large DataFrames.
+--------+-------------+-------+-------+
| userId | productCode | count | index |
+--------+-------------+-------+-------+
|     25 |        6001 |     2 |     1 |
|     23 |         123 |     5 |     2 |
|     11 |        5001 |     8 |     3 |
+--------+-------------+-------+-------+
Seq*_*nex 17
How to get a sequential id column id[1, 2, 3, 4...n]:
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window
df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
Note that row_number() starts at 1, so subtract 1 if you want a 0-indexed column.
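For reference, a Scala sketch of the same approach (df and the column name follow the PySpark example above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// Ordering the window by the monotonically increasing id keeps the
// DataFrame's existing order; row_number() then assigns 1, 2, 3, ... n
val df_with_seq_id = df.withColumn("index_column_name",
  row_number().over(Window.orderBy(monotonically_increasing_id())))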
Ram*_*ram 10
Note: the approaches above do not give a sequential number, only an increasing id.
A simple way to do this and guarantee the order of the index is zipWithIndex, as in the full example below.
Sample data:
+-------------------+
| Name|
+-------------------+
| Ram Ghadiyaram|
| Ravichandra|
| ilker|
| nick|
| Naveed|
| Gobinathan SP|
|Sreenivas Venigalla|
| Jackela Kowski|
| Arindam Sengupta|
| Liangpi|
| Omar14|
| anshu kumar|
+-------------------+
package com.example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}

/**
  * DistributedDataIndex : Program to index a DataFrame with zipWithIndex
  */
object DistributedDataIndex extends App with Logging {

  val spark = SparkSession.builder
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  val df = spark.sparkContext.parallelize(
    Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick",
      "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski",
      "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
    )).toDF("Name")
  df.show

  logInfo("addColumnIndex here")
  // Add index now...
  val df1WithIndex = addColumnIndex(df)
    .withColumn("monotonically_increasing_id", monotonically_increasing_id)
  df1WithIndex.show(false)

  /**
    * Add an index column to each row of the DataFrame
    */
  def addColumnIndex(df: DataFrame) = {
    spark.createDataFrame(
      df.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for the index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  }
}
Result:
+-------------------+-----+---------------------------+
|Name |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram |0 |0 |
|Ravichandra |1 |8589934592 |
|ilker |2 |8589934593 |
|nick |3 |17179869184 |
|Naveed |4 |25769803776 |
|Gobinathan SP |5 |25769803777 |
|Sreenivas Venigalla|6 |34359738368 |
|Jackela Kowski |7 |42949672960 |
|Arindam Sengupta |8 |42949672961 |
|Liangpi |9 |51539607552 |
|Omar14 |10 |60129542144 |
|anshu kumar |11 |60129542145 |
+-------------------+-----+---------------------------+
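Since the question asks for an index starting at 1, you can shift the index inside the map; a sketch of the one-line change to addColumnIndex above:

df.rdd.zipWithIndex.map {
  // zipWithIndex is 0-based, so add 1 to start the index at 1
  case (row, index) => Row.fromSeq(row.toSeq :+ (index + 1))
}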