use*_*332 3 elasticsearch apache-spark pyspark
我正在使用 Spark 2.3 (Pyspark) 从 Elasticsearch 6.6 索引读取数据。
Spark 作业正在尝试创建一个df,但由于解析问题而失败:
df = spark.read.format("org.elasticsearch.spark.sql").option("es.resource.read", index_name).option("es.nodes", hosts).load()
org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2019/05/06 19:31:21] for field [GenerateTime]
我相信这部分是由于源日期格式不是公认的ISO 8601格式。
此外,在阅读时间/日期映射文档时,我了解这可以通过创建映射来解决,但这只会影响新索引,而不会更改历史索引的映射。
有没有办法解决这个问题,以便我可以通过 Spark从历史索引中成功读取(例如,在可能需要的任何映射更改之前)?我也尝试过,.option("es.mapping.date.rich", False)但没有任何运气。
我根据您在ES 6.4/Spark 2.1版本中的数据创建了一个示例文档,并使用了以下代码,以便在 spark 中读取GenerateTime字段text而不是日期类型。
PUT somedateindex
{
"mappings": {
"mydocs":{
"properties": {
"GenerateTime": {
"type": "date",
"format": "yyyy/MM/dd HH:mm:ss"
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,该字段是dateES 中的类型。
请注意,我使用了配置选项("es.mapping.date.rich" , false )
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.format("org.elasticsearch.spark.sql")
.option("es.resource.read","somedateindex")
.option("es.nodes", "some_host_name")
.option("es.mapping.date.rich", false)
.option("es.port","9200")
.load()
df.show()
df.printSchema()
Run Code Online (Sandbox Code Playgroud)
19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
| GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+
root
|-- GenerateTime: string (nullable = true)
19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....
Run Code Online (Sandbox Code Playgroud)
请注意,printSchema该表有一个GenerateTime类型为 的列string。
如果您不想继续更改映射,上述内容应该对您有所帮助。
我建议使用日期格式而不是文本格式的日期字段,并且也是 ISO-8601 支持的格式,这样当类型推断开始时,您最终会在 Spark 中获得正确类型的数据,您可以只关注业务逻辑,很多时候,正确的解决方案在于我们如何存储数据而不是我们如何处理它。
但是,如果由于某种原因您无法从源(即elasticsearch)更改映射,您可以进一步添加以下代码以使用以下代码将字符串值转换为时间戳:
import org.apache.spark.sql.functions._
//String into Timestamp Transformation
val df2_timestamp = df.withColumn("GenerateTime_timestamp", from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))
df2_timestamp.show(false)
df2_timestamp.printSchema();
Run Code Online (Sandbox Code Playgroud)
如果你运行上面的代码,你会看到如下输出:
19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+
root
|-- GenerateTime: string (nullable = true)
|-- GenerateTime_timestamp: timestamp (nullable = true)
19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook
Run Code Online (Sandbox Code Playgroud)
另请注意,我的解决方案是在 Scala 中。如果有帮助,请告诉我!
| 归档时间: |
|
| 查看次数: |
1160 次 |
| 最近记录: |