我正在尝试使用Avro架构验证JSON文件并编写相应的Avro文件.首先,我定义了以下名为的Avro架构user.avsc:
{"namespace": "example.avro",
"type": "record",
"name": "user",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Run Code Online (Sandbox Code Playgroud)
然后创建了一个user.json文件:
{"name": "Alyssa", "favorite_number": 256, "favorite_color": null}
Run Code Online (Sandbox Code Playgroud)
然后尝试运行:
java -jar ~/bin/avro-tools-1.7.7.jar fromjson --schema-file user.avsc user.json > user.avro
Run Code Online (Sandbox Code Playgroud)
但我得到以下异常:
Exception in thread "main" org.apache.avro.AvroTypeException: Expected start-union. Got VALUE_NUMBER_INT
at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:697)
at org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:441)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at org.apache.avro.tool.DataFileWriteTool.run(DataFileWriteTool.java:99)
at org.apache.avro.tool.Main.run(Main.java:84) …Run Code Online (Sandbox Code Playgroud) 我在Spark,我有一个Avro文件的RDD.我现在想对该RDD进行一些转换并将其保存为Avro文件:
val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))
rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
.saveAsNewAPIHadoopFile(outputPath,
classOf[AvroKey[GenericRecord]],
classOf[org.apache.hadoop.io.NullWritable],
classOf[AvroKeyOutputFormat[GenericRecord]],
job.getConfiguration)
Run Code Online (Sandbox Code Playgroud)
运行时,Spark会抱怨Schema $ recordSchema不可序列化.
如果我取消注释.map调用(并且只有rdd.saveAsNewAPIHadoopFile),则调用成功.
我在这做错了什么?
任何的想法?
你如何首先提取架构,然后从java中的avro文件中提取数据?除了java之外,与此问题相同.
我已经看到了如何从avsc文件而不是avro文件中获取模式的示例.任何方向都非常赞赏.
Schema schema = new Schema.Parser().parse(new File("/home/Hadoop/Avro/schema/emp.avsc"));
Run Code Online (Sandbox Code Playgroud) 为了便于在Scala中使用Avro,我想基于存储在.avro文件中的模式来定义案例类.我可以尝试:
谢谢,任何建议表示赞赏.-Julian
是否有任何工具能够从"典型的"JSON文档创建AVRO模式.
例如:
{
"records":[{"name":"X1","age":2},{"name":"X2","age":4}]
}
Run Code Online (Sandbox Code Playgroud)
我找到了http://jsonschema.net/reboot/#/,它生成了一个' json-schema '
{
"$schema": "http://json-schema.org/draft-04/schema#",
"id": "http://jsonschema.net#",
"type": "object",
"required": false,
"properties": {
"records": {
"id": "#records",
"type": "array",
"required": false,
"items": {
"id": "#1",
"type": "object",
"required": false,
"properties": {
"name": {
"id": "#name",
"type": "string",
"required": false
},
"age": {
"id": "#age",
"type": "integer",
"required": false
}
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
但我想要一个AVRO版本.
我正在寻找一种以通用方式将POJO转换为avro对象的方法.对于POJO级的任何更改,实现都应该是健壮的.我已经实现了它,但明确地填写了avro记录(参见下面的示例).
有没有办法摆脱硬编码的字段名称,只是填充对象的avro记录?反射是唯一的方式,还是avro提供开箱即用的功能?
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.reflect.ReflectData;
public class PojoToAvroExample {
static class PojoParent {
public final Map<String, String> aMap = new HashMap<String, String>();
public final Map<String, Integer> anotherMap = new HashMap<String, Integer>();
}
static class Pojo extends PojoParent {
public String uid;
public Date eventTime;
}
static Pojo createPojo() {
Pojo foo = new Pojo();
foo.uid = "123";
foo.eventTime = new Date();
foo.aMap.put("key", "val");
foo.anotherMap.put("key", 42);
return foo;
}
public static void …Run Code Online (Sandbox Code Playgroud) 我为我的Spark作业启用了Kryo序列化,启用了设置以要求注册,并确保我的所有类型都已注册.
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
Run Code Online (Sandbox Code Playgroud)
作业的Wallclock-time性能恶化了大约20%,并且洗牌的字节数增加了近400%.
鉴于Spark文档建议Kryo应该更好,这对我来说似乎真的很令人惊讶.
Kryo比Java序列化更快,更紧凑(通常高达10倍)
我手动调用serializeSpark的实例上的方法org.apache.spark.serializer.KryoSerializer和org.apache.spark.serializer.JavaSerializer我的数据示例.结果与Spark文档中的建议一致:Kryo生成了98个字节; Java产生了993个字节.这确实是10倍的改进.
一个可能混淆的因素是被序列化和混洗的对象实现了Avro GenericRecord接口.我尝试注册Avro架构SparkConf,但没有显示出任何改进.
我尝试制作新的类来改组简单的Scala数据case class,不包括任何Avro机器.它没有改善shuffle性能或交换的字节数.
Spark代码最终沸腾到以下:
case class A(
f1: Long,
f2: Option[Long],
f3: Int,
f4: Int,
f5: Option[String],
f6: Option[Int],
f7: Option[String],
f8: Option[Int],
f9: Option[Int],
f10: Option[Int],
f11: Option[Int],
f12: String,
f13: Option[Double],
f14: Option[Int],
f15: Option[Double],
f16: Option[Double],
f17: List[String],
f18: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(f: …Run Code Online (Sandbox Code Playgroud) 我在Spark Structured Streaming中使用Kafka Source来接收Confluent编码的Avro记录.我打算使用Confluent Schema Registry,但是与spark结构化流媒体的集成似乎是不可能的.
我已经看到了这个问题,但无法使用Confluent Schema Registry.使用Spark 2.0.2读取来自Kafka的Avro消息(结构化流式传输)
avro apache-kafka apache-spark confluent-schema-registry spark-structured-streaming
avro ×10
apache-spark ×3
java ×3
scala ×3
json ×2
schema ×2
apache-kafka ×1
avro-tools ×1
case-class ×1
generator ×1
java-8 ×1
mapreduce ×1
performance ×1
validation ×1