在SparkSQL中使用Avro模式和Parquet格式进行读取/写入

Jas*_*ans 5 avro apache-spark parquet apache-spark-sql

我正在尝试从SparkSQL编写和读取Parquet文件。出于架构演变的原因,我想在读写中使用Avro架构。

我的理解是,这可以在Spark外部(或在Spark内部手动使用),例如使用AvroParquetWriter和Avro的通用API。但是,我想使用SparkSQL的write()和read()方法(与DataFrameWriter和DataFrameReader一起使用),并且与SparkSQL集成良好(我将编写和读取Dataset的方法)。

我一生都无法弄清楚该怎么做,并且想知道这是否可能。SparkSQL拼花格式似乎唯一支持的选项是“压缩”和“ mergeSchema”,即没有用于指定备用模式格式或备用模式的选项。换句话说,似乎没有办法使用SparkSQL API使用Avro模式读取/写入Parquet文件。但是也许我只是想念一些东西?

为了澄清,我也理解这基本上只是在写时将Avro模式添加到Parquet元数据中,而在读时将添加一个翻译层(Parquet格式-> Avro模式-> SparkSQL内部格式),但将特别允许我为缺少的列添加默认值(Avro模式支持但Parquet模式不支持)。

另外,我不是在寻找一种将Avro转换为Parquet或Parquet到Avro的方法(而是一种将它们一起使用的方法),并且我不是在寻找一种在SparkSQL中读取/写入普通Avro的方法。使用databricks / spark-avro)。

Sun*_*par 0

我正在做类似的事情。我使用 avro 模式写入 parquet 文件,但是不要将其读取为 avro。但同样的技术也应该适用于阅读。我不确定这是否是最好的方法,但无论如何:我有 AvroData.avsc,它具有 avro 架构。

KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder,Tuple2[String, Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)


kafkaArr.foreachRDD  { (rdd,time) 
       => { val schema =  SchemaConverters.toSqlType(AvroData.getClassSchema).dataType.asInstanceOf[StructType] val ardd = rdd.mapPartitions{itr =>
              itr.map { r =>
try {
                    val cr = avroToListWithAudit(r._2, offsetSaved, loadDate, timeNow.toString)
                    Row.fromSeq(cr.toArray)
    } catch{
      case e:Exception => LogHandler.log.error("Exception while converting to Avro" + e.printStackTrace())
      System.exit(-1)
      Row(0)  //This is just to allow compiler to accept. On exception, the application will exit before this point
} 
} 
}


  public static List avroToListWithAudit(byte[] kfkBytes, String kfkOffset, String loaddate, String loadtime ) throws IOException {
        AvroData av = getAvroData(kfkBytes);
        av.setLoaddate(loaddate);
        av.setLoadtime(loadtime);
        av.setKafkaOffset(kfkOffset);
        return avroToList(av);
    }



 public static List avroToList(AvroData a) throws UnsupportedEncodingException{
        List<Object> l = new ArrayList<>();
        for (Schema.Field f : a.getSchema().getFields()) {
            String field = f.name().toString();
            Object value = a.get(f.name());
            if (value == null) {
                //System.out.println("Adding null");
                l.add(""); 
            }
            else {
                switch (f.schema().getType().getName()){
                    case "union"://System.out.println("Adding union");
                        l.add(value.toString());
                        break;

                    default:l.add(value);
                        break;
                }

            }
        }
        return l;
    }
Run Code Online (Sandbox Code Playgroud)

getAvroData 方法需要具有从原始字节构造 avro 对象的代码。我还试图找出一种方法来做到这一点,而不必显式指定每个属性设置器,但似乎没有。

public static AvroData getAvroData (bytes)
{
AvroData av = AvroData.newBuilder().build();
        try {
            av.setAttr(String.valueOf("xyz"));
        .....
    }
   } 
Run Code Online (Sandbox Code Playgroud)

希望能帮助到你

  • 你好,Sunita,感谢您的回复——我不完全理解您在这里所做的事情,但看起来您正在阅读 Avro,并且可能从中构建内存中的对象。我正在寻找一种在 Avro 中编写 parquet 文件的方法,并使用 SparkSQL 在 parquet 文件中**包含 Avro 架构**,例如 `df.write.parquet("my_output_location")` (3认同)