小编pau*_*ult的帖子

对组内的 pyspark 数据帧进行排序

我想对"time"每个"id"组内的列进行排序。

数据如下:

id  time  name
132  12   Lucy
132  10   John
132  15   Sam
78   11   Kate
78   7    Julia
78   2    Vivien
245  22   Tom
Run Code Online (Sandbox Code Playgroud)

我想得到这个:

id  time  name
132  10   John
132  12   Lucy
132  15   Sam
78   2    Vivien
78   7    Julia
78   11   Kate
245  22   Tom
Run Code Online (Sandbox Code Playgroud)

我试过

df.orderby(['id','time'])
Run Code Online (Sandbox Code Playgroud)

但我不需要排序"id"

我有两个问题:

  1. "time"我可以在相同的范围内排序"id"吗?如何?
  2. "time"如果我只是排序会比使用orderby()对两列进行排序更有效吗?

apache-spark apache-spark-sql pyspark

8
推荐指数
1
解决办法
7787
查看次数

如何计算PySpark DataFrame的平均值和标准差?

我调用的PySpark DataFrame(不是pandas)df非常大collect().因此,下面给出的代码效率不高.它使用的是少量数据,但现在却失败了.

import numpy as np

myList = df.collect()
total = []
for product,nb in myList:
    for p2,score in nb:
            total.append(score)
mean = np.mean(total)
std = np.std(total)
Run Code Online (Sandbox Code Playgroud)

有没有办法通过使用或类似获得meanstd作为两个变量pyspark.sql.functions

from pyspark.sql.functions import mean as mean_, std as std_
Run Code Online (Sandbox Code Playgroud)

withColumn但是,我可以使用这种方法逐行应用计算,并且它不返回单个变量.

更新:

样本内容df:

+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|
Run Code Online (Sandbox Code Playgroud)

我应计算的平均值和标准偏差score值,例如值1[691,1]的分数之一.

python apache-spark apache-spark-sql pyspark

7
推荐指数
1
解决办法
3万
查看次数

PySpark - 逐行转换为 JSON

我有一个非常大的 pyspark 数据框。我需要将数据帧转换为每一行的 JSON 格式的字符串,然后将该字符串发布到 Kafka 主题。我最初使用了以下代码。

for message in df.toJSON().collect():
        kafkaClient.send(message) 
Run Code Online (Sandbox Code Playgroud)

但是,数据框非常大,因此在尝试collect().

我正在考虑使用 aUDF因为它逐行处理它。

from pyspark.sql.functions import udf, struct

def get_row(row):
    json = row.toJSON()
    kafkaClient.send(message) 
    return "Sent"

send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()
Run Code Online (Sandbox Code Playgroud)

但是我收到一个错误,因为列被输入到函数而不是行。

出于说明目的,我们可以使用下面的 df,我们可以假设必须发送 Col1 和 Col2。

df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])
Run Code Online (Sandbox Code Playgroud)

每行的 JSON 字符串:

'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'
Run Code Online (Sandbox Code Playgroud)

python json pyspark spark-dataframe

7
推荐指数
1
解决办法
1万
查看次数

计算SPARKSQL中重复行的数量

我有要求我需要在Hive表中计算SparkSQL中重复行的数量。

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row
app_name="test"
conf = SparkConf().setAppName(app_name)
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
df = sqlContext.sql("select * from  DV_BDFRAWZPH_NOGBD_R000_SG.employee")
Run Code Online (Sandbox Code Playgroud)

到目前为止,我已经对表名进行了硬编码,但实际上它是作为参数来的。话虽这么说,我们也不知道列数或它们的名称。在python pandas中,我们有类似df.duplicated.sum()之类的东西来计算重复记录的数量。我们这里有这样的东西吗?

+---+---+---+
| 1 | A | B |
+---+---+---+
| 1 | A | B |
+---+---+---+
| 2 | B | E |
+---+---+---+
| 2 | B | E |
+---+---+---+
| 3 | D | G |
+---+---+---+
| 4 | …
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql pyspark spark-dataframe pyspark-sql

7
推荐指数
1
解决办法
5587
查看次数

对训练数据进行拟合变换并对测试数据进行变换

我无法理解究竟如何transform()fit_transform()正在一起.

我之后调用fit_transform()我的训练数据集和transform()我的测试集.

但是,如果我打电话fit_transform()给测试集,我会得到不好的结果.

任何人都可以解释一下这是怎么发生的?

python scikit-learn

7
推荐指数
1
解决办法
3437
查看次数

Pyspark Dataframe:获取满足条件的上一行

对于 PySpark DataFrame 中的每一行,我试图从满足特定条件的第一行获取值:

那就是如果我的数据框看起来像这样:

X  | Flag
1  | 1
2  | 0
3  | 0
4  | 0
5  | 1
6  | 0
7  | 0
8  | 0
9  | 1
10 | 0
Run Code Online (Sandbox Code Playgroud)

我想要看起来像这样的输出:

X  | Lag_X | Flag
1  | NULL  | 1
2  | 1     | 0
3  | 1     | 0
4  | 1     | 0
5  | 1     | 1
6  | 5     | 0
7  | 5     | 0
8 …
Run Code Online (Sandbox Code Playgroud)

python pyspark spark-dataframe pyspark-sql

