我想对"time"每个"id"组内的列进行排序。
数据如下:
id time name
132 12 Lucy
132 10 John
132 15 Sam
78 11 Kate
78 7 Julia
78 2 Vivien
245 22 Tom
Run Code Online (Sandbox Code Playgroud)
我想得到这个:
id time name
132 10 John
132 12 Lucy
132 15 Sam
78 2 Vivien
78 7 Julia
78 11 Kate
245 22 Tom
Run Code Online (Sandbox Code Playgroud)
我试过
df.orderby(['id','time'])
Run Code Online (Sandbox Code Playgroud)
但我不需要排序"id"。
我有两个问题:
"time"我可以在相同的范围内排序"id"吗?如何?"time"如果我只是排序会比使用orderby()对两列进行排序更有效吗?我调用的PySpark DataFrame(不是pandas)df非常大collect().因此,下面给出的代码效率不高.它使用的是少量数据,但现在却失败了.
import numpy as np
myList = df.collect()
total = []
for product,nb in myList:
for p2,score in nb:
total.append(score)
mean = np.mean(total)
std = np.std(total)
Run Code Online (Sandbox Code Playgroud)
有没有办法通过使用或类似获得mean和std作为两个变量pyspark.sql.functions?
from pyspark.sql.functions import mean as mean_, std as std_
Run Code Online (Sandbox Code Playgroud)
withColumn但是,我可以使用这种方法逐行应用计算,并且它不返回单个变量.
更新:
样本内容df:
+----------+------------------+
|product_PK| products|
+----------+------------------+
| 680|[[691,1], [692,5]]|
| 685|[[691,2], [692,2]]|
| 684|[[691,1], [692,3]]|
Run Code Online (Sandbox Code Playgroud)
我应计算的平均值和标准偏差score值,例如值1中[691,1]的分数之一.
我有一个非常大的 pyspark 数据框。我需要将数据帧转换为每一行的 JSON 格式的字符串,然后将该字符串发布到 Kafka 主题。我最初使用了以下代码。
for message in df.toJSON().collect():
kafkaClient.send(message)
Run Code Online (Sandbox Code Playgroud)
但是,数据框非常大,因此在尝试collect().
我正在考虑使用 aUDF因为它逐行处理它。
from pyspark.sql.functions import udf, struct
def get_row(row):
json = row.toJSON()
kafkaClient.send(message)
return "Sent"
send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()
Run Code Online (Sandbox Code Playgroud)
但是我收到一个错误,因为列被输入到函数而不是行。
出于说明目的,我们可以使用下面的 df,我们可以假设必须发送 Col1 和 Col2。
df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])
Run Code Online (Sandbox Code Playgroud)
每行的 JSON 字符串:
'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'
Run Code Online (Sandbox Code Playgroud) 我有要求我需要在Hive表中计算SparkSQL中重复行的数量。
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row
app_name="test"
conf = SparkConf().setAppName(app_name)
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
df = sqlContext.sql("select * from DV_BDFRAWZPH_NOGBD_R000_SG.employee")
Run Code Online (Sandbox Code Playgroud)
到目前为止,我已经对表名进行了硬编码,但实际上它是作为参数来的。话虽这么说,我们也不知道列数或它们的名称。在python pandas中,我们有类似df.duplicated.sum()之类的东西来计算重复记录的数量。我们这里有这样的东西吗?
+---+---+---+
| 1 | A | B |
+---+---+---+
| 1 | A | B |
+---+---+---+
| 2 | B | E |
+---+---+---+
| 2 | B | E |
+---+---+---+
| 3 | D | G |
+---+---+---+
| 4 | …Run Code Online (Sandbox Code Playgroud) 我无法理解究竟如何transform()和fit_transform()正在一起.
我之后调用fit_transform()我的训练数据集和transform()我的测试集.
但是,如果我打电话fit_transform()给测试集,我会得到不好的结果.
任何人都可以解释一下这是怎么发生的?
对于 PySpark DataFrame 中的每一行,我试图从满足特定条件的第一行获取值:
那就是如果我的数据框看起来像这样:
X | Flag
1 | 1
2 | 0
3 | 0
4 | 0
5 | 1
6 | 0
7 | 0
8 | 0
9 | 1
10 | 0
Run Code Online (Sandbox Code Playgroud)
我想要看起来像这样的输出:
X | Lag_X | Flag
1 | NULL | 1
2 | 1 | 0
3 | 1 | 0
4 | 1 | 0
5 | 1 | 1
6 | 5 | 0
7 | 5 | 0
8 …Run Code Online (Sandbox Code Playgroud) 我有一个DF,我有bookingDt和arrivalDt列.我需要找到这两个日期之间的所有日期.
示例代码:
df = spark.sparkContext.parallelize(
[Row(vyge_id=1000, bookingDt='2018-01-01', arrivalDt='2018-01-05')]).toDF()
diffDaysDF = df.withColumn("diffDays", datediff('arrivalDt', 'bookingDt'))
diffDaysDF.show()
Run Code Online (Sandbox Code Playgroud)
代码输出:
+----------+----------+-------+--------+
| arrivalDt| bookingDt|vyge_id|diffDays|
+----------+----------+-------+--------+
|2018-01-05|2018-01-01| 1000| 4|
+----------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)
我尝试的是找到两个日期之间的天数,并使用timedelta函数计算所有日期explode.
dateList = [str(bookingDt + timedelta(i)) for i in range(diffDays)]
Run Code Online (Sandbox Code Playgroud)
预期产量:
基本上,我需要建立一个DF与对之间的每个日的记录bookingDt和arrivalDt,包容性.
+----------+----------+-------+----------+
| arrivalDt| bookingDt|vyge_id|txnDt |
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-01|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-02|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-03|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-04|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-05|
+----------+----------+-------+----------+
Run Code Online (Sandbox Code Playgroud) 在pyspark中,我有一个可变长度的double数组,我希望找到其均值。但是,平均值函数需要单个数字类型。
有没有一种方法可以找到一个数组的平均值而不分解该数组?我有几个不同的数组,我希望能够执行以下操作:
df.select(col("Segment.Points.trajectory_points.longitude"))
Run Code Online (Sandbox Code Playgroud)
DataFrame [经度:数组]
df.select(avg(col("Segment.Points.trajectory_points.longitude"))).show()
Run Code Online (Sandbox Code Playgroud)
Run Code Online (Sandbox Code Playgroud)org.apache.spark.sql.AnalysisException: cannot resolve 'avg(Segment.Points.trajectory_points.longitude)' due to data type mismatch: function average requires numeric types, not ArrayType(DoubleType,true);;
如果我有3个具有以下数组的唯一记录,我希望将这些值的平均值作为输出。这将是3个平均经度值。
输入:
[Row(longitude=[-80.9, -82.9]),
Row(longitude=[-82.92, -82.93, -82.94, -82.96, -82.92, -82.92]),
Row(longitude=[-82.93, -82.93])]
Run Code Online (Sandbox Code Playgroud)
输出:
-81.9,
-82.931,
-82.93
Run Code Online (Sandbox Code Playgroud)
我正在使用Spark版本2.1.3。
爆炸解决方案:
因此,我已经通过爆炸实现了这一目标,但我希望避免这一步。这就是我所做的
from pyspark.sql.functions import col
import pyspark.sql.functions as F
longitude_exp = df.select(
col("ID"),
F.posexplode("Segment.Points.trajectory_points.longitude").alias("pos", "longitude")
)
longitude_reduced = long_exp.groupBy("ID").agg(avg("longitude"))
Run Code Online (Sandbox Code Playgroud)
这成功地取了意思。但是,由于我将在几列中执行此操作,因此必须将同一DF爆炸几次。我将继续努力,以找到一种更清洁的方式来完成此任务。
这是我做的练习。我当然可以通过使用sorted()或Python标准库中的其他方式对列表进行排序,但是在这种情况下我不能。我认为我只应该使用reduce()。
from functools import reduce
arr = [17, 2, 3, 6, 1, 3, 1, 9, 5, 3]
sorted_arr = reduce(lambda a,b : (b,a) if a > b else (a,b), arr)
Run Code Online (Sandbox Code Playgroud)
我得到的错误:
TypeError: '>' not supported between instances of 'tuple' and 'int'
Run Code Online (Sandbox Code Playgroud)
这是可以预期的,因为我的reduce函数将一个元组插入到int数组中,而不是2个单独的整数。然后将元组与一个整数进行比较...
有没有一种方法可以将2个数字插入列表,然后仅对列表中的第二个数字运行该函数?还是一种使用reduce()交换数字的方法?
文档对reduce函数几乎没有说什么,所以我现在没有想法。 https://docs.python.org/3/library/functools.html?highlight=reduce#functools.reduce
代码是:
from pyspark.sql import functions as F
df = df.select(F.trim("MyColumn"))
Run Code Online (Sandbox Code Playgroud)
错误是:
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.trim. Trace:
py4j.Py4JException: Method trim([class java.lang.String]) does not exist
Run Code Online (Sandbox Code Playgroud)
trimPySpark 2.x 中已弃用吗?我不明白为什么它不起作用,而同一命名空间中的其他一些函数却工作得很好
pyspark ×8
python ×5
apache-spark ×3
pyspark-sql ×2
json ×1
python-3.x ×1
scikit-learn ×1
sorting ×1