python apache-spark apache-spark-sql pyspark
I have the following Spark DataFrame:
import pandas as pd

datalake_spark_dataframe_downsampled = pd.DataFrame(
    {'id': ['001', '001', '001', '001', '001', '002', '002', '002'],
     'OuterSensorConnected': [0, 0, 0, 1, 0, 0, 0, 1],
     'OuterHumidity': [31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826, 31.784826],
     'EnergyConsumption': [70, 70, 70, 70, 70, 70, 70, 70],
     'DaysDeploymentDate': [10, 20, 21, 31, 41, 11, 19, 57],
     'label': [0, 0, 1, 1, 1, 0, 0, 1]}
)
datalake_spark_dataframe_downsampled = spark.createDataFrame(datalake_spark_dataframe_downsampled)
# printSchema() of datalake_spark_dataframe_downsampled (Spark df):
root
 |-- id: string (nullable = true)
 |-- OuterSensorConnected: integer (nullable = false)
 |-- OuterHumidity: float (nullable = true)
 |-- EnergyConsumption: float (nullable = true)
 |-- DaysDeploymentDate: integer (nullable = true)
 |-- label: integer (nullable = false)
As you can see, the first id '001' has 5 rows and the second id '002' has 3 rows. I want to filter out the rows of every id whose total number of positive labels ('1') is less than 2. The first id '001' has 3 positive labels (three rows with label 1 in total), while the second id '002' has only 1 row with a positive label, so all rows belonging to id '002' should be filtered out. My final df would then look like:
datalake_spark_dataframe_downsampled_filtered = pd.DataFrame(
    {'id': ['001', '001', '001', '001', '001'],
     'OuterSensorConnected': [0, 0, 0, 1, 0],
     'OuterHumidity': [31.784826, 32.784826, 33.784826, 43.784826, 23.784826],
     'EnergyConsumption': [70, 70, 70, 70, 70],
     'DaysDeploymentDate': [10, 20, 21, 31, 41],
     'label': [0, 0, 1, 1, 1]}
)
datalake_spark_dataframe_downsampled_filtered = spark.createDataFrame(datalake_spark_dataframe_downsampled_filtered)
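To make the rule concrete, a quick per-id aggregation (a DataFrame-API sketch; summing works here because label is 0/1) shows which ids pass the threshold:

from pyspark.sql import functions as F

(datalake_spark_dataframe_downsampled
 .groupBy("id")
 .agg(F.sum("label").alias("positive_labels"))  # number of label = 1 rows per id
 .show())
# id '001' -> 3 (kept, >= 2); id '002' -> 1 (dropped, < 2)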
How can this be achieved with a spark.sql() query? For example:
datalake_spark_dataframe_downsampled.createOrReplaceTempView("df_filtered")
spark_dataset_filtered = spark.sql("""SELECT *, count(label) AS counted_label FROM df_filtered GROUP BY id HAVING counted_label >= 2""")  # how to only count the positive values here?
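One way to count only the positive labels in pure spark.sql() is a conditional aggregation per id, followed by a join back to keep all original rows of the surviving ids. A sketch, assuming the unfiltered frame is the one registered as a view (the view name df_raw is mine):

datalake_spark_dataframe_downsampled.createOrReplaceTempView("df_raw")
spark_dataset_filtered = spark.sql("""
    SELECT t.*
    FROM df_raw t
    JOIN (
        SELECT id,
               SUM(CASE WHEN label = 1 THEN 1 ELSE 0 END) AS counted_label  -- counts only label = 1
        FROM df_raw
        GROUP BY id
        HAVING SUM(CASE WHEN label = 1 THEN 1 ELSE 0 END) >= 2
    ) g ON t.id = g.id
""")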
Here is how to do it with a window function:
datalake_spark_dataframe_downsampled.createOrReplaceTempView("df_filtered")
spark.sql("""select * from (select *, sum(label) over (partition by id) as Sum_l
from df_filtered) where Sum_l >= 2""").drop("Sum_l").show()
+---+--------------------+-------------+-----------------+------------------+-----+
| id|OuterSensorConnected|OuterHumidity|EnergyConsumption|DaysDeploymentDate|label|
+---+--------------------+-------------+-----------------+------------------+-----+
|001| 0| 31.784826| 70| 10| 0|
|001| 0| 32.784826| 70| 20| 0|
|001| 0| 33.784826| 70| 21| 1|
|001| 1| 43.784826| 70| 31| 1|
|001| 0| 23.784826| 70| 41| 1|
+---+--------------------+-------------+-----------------+------------------+-----+
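For completeness, the same window logic expressed through the DataFrame API instead of spark.sql(), as a sketch that should produce the output above:

from pyspark.sql import Window, functions as F

w = Window.partitionBy("id")  # one window per id

datalake_spark_dataframe_downsampled \
    .withColumn("Sum_l", F.sum("label").over(w)) \
    .filter(F.col("Sum_l") >= 2) \
    .drop("Sum_l") \
    .show()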