Dom*_*nik 6 python pandas apache-spark pyspark
I have the following test data:
import pandas as pd
import datetime
data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
'customerid': [2, 2, 2, 3, 4, 3], 'names': ['Andrew', 'Pete', 'Sean', 'Steve', 'Ray', 'Stef'], 'PaymentType': ['OI', 'CC', 'CC', 'OI', 'OI', 'OI']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
The following code gives me, for each row, the list of names with a matching customerid within a two-day window:
import pandas as pd
import datetime
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master('local[*]') \
.config("spark.driver.memory", "500g") \
.appName('my-pandasToSparkDF-app') \
.config("spark.ui.showConsoleProgress", "false")\
.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', 50000)
spark.sparkContext.setLogLevel("OFF")
data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
'customerid': [2, 2, 2, 3, 4, 3], 'names': ['Andrew', 'Pete', 'Sean', 'Steve', 'Ray', 'Stef'], 'PaymentType': ['OI', 'CC', 'CC', 'OI', 'OI', 'OI']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark_data= spark.createDataFrame(data)
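# Per customer, order rows by date in epoch seconds and frame the two preceding days (2 * 86400 s) up to the current row.
win = Window.partitionBy('customerid').orderBy(F.col('date').cast('long')).rangeBetween(
    -(2 * 86400), Window.currentRow)
# Collect every name that falls inside the frame, then sort the output by date.
result_frame = spark_data.withColumn('names_array', F.collect_list('names').over(win)).sort(F.col('date').asc())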
pd_result_frame = result_frame.toPandas()
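To see why the row for 2014-01-03 still picks up a name from 2014-01-01, it may help to work through the frame bounds by hand. The plain-Python sketch below is not Spark code. It assumes the dates are midnight UTC and uses a hypothetical helper, `epoch_seconds`, to mimic what casting the date to long produces.

```python
from datetime import datetime, timezone


def epoch_seconds(day: str) -> int:
    """Hypothetical helper: midnight UTC of an ISO date as epoch seconds."""
    parsed = datetime.strptime(day, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


TWO_DAYS = 2 * 86400  # 172800 seconds, the lower offset passed to rangeBetween

current = epoch_seconds("2014-01-03")
lower_bound = current - TWO_DAYS  # both frame bounds are inclusive

for day in ["2013-12-31", "2014-01-01", "2014-01-02", "2014-01-03"]:
    inside = lower_bound <= epoch_seconds(day) <= current
    print(day, inside)
```

Because both bounds are inclusive, a row exactly two days earlier still belongs to the frame.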
Data:
<pre>
date |customerid|PaymentType|names
2014-01-01|2 |OI |Andrew
2014-01-02|2 |CC |Pete
2014-01-03|2 |CC |Sean
2014-01-04|3 |OI |Steve
2014-01-05|4 |OI |Ray
2014-01-06|3 |OI |Stef
</pre>
Result table:
<pre>
date |customerid|PaymentType|names_array|
2014-01-01|2 |OI |['Andrew']
2014-01-02|2 |CC |['Andrew', 'Pete']
2014-01-03|2 |CC |['Andrew', 'Pete', 'Sean']
2014-01-04|3 |OI |['Steve']
2014-01-05|4 |OI |['Ray']
2014-01-06|3 |OI |['Steve', 'Stef']
</pre>
Now I would like to add a condition to F.collect_list: only names whose PaymentType == 'OI' should be collected into the list.
In the end, the table should look like this:
<pre>
date |customerid|PaymentType|names_array|
2014-01-01|2 |OI |['Andrew']
2014-01-02|2 |CC |['Andrew']
2014-01-03|2 |CC |['Andrew']
2014-01-04|3 |OI |['Steve']
2014-01-05|4 |OI |['Ray']
2014-01-06|3 |OI |['Steve', 'Stef']
</pre>
Thank you!
mur*_*ash 11
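You can add a when/otherwise clause inside collect_list so that a name is collected only when PaymentType is 'OI' and None is collected otherwise. Because collect_list skips null values, the non-matching rows drop out of the resulting array.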
spark_data.withColumn(
    "names_array",
    # Rows whose PaymentType is not 'OI' yield null, which collect_list skips.
    F.collect_list(F.when(F.col("PaymentType") == 'OI', F.col("names")).otherwise(F.lit(None))).over(win)
).sort(F.col("date").asc()).show()
#+-------------------+----------+------+-----------+-------------+
#| date|customerid| names|PaymentType| names_array|
#+-------------------+----------+------+-----------+-------------+
#|2014-01-01 00:00:00| 2|Andrew| OI| [Andrew]|
#|2014-01-02 00:00:00| 2| Pete| CC| [Andrew]|
#|2014-01-03 00:00:00| 2| Sean| CC| [Andrew]|
#|2014-01-04 00:00:00| 3| Steve| OI| [Steve]|
#|2014-01-05 00:00:00| 4| Ray| OI| [Ray]|
#|2014-01-06 00:00:00| 3| Stef| OI|[Steve, Stef]|
#+-------------------+----------+------+-----------+-------------+
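As a rough cross-check of the logic, here is a minimal plain-Python sketch that emulates what the conditional collect_list over the window computes on the question's test data. It is not Spark code: `conditional_collect` is a hypothetical helper, and the `None` values stand in for the nulls that collect_list drops.

```python
from datetime import date, timedelta

# Same test data as in the question: (date, customerid, names, PaymentType).
rows = [
    (date(2014, 1, 1), 2, "Andrew", "OI"),
    (date(2014, 1, 2), 2, "Pete", "CC"),
    (date(2014, 1, 3), 2, "Sean", "CC"),
    (date(2014, 1, 4), 3, "Steve", "OI"),
    (date(2014, 1, 5), 4, "Ray", "OI"),
    (date(2014, 1, 6), 3, "Stef", "OI"),
]

WINDOW = timedelta(days=2)  # mirrors rangeBetween(-(2*86400), Window.currentRow)


def conditional_collect(rows, payment_type="OI"):
    """Hypothetical helper: emulate collect_list(when(...)).over(win)."""
    result = []
    for cur_date, cur_id, _, _ in rows:
        collected = []
        for d, cid, name, ptype in sorted(rows):  # ordered by date
            in_partition = cid == cur_id                  # partitionBy('customerid')
            in_frame = cur_date - WINDOW <= d <= cur_date  # inclusive range frame
            value = name if ptype == payment_type else None  # when/otherwise
            if in_partition and in_frame and value is not None:  # nulls are skipped
                collected.append(value)
        result.append(collected)
    return result


names_array = conditional_collect(rows)
for (d, cid, _, ptype), arr in zip(rows, names_array):
    print(d, cid, ptype, arr)
```

The printed lists match the table the question asks for. As a side note, when without otherwise already returns null for non-matching rows, so the explicit .otherwise(F.lit(None)) in the answer is optional.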