我需要为包含许多列的数据表生成row_numbers的完整列表.
在SQL中,这将如下所示:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
Run Code Online (Sandbox Code Playgroud)
现在,让我们说在Spark中我有一个形式为(K,V)的RDD,其中V =(col1,col2,col3),所以我的条目就像
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
Run Code Online (Sandbox Code Playgroud)
我想使用sortBy(),sortWith(),sortByKey(),zipWithIndex等命令对它们进行排序,并使用正确的row_number创建一个新的RDD.
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
Run Code Online (Sandbox Code Playgroud)
(我不关心括号,所以表格也可以是(K,(col1,col2,col3,rownum))而不是)
我该怎么做呢?
这是我的第一次尝试:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3) …Run Code Online (Sandbox Code Playgroud) 我正在与Apache Spark和PostgreSQL建立JDBC连接,我想在我的数据库中插入一些数据.当我使用append模式时,我需要id为每个模式指定DataFrame.Row.Spark有什么方法可以创建主键吗?
我使用monotonically_increasing_id()使用以下语法将行号分配给pyspark数据帧:
df1 = df1.withColumn("idx", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)
现在df1有26,572,528条记录.所以我期待idx值从0-26,572,527.
但是当我选择max(idx)时,它的值非常大:335,008,054,165.
这个功能发生了什么?使用此函数与具有相似记录数的其他数据集合并是否可靠?
我有大约300个数据帧,我想将它们组合成一个数据帧.因此,一个数据帧包含ID,而其他数据帧包含与行对应的不同记录
我想知道 PySpark 中与reset_index()pandas 中使用的命令的等效性。当使用默认命令(reset_index)时,如下:
data.reset_index()
Run Code Online (Sandbox Code Playgroud)
我收到错误:
“DataFrame”对象没有属性“reset_index”错误”
我有一个数据帧,列时间为a,b,c,d,val.我想创建一个带有附加列的数据框,它将包含每个组中行的行号,其中a,b,c,d是组键.
我尝试使用spark sql,通过定义一个窗口函数,特别是在sql中它看起来像这样:
select time, a,b,c,d,val, row_number() over(partition by a,b,c,d order by time) as rn from table
group by a,b,c,d,val
Run Code Online (Sandbox Code Playgroud)
我想在数据帧itslef上执行此操作,而不使用sparksql.
谢谢
在 pyspark 中,我有一个如下所示的数据框,其中根据 id 和 k1 的值对行进行排序。此外,每一行都有一个唯一的升序编号(rowid)。
-----------------------
rowid | id | k1 | k2 |
-----------------------
1 | 1 | v1 | l1 |
2 | 1 | v1 | v1 |
3 | 1 | v1 | l2 |
4 | 2 | v2 | v2 |
5 | 2 | v2 | l3 |
6 | 3 | v3 | l3 |
----------------------
Run Code Online (Sandbox Code Playgroud)
对于id的每个唯一值,我想计算k1==k2的第一行的rowid与观察到该id的记录的第一行对应的rowid的差+1,并存储结果在新列中(即排名)。输出应如下所示。
----------------
id | k1 |rank |
-----------------
1 | v1 | 2 | …Run Code Online (Sandbox Code Playgroud) 我有一个 DataFrame,我想添加一列不同的 uuid4() 行。我的代码:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StringType
from uuid import uuid4
spark_session = SparkSession.builder.getOrCreate()
df = spark_session.createDataFrame([
[1, 1, 'teste'],
[2, 2, 'teste'],
[3, 0, 'teste'],
[4, 5, 'teste'],
],
list('abc'))
df = df.withColumn("_tmp", f.lit(1))
uuids = [str(uuid4()) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, StringType())
df1 = df_1.withColumn("_tmp", f.lit(1))
df2 = df.join(df_1, "_tmp", "inner").drop("_tmp")
df2.show()
Run Code Online (Sandbox Code Playgroud)
但我有这个错误:
Py4JJavaError: An error occurred while calling o1571.showString.
: org.apache.spark.sql.AnalysisException: Detected implicit cartesian …Run Code Online (Sandbox Code Playgroud) 我想在pyspark数据框中从指定的数字开始创建具有连续数字的列。例如,我想将A列添加到我的数据帧df中,该列将从5开始到数据帧的长度,递增 1,因此5、6、7,...,长度( df )。
使用pyspark方法的一些简单解决方案?
下面是在数据帧中添加序列号列的逻辑.当我从分隔文件中读取数据时,它按预期工作.今天我有一个新任务从oracle表读取数据并添加序列号和进一步处理.当我从oracle表中读取它时,我面临着以下逻辑的问题,在数据帧中添加序列号.
oracleTableDF是我的数据帧
//creating Sequence no. logic for SeqNum
val rowRDD = oracleTableDF.rdd.zipWithIndex().map(indexedRow => Row.fromSeq((((indexedRow._2.toLong+1)).toLong) +: indexedRow._1.toSeq))
//creating StructType to add Seqnum in schema
val newstructure = StructType(Array(StructField("SeqNum",LongType)).++(oracleTableDF.schema.fields))
//creating new Data Frame with seqnum
oracleTableDF = spark.createDataFrame(rowRDD, newstructure)
Run Code Online (Sandbox Code Playgroud)
我无法找到实际问题.因为当我从文件中读取逻辑时,逻辑在集群中按预期工作.但是当我从oracle表中读到它时面临一些问题.它在本地模式下也按预期工作.
以下是错误:
"ERROR scheduler.TaskSetManager:阶段1.0中的任务0失败4次;中止作业org.apache.spark.SparkException:作业因阶段失败而中止:阶段1.0中的任务0失败4次,最近失败:阶段丢失任务0.3 1.0(TID 4,xxxx,executor 1):java.lang.NoClassDefFoundError:无法初始化类oracleDataProcess $"
apache-spark ×6
pyspark ×6
python ×5
python-3.x ×2
database ×1
dataframe ×1
hadoop ×1
indexing ×1
merge ×1
postgresql ×1
pyspark-sql ×1
rdd ×1
row-number ×1
scala ×1
sql ×1