
How to convert a nested Avro GenericRecord to a Row

I have code that converts my Avro records to Row using a function, avroToRowConverter():

directKafkaStream.foreachRDD(rdd -> {
    JavaRDD<Row> newRDD = rdd.map(x -> {
        // Decode the Kafka message value (x._2) with the latest schema registered for "poc2"
        Injection<GenericRecord, byte[]> recordInjection =
                GenericAvroCodecs.toBinary(SchemaRegstryClient.getLatestSchema("poc2"));
        return avroToRowConverter(recordInjection.invert(x._2).get());
    });
    // ... further processing of newRDD
});
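For reference, recordInjection.invert(x._2).get() decodes the message value as raw Avro binary. Below is a minimal sketch of the same round trip using Avro's own GenericDatumWriter and GenericDatumReader, assuming the payload is plain Avro binary with no container header and no schema-registry prefix. The class name AvroBinaryRoundTrip and the Event schema are hypothetical. It also shows that decoded strings come back as org.apache.avro.util.Utf8 rather than java.lang.String, which is why avroToRowConverter converts STRING fields explicitly.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroBinaryRoundTrip {

    // Serializes a record as raw Avro binary: no container-file header, no schema-id prefix.
    static byte[] encode(GenericRecord record) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads raw Avro binary back with the writer schema (the role played by invert(...).get()).
    static GenericRecord decode(byte[] bytes, Schema schema) {
        try {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("Event").fields()
                .requiredString("id")
                .optionalString("comment")
                .endRecord();
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "e-1");
        record.put("comment", null);

        Object id = decode(encode(record), schema).get("id");
        // Strings come back as org.apache.avro.util.Utf8, not java.lang.String
        System.out.println(id.getClass().getName() + " " + id);
    }
}
```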

This function does not work for nested schemas (TYPE = UNION):

private static Row avroToRowConverter(GenericRecord avroRecord) {
    if (null == avroRecord) {
        return null;
    }
    // One slot per top-level Avro field
    Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
    // Spark SQL schema derived from the Avro schema (spark-avro SchemaConverters)
    StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
    for (Schema.Field field : avroRecord.getSchema().getFields()) {
        Schema.Type type = field.schema().getType();
        if (type == Schema.Type.STRING || type == Schema.Type.ENUM) {
            // Utf8 and enum symbols are turned into java.lang.String
            objectArray[field.pos()] = "" + avroRecord.get(field.pos());
        } else {
            // Fields whose schema is UNION or RECORD also land here, so Utf8 values
            // and nested GenericRecords are copied into the Row unconverted
            objectArray[field.pos()] = avroRecord.get(field.pos());
        }
    }
    return new GenericRowWithSchema(objectArray, structType);
}
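A quick way to see why optional and nested fields slip through: a field declared as optional, i.e. the union ["null","string"], reports Schema.Type.UNION, so the STRING/ENUM check above never matches it. The sketch below (hypothetical class UnionFieldCheck, schema built with Avro's SchemaBuilder) shows this and unwraps the single non-null branch of such a union. It is only meant for the common "nullable T" pattern, not for unions with several non-null branches.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class UnionFieldCheck {

    // For a union like ["null","string"], returns its single non-null branch.
    // Non-union schemas are returned unchanged; returns null when the union does
    // not have exactly one non-null branch (e.g. ["null","int","string"]).
    static Schema nonNullBranch(Schema schema) {
        if (schema.getType() != Schema.Type.UNION) {
            return schema;
        }
        Schema found = null;
        for (Schema branch : schema.getTypes()) {
            if (branch.getType() == Schema.Type.NULL) {
                continue;
            }
            if (found != null) {
                return null; // a second non-null branch: not a simple optional field
            }
            found = branch;
        }
        return found;
    }

    public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("Event").fields()
                .optionalString("comment")
                .endRecord();
        Schema commentSchema = schema.getField("comment").schema();
        System.out.println(commentSchema.getType());                // UNION: the STRING check never matches
        System.out.println(nonNullBranch(commentSchema).getType()); // STRING
    }
}
```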

Can anyone suggest how to convert a complex (nested) schema to a Row?
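One common approach is to recurse over the Avro schema: resolve each union to the branch the value was actually written with, turn nested records into nested Rows, and convert Utf8/enum values to String. The following is a minimal sketch, not a definitive implementation, with a hypothetical class NestedAvroToRow. It assumes unions are of the nullable ["null", T] form (other unions would need extra handling to match whatever SQL type SchemaConverters derives for them), ignores logical types such as decimal or date, and assumes the resulting Rows are passed to createDataFrame together with the StructType from SchemaConverters. Spark 2.x's Row conversion accepts java.util.List and java.util.Map values there; verify this against your Spark version.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class NestedAvroToRow {

    // Converts a (possibly nested) GenericRecord into a Row, one column per Avro field.
    public static Row recordToRow(GenericRecord record) {
        if (record == null) {
            return null;
        }
        List<Schema.Field> fields = record.getSchema().getFields();
        Object[] values = new Object[fields.size()];
        for (Schema.Field field : fields) {
            values[field.pos()] = convert(record.get(field.pos()), field.schema());
        }
        return RowFactory.create(values);
    }

    // Maps one Avro value to the Java object Spark expects for the matching SQL type.
    static Object convert(Object value, Schema schema) {
        if (value == null) {
            return null;
        }
        switch (schema.getType()) {
            case UNION: {
                // Pick the branch this particular value was written with, then recurse.
                int branch = GenericData.get().resolveUnion(schema, value);
                return convert(value, schema.getTypes().get(branch));
            }
            case RECORD:
                return recordToRow((GenericRecord) value); // nested record -> nested Row
            case ARRAY: {
                List<Object> list = new ArrayList<>();
                for (Object element : (Iterable<?>) value) {
                    list.add(convert(element, schema.getElementType()));
                }
                return list;
            }
            case MAP: {
                Map<String, Object> map = new HashMap<>();
                for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                    // Avro map keys are Utf8 at runtime; Spark needs java.lang.String
                    map.put(e.getKey().toString(), convert(e.getValue(), schema.getValueType()));
                }
                return map;
            }
            case STRING:
            case ENUM:
                return value.toString(); // Utf8 / enum symbol -> String
            case BYTES: {
                ByteBuffer buffer = ((ByteBuffer) value).duplicate();
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                return bytes;
            }
            case FIXED:
                return ((GenericFixed) value).bytes().clone();
            default:
                return value; // INT, LONG, FLOAT, DOUBLE, BOOLEAN map directly
        }
    }
}
```

In the streaming code, avroToRowConverter(recordInjection.invert(x._2).get()) would become NestedAvroToRow.recordToRow(...). The nested Rows then line up with the StructType returned by SchemaConverters.toSqlType(schema).dataType(), which can be computed once per schema and passed to createDataFrame(newRDD, structType) instead of being rebuilt for every record.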

java avro apache-spark spark-avro

5 votes, 1 answer, 2230 views
