相关疑难解决方法(0)

如何获得Spark RDD的SQL row_number等价物?

我需要为包含许多列的数据表生成row_numbers的完整列表.

在SQL中,这将如下所示:

select
   key_value,
   col1,
   col2,
   col3,
   row_number() over (partition by key_value order by col1, col2 desc, col3)
from
   temp
;
Run Code Online (Sandbox Code Playgroud)

现在,让我们说在Spark中我有一个形式为(K,V)的RDD,其中V =(col1,col2,col3),所以我的条目就像

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
Run Code Online (Sandbox Code Playgroud)

我想使用sortBy(),sortWith(),sortByKey(),zipWithIndex等命令对它们进行排序,并使用正确的row_number创建一个新的RDD.

(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
Run Code Online (Sandbox Code Playgroud)

(我不关心括号,所以表格也可以是(K,(col1,col2,col3,rownum))而不是)

我该怎么做呢?

这是我的第一次尝试:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3) …
Run Code Online (Sandbox Code Playgroud)

sql row-number apache-spark rdd

25
推荐指数
2
解决办法
3万
查看次数

Apache Spark的主键

我正在与Apache Spark和PostgreSQL建立JDBC连接,我想在我的数据库中插入一些数据.当我使用append模式时,我需要id为每个模式指定DataFrame.Row.Spark有什么方法可以创建主键吗?

database postgresql hadoop apache-spark

25
推荐指数
2
解决办法
2万
查看次数

使用monotonically_increasing_id()将行号分配给pyspark数据帧

我使用monotonically_increasing_id()使用以下语法将行号分配给pyspark数据帧:

df1 = df1.withColumn("idx", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)

现在df1有26,572,528条记录.所以我期待idx值从0-26,572,527.

但是当我选择max(idx)时,它的值非常大:335,008,054,165.

这个功能发生了什么?使用此函数与具有相似记录数的其他数据集合并是否可靠?

我有大约300个数据帧,我想将它们组合成一个数据帧.因此,一个数据帧包含ID,而其他数据帧包含与行对应的不同记录

python indexing merge pyspark

21
推荐指数
2
解决办法
3万
查看次数

如何在Python中使用Pyspark等效的reset_index()

我想知道 PySpark 中与reset_index()pandas 中使用的命令的等效性。当使用默认命令(reset_index)时,如下:

data.reset_index()
Run Code Online (Sandbox Code Playgroud)

我收到错误:

“DataFrame”对象没有属性“reset_index”错误”

python python-3.x apache-spark apache-spark-sql pyspark

7
推荐指数
1
解决办法
5946
查看次数

pyspark行号数据帧

我有一个数据帧,列时间为a,b,c,d,val.我想创建一个带有附加列的数据框,它将包含每个组中行的行号,其中a,b,c,d是组键.

我尝试使用spark sql,通过定义一个窗口函数,特别是在sql中它看起来像这样:

select time, a,b,c,d,val, row_number() over(partition by a,b,c,d order by     time) as rn from table
group by a,b,c,d,val
Run Code Online (Sandbox Code Playgroud)

我想在数据帧itslef上执行此操作,而不使用sparksql.

谢谢

python apache-spark apache-spark-sql pyspark

6
推荐指数
1
解决办法
2万
查看次数

使用搜索和条件查找两列值之间的差异

在 pyspark 中,我有一个如下所示的数据框,其中根据 id 和 k1 的值对行进行排序。此外,每一行都有一个唯一的升序编号(rowid)。

-----------------------
rowid | id | k1  | k2 |
-----------------------
1     | 1  | v1 | l1  |
2     | 1  | v1 | v1  |
3     | 1  | v1 | l2  |
4     | 2  | v2 | v2  |
5     | 2  | v2 | l3  |
6     | 3  | v3 | l3  |
----------------------
Run Code Online (Sandbox Code Playgroud)

对于id的每个唯一值,我想计算k1==k2的第一行的rowid与观察到该id的记录的第一行对应的rowid的差+1,并存储结果在新列中(即排名)。输出应如下所示。

----------------
 id | k1  |rank |
-----------------
 1  | v1  | 2   | …
Run Code Online (Sandbox Code Playgroud)

python pyspark pyspark-sql

5
推荐指数
1
解决办法
594
查看次数

在pyspark中添加UUID的有效方法

我有一个 DataFrame,我想添加一列不同的 uuid4() 行。我的代码:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

from uuid import uuid4

spark_session = SparkSession.builder.getOrCreate()

df = spark_session.createDataFrame([
        [1, 1, 'teste'],
        [2, 2, 'teste'],
        [3, 0, 'teste'],
        [4, 5, 'teste'],
    ],
    list('abc'))


df = df.withColumn("_tmp", f.lit(1))

uuids = [str(uuid4()) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, StringType())
df1 = df_1.withColumn("_tmp", f.lit(1))


df2 = df.join(df_1, "_tmp", "inner").drop("_tmp")
df2.show()
Run Code Online (Sandbox Code Playgroud)

但我有这个错误:

Py4JJavaError: An error occurred while calling o1571.showString.
: org.apache.spark.sql.AnalysisException: Detected implicit cartesian …
Run Code Online (Sandbox Code Playgroud)

python-3.x apache-spark pyspark

5
推荐指数
1
解决办法
2686
查看次数

如何在 pyspark 数据框中创建连续数字列?

我想在pyspark数据框中从指定的数字开始创建具有连续数字的列。例如,我想将A列添加到我的数据帧df中,该列将从5开始到数据帧的长度,递增 1,因此567,...,长度( df )。

使用pyspark方法的一些简单解决方案?

python sequential-number dataframe pyspark

4
推荐指数
2
解决办法
2万
查看次数

在dataframe usnig scala中添加序列号列

下面是在数据帧中添加序列号列的逻辑.当我从分隔文件中读取数据时,它按预期工作.今天我有一个新任务从oracle表读取数据并添加序列号和进​​一步处理.当我从oracle表中读取它时,我面临着以下逻辑的问题,在数据帧中添加序列号.

oracleTableDF是我的数据帧

   //creating Sequence no. logic for SeqNum
   val rowRDD = oracleTableDF.rdd.zipWithIndex().map(indexedRow => Row.fromSeq((((indexedRow._2.toLong+1)).toLong) +: indexedRow._1.toSeq)) 

  //creating StructType to add Seqnum in schema
        val newstructure = StructType(Array(StructField("SeqNum",LongType)).++(oracleTableDF.schema.fields))

  //creating new Data Frame with seqnum
  oracleTableDF = spark.createDataFrame(rowRDD, newstructure)
Run Code Online (Sandbox Code Playgroud)

我无法找到实际问题.因为当我从文件中读取逻辑时,逻辑在集群中按预期工作.但是当我从oracle表中读到它时面临一些问题.它在本地模式下也按预期工作.

以下是错误:

"ERROR scheduler.TaskSetManager:阶段1.0中的任务0失败4次;中止作业org.apache.spark.SparkException:作业因阶段失败而中止:阶段1.0中的任务0失败4次,最近失败:阶段丢失任务0.3 1.0(TID 4,xxxx,executor 1):java.lang.NoClassDefFoundError:无法初始化类oracleDataProcess $"

scala apache-spark

3
推荐指数
1
解决办法
5854
查看次数