AWS Glue Grok 模式,以毫秒为单位的时间戳

ylc*_*nky 5 amazon-web-services logstash logstash-grok aws-glue

我需要在 AWS Glue Classifie 中定义一个 grok 模式来捕获文件列datestamp上的毫秒数datetimestring由 AWS Glue Crawler转换。我使用了DATESTAMP_EVENTLOGAWS Glue 中的预定义并尝试将毫秒添加到模式中。

分类: datetime

格罗克模式: %{DATESTAMP_EVENTLOG:string}

自定义模式:

MILLISECONDS (\d){3,7}
DATESTAMP_EVENTLOG %{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}.%{MILLISECONDS}
Run Code Online (Sandbox Code Playgroud)

我仍然无法成功实现模式。有任何想法吗?格罗克的快照

lil*_*ine 1

我也无法弄清楚如何使用分类器来做到这一点,但我最终通过向映射脚本(python)编写自定义转换来将时间戳从字符串转换为日期时间。

下面是我的工作代码。col2 是一个将爬虫指定为字符串的列,这里我将其转换为 python 日期时间。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from datetime import datetime

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "s3_events", table_name = "events", transformation_ctx = "datasource0")

def convert_dates(rec):
    rec["col2"] = datetime.strptime(rec["col2"], "%d.%m.%Y")
    return rec
custommapping1 = Map.apply(frame = datasource0, f = convert_dates, transformation_ctx = "custommapping1")

applymapping1 = ApplyMapping.apply(frame = custommapping1, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "date", "col2", "date")], transformation_ctx = "applymapping1")

selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["col2", "col0", "col1"], transformation_ctx = "selectfields2")

resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "mydb", table_name = "mytable", transformation_ctx = "resolvechoice3")

resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "mydb", table_name = "mytable", transformation_ctx = "datasink5")
job.commit()
Run Code Online (Sandbox Code Playgroud)