syv*_*syv 8 apache-spark pyspark
我想应用和传递和作为参数的splitUtlisation每一行。,结果将返回多行数据,因此我想创建一个新的 DataFrame (Id, Day, Hour, Minute)utilisationDataFarmestartTimeendTimesplitUtlisation
def splitUtlisation(onDateTime, offDateTime):
yield onDateTime
rule = rrule.rrule(rrule.HOURLY, byminute = 0, bysecond = 0, dtstart=offDateTime)
for result in rule.between(onDateTime, offDateTime):
yield result
yield offDateTime
utilisationDataFarme = (
sc.parallelize([
(10001, "2017-02-12 12:01:40" , "2017-02-12 12:56:32"),
(10001, "2017-02-13 12:06:32" , "2017-02-15 16:06:32"),
(10001, "2017-02-16 21:45:56" , "2017-02-21 21:45:56"),
(10001, "2017-02-21 22:32:41" , "2017-02-25 00:52:50"),
]).toDF(["id", "startTime" , "endTime"])
.withColumn("startTime", col("startTime").cast("timestamp"))
.withColumn("endTime", col("endTime").cast("timestamp"))
Run Code Online (Sandbox Code Playgroud)
在核心 Python 中,我确实喜欢这样
dayList = ['SUN' , 'MON' , 'TUE' , 'WED' , 'THR' , 'FRI' , 'SAT']
for result in hours_aligned(datetime.datetime.now(), datetime.datetime.now() + timedelta(hours=68)):
print(dayList[datetime.datetime.weekday(result)], result.hour, 60 if result.minute == 0 else result.minute)
Run Code Online (Sandbox Code Playgroud)
结果
THR 21 60
THR 22 60
THR 23 60
FRI 0 60
FRI 1 60
FRI 2 60
FRI 3 60
Run Code Online (Sandbox Code Playgroud)
如何在 pySpark 中创建它?
我尝试创建新架构并应用
schema = StructType([StructField("Id", StringType(), False), StructField("Day", StringType(), False), StructField("Hour", StringType(), False) , StructField("Minute", StringType(), False)])
udf_splitUtlisation = udf(splitUtlisation, schema)
df = sqlContext.createDataFrame([],"id" , "Day" , "Hour" , "Minute")
Run Code Online (Sandbox Code Playgroud)
我仍然无法处理多行作为响应。
pan*_*sen 12
您可以使用pyspark的explode解包包含多个值成多行的单行,一旦你有你的UDF正确定义。
据我所知,您将无法将生成器与yieldudf一起使用。相反,你需要在一次作为一个数组(见返回所有值return_type),然后可以展开和扩大:
from pyspark.sql.functions import col, udf, explode
from pyspark.sql.types import ArrayType, StringType, MapType
import pandas as pd
# input data as given by OP
df = sc.parallelize([
(10001, "2017-02-12 12:01:40" , "2017-02-12 12:56:32"),
(10001, "2017-02-13 12:06:32" , "2017-02-15 16:06:32"),
(10001, "2017-02-16 21:45:56" , "2017-02-21 21:45:56"),
(10001, "2017-02-21 22:32:41" , "2017-02-25 00:52:50")])\
.toDF(["id", "startTime" , "endTime"])\
.withColumn("startTime", col("startTime").cast("timestamp"))\
.withColumn("endTime", col("endTime").cast("timestamp"))
return_type = ArrayType(MapType(StringType(), StringType()))
@udf(returnType=return_type)
def your_udf_func(start, end):
"""Insert your function to return whatever you like
as a list of dictionaries.
For example, I chose to return hourly values for
day, hour and minute.
"""
date_range = pd.date_range(start, end, freq="h")
df = pd.DataFrame({"day": date_range.strftime("%a"),
"hour": date_range.hour,
"minute": date_range.minute})
values = df.to_dict("index").values()
return list(values)
extracted = your_udf_func("startTime", "endTime")
exploded = explode(extracted).alias("exploded")
expanded = [col("exploded").getItem(k).alias(k) for k in ["hour", "day", "minute"]]
result = df.select("id", exploded).select("id", *expanded)
Run Code Online (Sandbox Code Playgroud)
结果是:
result.show(5)
+-----+----+---+------+
| id|hour|day|minute|
+-----+----+---+------+
|10001| 12|Sun| 1|
|10001| 12|Mon| 6|
|10001| 13|Mon| 6|
|10001| 14|Mon| 6|
|10001| 15|Mon| 6|
+-----+----+---+------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4824 次 |
| 最近记录: |