7
推荐指数
1
解决办法
6494
查看次数

获取Spark DataFrame中两个日期之间的所有日期

我有一个DF,我有bookingDtarrivalDt列.我需要找到这两个日期之间的所有日期.

示例代码:

df = spark.sparkContext.parallelize(
            [Row(vyge_id=1000, bookingDt='2018-01-01', arrivalDt='2018-01-05')]).toDF()
diffDaysDF = df.withColumn("diffDays", datediff('arrivalDt', 'bookingDt'))
diffDaysDF.show()
Run Code Online (Sandbox Code Playgroud)

代码输出:

+----------+----------+-------+--------+
| arrivalDt| bookingDt|vyge_id|diffDays|
+----------+----------+-------+--------+
|2018-01-05|2018-01-01|   1000|       4|
+----------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)

我尝试的是找到两个日期之间的天数,并使用timedelta函数计算所有日期explode.

dateList = [str(bookingDt + timedelta(i)) for i in range(diffDays)]
Run Code Online (Sandbox Code Playgroud)

预期产量:

基本上,我需要建立一个DF与对之间的每个日的记录bookingDtarrivalDt,包容性.

+----------+----------+-------+----------+
| arrivalDt| bookingDt|vyge_id|txnDt     |
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-01|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-02|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-03|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-04|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-05|
+----------+----------+-------+----------+
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql pyspark

7
推荐指数
3
解决办法
6595
查看次数

求pyspark数组的均值<double>

在pyspark中,我有一个可变长度的double数组,我希望找到其均值。但是,平均值函数需要单个数字类型。

有没有一种方法可以找到一个数组的平均值而不分解该数组?我有几个不同的数组,我希望能够执行以下操作:

df.select(col("Segment.Points.trajectory_points.longitude"))
Run Code Online (Sandbox Code Playgroud)

DataFrame [经度:数组]

df.select(avg(col("Segment.Points.trajectory_points.longitude"))).show()
Run Code Online (Sandbox Code Playgroud)
org.apache.spark.sql.AnalysisException: cannot resolve
'avg(Segment.Points.trajectory_points.longitude)' due to data type
mismatch: function average requires numeric types, not
ArrayType(DoubleType,true);;
Run Code Online (Sandbox Code Playgroud)

如果我有3个具有以下数组的唯一记录,我希望将这些值的平均值作为输出。这将是3个平均经度值。

输入:

[Row(longitude=[-80.9, -82.9]),
 Row(longitude=[-82.92, -82.93, -82.94, -82.96, -82.92, -82.92]),
 Row(longitude=[-82.93, -82.93])]
Run Code Online (Sandbox Code Playgroud)

输出:

-81.9,
-82.931,
-82.93
Run Code Online (Sandbox Code Playgroud)

我正在使用Spark版本2.1.3。


爆炸解决方案:

因此,我已经通过爆炸实现了这一目标,但我希望避免这一步。这就是我所做的

from pyspark.sql.functions import col
import pyspark.sql.functions as F

longitude_exp = df.select(
    col("ID"), 
    F.posexplode("Segment.Points.trajectory_points.longitude").alias("pos", "longitude")
)

longitude_reduced = long_exp.groupBy("ID").agg(avg("longitude"))
Run Code Online (Sandbox Code Playgroud)

这成功地取了意思。但是,由于我将在几列中执行此操作,因此必须将同一DF爆炸几次。我将继续努力,以找到一种更清洁的方式来完成此任务。

apache-spark apache-spark-sql pyspark

7
推荐指数
2
解决办法
530
查看次数

是否可以对带有reduce的列表进行排序?

这是我做的练习。我当然可以通过使用sorted()或Python标准库中的其他方式对列表进行排序,但是在这种情况下我不能。我认为我只应该使用reduce()

from functools import reduce
arr = [17, 2, 3, 6, 1, 3, 1, 9, 5, 3]
sorted_arr = reduce(lambda a,b : (b,a) if a > b else (a,b), arr)
Run Code Online (Sandbox Code Playgroud)

我得到的错误:

TypeError: '>' not supported between instances of 'tuple' and 'int'
Run Code Online (Sandbox Code Playgroud)

这是可以预期的,因为我的reduce函数将一个元组插入到int数组中,而不是2个单独的整数。然后将元组与一个整数进行比较...

有没有一种方法可以将2个数字插入列表,然后仅对列表中的第二个数字运行该函数?还是一种使用reduce()交换数字的方法?

文档对reduce函数几乎没有说什么,所以我现在没有想法。 https://docs.python.org/3/library/functools.html?highlight=reduce#functools.reduce

python sorting python-3.x

7
推荐指数
1
解决办法
319
查看次数

如何在 PySpark 2.x 中使用修剪?

代码是:

from pyspark.sql import functions as F
df = df.select(F.trim("MyColumn"))
Run Code Online (Sandbox Code Playgroud)

错误是:

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.trim. Trace:
py4j.Py4JException: Method trim([class java.lang.String]) does not exist
Run Code Online (Sandbox Code Playgroud)

trimPySpark 2.x 中已弃用吗?我不明白为什么它不起作用,而同一命名空间中的其他一些函数却工作得很好

apache-spark-sql pyspark

6
推荐指数
1
解决办法
3382
查看次数