dataframe python-2.7 apache-spark apache-spark-sql pyspark
I'm looking for a way to aggregate my data by hour. To start, I want to keep only the hour part of my evtTime. My DataFrame looks like this:
Row(access=u'WRITE',
agentHost=u'xxxxxx50.haas.xxxxxx',
cliIP=u'192.000.00.000',
enforcer=u'ranger-acl',
event_count=1,
event_dur_ms=0,
evtTime=u'2017-10-01 23:03:51.337',
id=u'a43d824c-1e53-439b-b374-96b76bacf714',
logType=u'RangerAudit',
policy=699,
reason=u'/project-h/xxxx/xxxx/warehouse/rocq.db/f_crcm_res_temps_retrait',
repoType=1,
reqUser=u'rocqphadm',
resType=u'path',
resource=u'/project-h/xxxx/xxxx/warehouse/rocq.db/f_crcm_res_temps_retrait',
result=1,
seq_num=342976577)
My goal is then to group by reqUser and compute the sum of event_count. I tried this:
import datetime
from pyspark.sql.functions import udf, col, hour
from pyspark.sql.types import DateType

func = udf(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f'), DateType())
df1 = df.withColumn('DATE', func(col('evtTime')))
metrics_DataFrame = (df1
    .groupBy(hour('DATE'), 'reqUser')
    .agg({'event_count': 'sum'})
)
Here is the result:
[Row(hour(DATE)=0, reqUser=u'A383914', sum(event_count)=12114),
Row(hour(DATE)=0, reqUser=u'xxxxadm', sum(event_count)=211631),
Row(hour(DATE)=0, reqUser=u'splunk-system-user', sum(event_count)=48),
Row(hour(DATE)=0, reqUser=u'adm', sum(event_count)=7608),
Row(hour(DATE)=0, reqUser=u'X165473', sum(event_count)=2)]
My goal is to get something like this:
[Row(hour(DATE)=2017-10-01 23:00:00, reqUser=u'A383914', sum(event_count)=12114),
Row(hour(DATE)=2017-10-01 22:00:00, reqUser=u'xxxxadm', sum(event_count)=211631),
Row(hour(DATE)=2017-10-01 08:00:00, reqUser=u'splunk-system-user', sum(event_count)=48),
Row(hour(DATE)=2017-10-01 03:00:00, reqUser=u'adm', sum(event_count)=7608),
Row(hour(DATE)=2017-10-01 11:00:00, reqUser=u'X165473', sum(event_count)=2)]
There are multiple possible solutions. In your attempt the hour is always 0 because DateType keeps only the calendar date and drops the time-of-day. The simplest one is to use only the required part of the string:
from pyspark.sql.functions import substring, to_timestamp
df = spark.createDataFrame(["2017-10-01 23:03:51.337"], "string").toDF("evtTime")
df.withColumn("hour", substring("evtTime", 0, 13)).show()
# +--------------------+-------------+
# | evtTime| hour|
# +--------------------+-------------+
# |2017-10-01 23:03:...|2017-10-01 23|
# +--------------------+-------------+
Or as a timestamp:
df.withColumn("hour", to_timestamp(substring("evtTime", 0, 13), "yyyy-MM-dd HH")).show()
# +--------------------+-------------------+
# | evtTime| hour|
# +--------------------+-------------------+
# |2017-10-01 23:03:...|2017-10-01 23:00:00|
# +--------------------+-------------------+
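Compared to the plain string from the first snippet, this gives you a real timestamp column, which sorts chronologically and supports further date arithmetic; the string form is only convenient as an opaque grouping key.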
You can also use date_format:
from pyspark.sql.functions import date_format, col
df.withColumn("hour", date_format(col("evtTime").cast("timestamp"), "yyyy-MM-dd HH:00")).show()
# +--------------------+----------------+
# | evtTime| hour|
# +--------------------+----------------+
# |2017-10-01 23:03:...|2017-10-01 23:00|
# +--------------------+----------------+
Or date_trunc, which yields a proper timestamp column rather than a formatted string:
from pyspark.sql.functions import date_trunc
df.withColumn("hour", date_trunc("hour", col("evtTime").cast("timestamp"))).show()
# +--------------------+-------------------+
# | evtTime| hour|
# +--------------------+-------------------+
# |2017-10-01 23:03:...|2017-10-01 23:00:00|
# +--------------------+-------------------+
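Whichever variant you choose, the truncated hour can be used directly as a grouping key. A minimal sketch of the aggregation described in the question, assuming df is the original audit DataFrame and Spark 2.3+ (for date_trunc):

from pyspark.sql.functions import date_trunc, col
from pyspark.sql.functions import sum as sum_

metrics = (df
    # bucket each event into its hour, e.g. 2017-10-01 23:00:00
    .withColumn("hour", date_trunc("hour", col("evtTime").cast("timestamp")))
    .groupBy("hour", "reqUser")
    # total event_count per user and hour
    .agg(sum_("event_count")))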