解析spark中的模式很少的mongo集合时出现问题

kno*_*ker 5 mongodb apache-spark apache-spark-sql

我正在使用 Spark 将数据从一个集合移动到另一个集群中的另一个集合。数据的模式不一致(我的意思是在具有不同数据类型且变化很小的单个集合中几乎没有模式)。当我尝试从 spark 读取数据时,采样无法获取数据的所有模式并抛出以下错误。(我有一个复杂的模式,我无法明确提及,而不是通过采样获取 spark。)

com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast ARRAY into a NullType (value: BsonArray{values=[{ "type" : "GUEST_FEE", "appliesPer" : "GUEST_PER_NIGHT", "description" : null, "minAmount" : 33, "maxAmount" : 33 }]})

我尝试将集合作为 RDD 读取并作为 RDD 写入,但问题仍然存在。

对此有任何帮助。!

谢谢。

vly*_*bin 5

所有这些com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast SOME_TYPE into a NullType都来自不正确的模式推断。对于 JSON 文件或 mongodb 等无模式数据源,Spark 会扫描一小部分数据以确定类型。如果某些特定字段有很多 NULL,您可能会不走运,类型将设置为NullType.

您可以做的一件事是增加扫描以进行模式推断的条目数。

另一个 - 首先获取推断的架构,修复它,然后使用固定架构重新加载数据帧:

def fix_spark_schema(schema):
  if schema.__class__ == pyspark.sql.types.StructType:
    return pyspark.sql.types.StructType([fix_spark_schema(f) for f in schema.fields])
  if schema.__class__ == pyspark.sql.types.StructField:
    return pyspark.sql.types.StructField(schema.name, fix_spark_schema(schema.dataType), schema.nullable)
  if schema.__class__ == pyspark.sql.types.NullType:
    return pyspark.sql.types.StringType()
  return schema

collection_schema = sqlContext.read \
    .format("com.mongodb.spark.sql") \
    .options(...) \
    .load() \
    .schema

collection = sqlContext.read \
    .format("com.mongodb.spark.sql") \
    .options(...) \
    .load(schema=fix_spark_schema(collection_schema))
Run Code Online (Sandbox Code Playgroud)

在我的情况下,所有有问题的字段都可以用 StringType 表示,如果需要,您可以使逻辑更复杂。


Bar*_*zny 0

据我了解您的问题: * Spark 错误地检测到您的架构并认为某些字段是必需的 ( nullable = false) - 在这种情况下,您仍然可以显式定义它并将其设置nullable为 true。如果您的模式正在发展,并且在过去的某个时间您添加或删除了一个字段但仍然保留列类型(例如,字符串将始终是字符串而不是结构或其他完全不同的类型)*或者您的模式完全不同,那么它会起作用不一致,即您的 String 字段有时会转换为 Struct 或其他完全不同的类型。RDD在这种情况下,除了使用抽象和使用非常宽松的类型(如AnyScala(ObjectJava 中))并使用isInstanceOf测试将所有字段规范化为 1 种通用格式之外,我没有看到其他解决方案

实际上,我还看到了另一种可能的解决方案,但前提是您知道哪些数据具有哪种架构。例如,如果您知道对于 2018-01-01 和 2018-02-01 之间的数据,您使用 schema#1,而对于其他 schema#2,您可以编写一个将 schema#1 转换为 schema#2 的管道。稍后,您可以简单地处理union两个数据集,并将转换应用于一致的结构化值。


编辑:

我刚刚尝试过您提供的类似代码,它在我的本地 MongoDB 实例上运行正常:

val sc = getSparkContext(Array("mongodb://localhost:27017/test.init_data")) 

// Load sample data
import com.mongodb.spark._

val docFees =
  """
    | {"fees": null}
    | {"fees": { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ]} }
  """.stripMargin.trim.stripMargin.split("[\\r\\n]+").toSeq
MongoSpark.save(sc.parallelize(docFees.map(Document.parse)))

val rdd = MongoSpark.load(sc)
rdd.saveToMongoDB(WriteConfig(Map("uri"->"mongodb://localhost:27017/test.new_coll_data", "replaceDocument"->"true")))
Run Code Online (Sandbox Code Playgroud)

当我在 MongoDB shell 中检查结果时,我得到:

> coll = db.init_data; 
test.init_data
> coll.find();
{ "_id" : ObjectId("5b33d415ea78632ff8452c60"), "fees" : { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ] } }
{ "_id" : ObjectId("5b33d415ea78632ff8452c61"), "fees" : null }
> coll = db.new_coll_data;
test.new_coll_data
> coll.find();
{ "_id" : ObjectId("5b33d415ea78632ff8452c60"), "fees" : { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ] } }
{ "_id" : ObjectId("5b33d415ea78632ff8452c61"), "fees" : null }
Run Code Online (Sandbox Code Playgroud)