Pyspark: using a conditional collect_list over window()

Dom*_*nik 6 python pandas apache-spark pyspark

I have the following test data:

import pandas as pd
import datetime

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'customerid': [2, 2, 2, 3, 4, 3], 'names': ['Andrew', 'Pete', 'Sean', 'Steve', 'Ray', 'Stef'], 'PaymentType': ['OI', 'CC', 'CC', 'OI', 'OI', 'OI']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])

The following code gives me, for each row, the list of names with a matching customerid within a two-day time window:

import pandas as pd
import datetime

from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "500g") \
    .appName('my-pandasToSparkDF-app') \
    .config("spark.ui.showConsoleProgress", "false")\
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', 50000)
spark.sparkContext.setLogLevel("OFF")


data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'customerid': [2, 2, 2, 3, 4, 3], 'names': ['Andrew', 'Pete', 'Sean', 'Steve', 'Ray', 'Stef'], 'PaymentType': ['OI', 'CC', 'CC', 'OI', 'OI', 'OI']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark_data = spark.createDataFrame(data)

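# Two-day window per customerid: 'date' is cast to epoch seconds, so the range
# spans from 2*86400 seconds (two days) before the current row up to the current row.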
win = Window().partitionBy('customerid').orderBy((F.col('date')).cast("long")).rangeBetween(
        -(2*86400), Window.currentRow)

result_frame = spark_data.withColumn("names_array", F.collect_list('names').over(win)).sort(F.col("date").asc())

pd_result_frame = result_frame.toPandas()

The data:

date      |customerid|PaymentType|names 
2014-01-01|2         |OI         |Andrew
2014-01-02|2         |CC         |Pete 
2014-01-03|2         |CC         |Sean
2014-01-04|3         |OI         |Steve
2014-01-05|4         |OI         |Ray
2014-01-06|3         |OI         |Stef

The result table:

date      |customerid|PaymentType|names_array|
2014-01-01|2         |OI         |['Andrew']
2014-01-02|2         |CC         |['Andrew', 'Pete'] 
2014-01-03|2         |CC         |['Andrew', 'Pete', 'Sean']
2014-01-04|3         |OI         |['Steve']
2014-01-05|4         |OI         |['Ray']
2014-01-06|3         |OI         |['Steve', 'Stef']

Now I would like to add a condition to F.collect_list: only names where PaymentType == 'OI' should be collected into the list.

In the end, the table should look like this:

date      |customerid|PaymentType|names_array|
2014-01-01|2         |OI         |['Andrew']
2014-01-02|2         |CC         |['Andrew'] 
2014-01-03|2         |CC         |['Andrew']
2014-01-04|3         |OI         |['Steve']
2014-01-05|4         |OI         |['Ray']
2014-01-06|3         |OI         |['Steve', 'Stef']

Thank you!

mur*_*ash 11

You can add a when/otherwise clause inside your collect_list so that names is only collected when PaymentType is 'OI' and None is collected otherwise; collect_list skips nulls, so those rows do not appear in the array.

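# Keep only 'OI' names inside the two-day window; for the other rows the when()
# expression yields NULL, which collect_list drops from the resulting array.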
spark_data.withColumn("names_array",\
                      F.collect_list(F.when(F.col("PaymentType")=='OI',F.col("names"))\
                      .otherwise(F.lit(None))).over(win)).sort(F.col("date").asc()).show()

#+-------------------+----------+------+-----------+-------------+
#|               date|customerid| names|PaymentType|  names_array|
#+-------------------+----------+------+-----------+-------------+
#|2014-01-01 00:00:00|         2|Andrew|         OI|     [Andrew]|
#|2014-01-02 00:00:00|         2|  Pete|         CC|     [Andrew]|
#|2014-01-03 00:00:00|         2|  Sean|         CC|     [Andrew]|
#|2014-01-04 00:00:00|         3| Steve|         OI|      [Steve]|
#|2014-01-05 00:00:00|         4|   Ray|         OI|        [Ray]|
#|2014-01-06 00:00:00|         3|  Stef|         OI|[Steve, Stef]|
#+-------------------+----------+------+-----------+-------------+
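
Since collect_list ignores null values, the explicit .otherwise(F.lit(None)) is optional: F.when without an otherwise already returns null for the non-matching rows. A minimal equivalent sketch, assuming the same spark_data and win defined in the question:

# Same result: rows with PaymentType != 'OI' contribute NULL, which collect_list skips.
result_frame = spark_data.withColumn(
    "names_array",
    F.collect_list(F.when(F.col("PaymentType") == "OI", F.col("names"))).over(win)
).sort(F.col("date").asc())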