dla*_*lin 5 parsing json apache-spark
from_json
有人在 Spark 2+ 中解析过毫秒时间戳吗?是怎么做到的?
因此Spark 将TimestampType
v2 中的 epoch 数值解析为以秒为单位,而不是以毫秒为单位。
我的输入是一个配置单元表,其中的一列中有一个 json 格式的字符串,我尝试像这样解析:
\n\nval spark = SparkSession\n .builder\n .appName("Problematic Timestamps")\n .enableHiveSupport()\n .getOrCreate()\nimport spark.implicits._\nval schema = StructType(\n StructField("categoryId", LongType) ::\n StructField("cleared", BooleanType) ::\n StructField("dataVersion", LongType) ::\n StructField("details", DataTypes.createArrayType(StringType)) ::\n \xe2\x80\xa6\n StructField("timestamp", TimestampType) ::\n StructField("version", StringType) :: Nil\n)\nval item_parsed =\n spark.sql("select * FROM source.jsonStrInOrc")\n .select(\'itemid, \'locale,\n from_json(\'internalitem, schema)\n as \'internalitem,\n \'version, \'createdat, \'modifiedat)\nval item_flattened = item_parsed\n .select(\'itemid, \'locale,\n $"internalitem.*",\n \'version as\'outer_version, \'createdat, \'modifiedat)\n
Run Code Online (Sandbox Code Playgroud)\n\n这可以解析具有包含以下内容的列的行:
\n\n\n\n\n{“时间戳”:1494790299549,“已清除”:false,“版本”:“V1”,“数据版本”:2,“categoryId”:2641,“详细信息”:[],\ xe2 \ x80 \ xa6}
\n
这给了我timestamp
一些字段,比如我宁愿读为的49338-01-08 00:39:09.0
值:1494790299549
2017-05-14 19:31:39.549
现在我可以将时间戳的模式设置为 long,然后将该值除以 1000 并转换为时间戳,但随后我就没有2017-05-14 19:31:39.000
了2017-05-14 19:31:39.549
。我无法弄清楚我该如何:
from_json
解析毫秒时间戳(也许通过以某种方式对 TimestampType 进行子类化以在模式中使用)LongType
在模式中使用 a并将其转换为保留毫秒的时间戳。我发现尝试在选择中进行除法然后进行转换对我来说看起来并不干净,尽管这是一个完全有效的方法。我选择了一个使用 a 的 UDF java.sql.timestamp
,它实际上以纪元毫秒为单位指定。
import java.sql.Timestamp\n\nimport org.apache.spark.sql.SparkSession\nimport org.apache.spark.sql.functions.{explode, from_json, udf}\nimport org.apache.spark.sql.types.\n{BooleanType, DataTypes, IntegerType, LongType,\nStringType, StructField, StructType, TimestampType}\n\nval tsmillis = udf { t: Long => new Timestamp (t) }\n\nval spark = SparkSession\n .builder\n .appName("Problematic Timestamps")\n .enableHiveSupport()\n .getOrCreate()\nimport spark.implicits._\nval schema = StructType(\n StructField("categoryId", LongType) ::\n StructField("cleared", BooleanType) ::\n StructField("dataVersion", LongType) ::\n StructField("details", DataTypes.createArrayType(StringType)) ::\n \xe2\x80\xa6\n StructField("timestamp", LongType) ::\n StructField("version", StringType) :: Nil\n)\nval item_parsed =\n spark.sql("select * FROM source.jsonStrInOrc")\n .select(\'itemid, \'locale,\n from_json(\'internalitem, schema)\n as \'internalitem,\n \'version, \'createdat, \'modifiedat)\nval item_flattened = item_parsed\n .select(\'itemid, \'locale,\n $"internalitem.categoryId", $"internalitem.cleared",\n $"internalitem.dataVersion", $"internalitem.details",\n tsmillis($"internalitem.timestamp"),\n $"internalitem.version",\n \'version as\'outer_version, \'createdat, \'modifiedat)\n
Run Code Online (Sandbox Code Playgroud)\n\n看看选择中的情况如何。\n我认为值得进行性能测试,看看使用withcolumn
除法和转换是否比udf
.
现在我可以将时间戳的模式设置为 long,然后将该值除以 1000
实际上这正是您所需要的,只需保持类型正确即可。假设您只有一个Long
timestamp
字段:
val df = spark.range(0, 1).select(lit(1494790299549L).alias("timestamp"))
// df: org.apache.spark.sql.DataFrame = [timestamp: bigint]
Run Code Online (Sandbox Code Playgroud)
如果除以 1000:
val inSeconds = df.withColumn("timestamp_seconds", $"timestamp" / 1000)
// org.apache.spark.sql.DataFrame = [timestamp: bigint, timestamp_seconds: double]
Run Code Online (Sandbox Code Playgroud)
您将获得以秒为单位的双精度时间戳(请注意,这是 SQL,而不是 Scala 行为)。
剩下的就是cast
(Spark < 3.1)
inSeconds.select($"timestamp_seconds".cast("timestamp")).show(false)
// +-----------------------+
// |timestamp_seconds |
// +-----------------------+
// |2017-05-14 21:31:39.549|
// +-----------------------+
Run Code Online (Sandbox Code Playgroud)
或 ( Spark >= 3.1 ) timestamp_seconds
(或直接timestamp_millis
)
import org.apache.spark.sql.functions.{expr, timestamp_seconds}
inSeconds.select(timestamp_seconds($"timestamp_seconds")).show(false)
// +------------------------------------+
// |timestamp_seconds(timestamp_seconds)|
// +------------------------------------+
// |2017-05-14 21:31:39.549 |
// +------------------------------------+
df.select(expr("timestamp_millis(timestamp)")).show(false)
// +---------------------------+
// |timestamp_millis(timestamp)|
// +---------------------------+
// |2017-05-14 21:31:39.549 |
// +---------------------------+
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
3106 次 |
最近记录: |