How to extract doc from Avro data and add it to a dataframe

use*_*871 6 python hive avro apache-spark pyspark

I am trying to create Hive/Impala tables based on an Avro file in HDFS. The tool doing the conversion is Spark.

I can't just load the data into a dataframe with spark.read.format("avro"), because that way the doc (the description of each column) is lost. I can view the doc by doing:

 input = sc.textFile("/path/to/avrofile")
 avro_schema = input.first() # not sure what type it is 
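As an aside, reading the binary Avro container with textFile is fragile; it only works because the JSON schema happens to sit in the file header. A minimal sketch of a more robust way to get the schema as a parsed Python dict, assuming the fastavro package is installed and the file has been copied somewhere locally readable (the path is a placeholder):

 import json
 from fastavro import reader

 with open('/local/path/to/avrofile', 'rb') as f:
     avro_schema = reader(f).writer_schema  # the full writer schema as a dict

 print(json.dumps(avro_schema, indent=2))  # nested "doc" entries are visible here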

The problem is that the schema is nested, and I am not sure how to traverse it to map each doc to the column description in the dataframe. I want the doc entries to become the column descriptions of the table. For example, the input schema looks like this:

"fields": [
    {
     "name":"productName",
     "type": [
       "null",
       "string"
      ],
     "doc": "Real name of the product"
     "default": null
    },
    {
     "name" : "currentSellers",
     "type": [
        "null",
        {
         "type": "record",
         "name": "sellers",
         "fields":[
             {
              "name": "location",
              "type":[
                 "null",
                  {
                   "type": "record"
                   "name": "sellerlocation",
                   "fields": [
                      {
                       "name":"locationName",
                       "type": [
                           "null",
                           "string"
                         ],
                       "doc": "Name of the location",
                       "default":null
                       },
                       {
                       "name":"locationArea",
                       "type": [
                           "null",
                           "string"
                         ],
                       "doc": "Area of the location",#The comment needs to be added to table comments
                       "default":null
                         .... #These are nested fields 

For example, in the final table one field would be named currentSellers_locationName, with the column description "Name of the location". Could someone shed some light on how to parse the schema and add the docs to the descriptions? And also explain what the part below is, outside of the fields? Many thanks. Let me know if I can explain this better.

         "name" : "currentSellers",
     "type": [
        "null",
        {
         "type": "record",
         "name": "sellers",
         "fields":[
             {
  

bzu*_*bzu 1

If you want to parse the schema yourself and add the metadata to Spark manually, I would suggest the flatdict package:

from flatdict import FlatterDict

flat_schema = FlatterDict(schema)  # schema as a Python dict

# collect every "name" and "doc" entry, keyed by its position in the schema
names = {k.replace(':name', ''): flat_schema[k] for k in flat_schema if k.endswith(':name')}
docs = {k.replace(':doc', ''): flat_schema[k] for k in flat_schema if k.endswith(':doc')}

# keep only keys which are present in both names and docs
keys_with_doc = set(names.keys()) & set(docs.keys())

# build the flattened column name by joining the names of all parent keys that
# point at a field (second-to-last component is 'fields'), shortest prefix first
full_name = lambda key: '_'.join(
    names[k] for k in sorted(names, key=len) if key.startswith(k) and k.split(':')[-2] == 'fields'
)
name_doc_map = {full_name(k): docs[k] for k in keys_with_doc}

A typical set of keys from flat_schema.keys() looks like this:

'fields:1:type:1:fields:0:type:1:fields:0:type:1',
'fields:1:type:1:fields:0:type:1:fields:0:name',
'fields:1:type:1:fields:0:type:1:fields:0:default',
'fields:1:type:1:fields:0:type:1:fields:0:doc',
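(Unlike FlatDict, FlatterDict also flattens lists, which is why the list indices show up as key components. A quick self-contained check of that behaviour:)

 from flatdict import FlatterDict

 d = FlatterDict({'fields': [{'name': 'productName', 'doc': 'Real name'}]})
 print(sorted(d.keys()))
 # ['fields:0:doc', 'fields:0:name']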

These strings can now be manipulated to:

  1. extract only the ones ending with "name" and "doc" (ignoring "default" etc.)
  2. take the set intersection to drop keys that do not have both fields present
  3. collect the names of all fields higher up in the hierarchy: fields:1:type:1:fields is one of the parents of fields:1:type:1:fields:0:type:1:fields (the condition being that they share the same beginning and end with "fields"); a worked result follows below
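For the schema in the question, the snippet above produces something like this (note that the intermediate field location also becomes part of the flattened name, so the key is currentSellers_location_locationName rather than currentSellers_locationName):

 name_doc_map
 # {'productName': 'Real name of the product',
 #  'currentSellers_location_locationName': 'Name of the location',
 #  'currentSellers_location_locationArea': 'Area of the location'}

From there, one way to attach the descriptions is as column metadata on the dataframe, or directly as table comments. A sketch, assuming a flattened dataframe df whose column names match the keys of name_doc_map, a table mydb.mytable (both hypothetical), and string columns:

 from pyspark.sql.functions import col

 # option 1: attach each doc as 'comment' column metadata, which Spark uses
 # for column comments when the table is created from this dataframe
 df_with_docs = df.select([
     col(c).alias(c, metadata={'comment': name_doc_map.get(c, '')})
     for c in df.columns
 ])

 # option 2: set the comments on the existing Hive/Impala table
 for column, doc in name_doc_map.items():
     spark.sql(
         f"ALTER TABLE mydb.mytable CHANGE {column} {column} STRING COMMENT '{doc}'"
     )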