ZigZag需要大量的开销才能写入/读取数字.实际上我惊呆了,看到它不仅仅是按原样写入int/long值,而是进行了大量额外的加扰.甚至还有一个循环:https: //github.com/mardambey/mypipe/blob/master/avro/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java#L90
我似乎无法在Protocol Buffers文档或Avro文档中找到,或者说我自己,那些扰乱数字的优势是什么?为什么在编码后交替使用正数和负数会更好?
为什么他们不只是用little-endian,big-endian,网络顺序编写,只需要将它们读入内存并可能反转位字节序?我们用性能支付什么?
我是Hadoop和编程新手,我对Avro架构演变有点困惑.到目前为止,我将解释我对Avro的理解.
Avro是一个序列化工具,它将顶层的json模式存储为二进制数据.架构看起来像这样.
{
"namespace":"com.trese.db.model",
"type":"record",
"doc":"This Schema describes about Product",
"name":"Product",
"fields":[
{"name":"product_id","type": "long"},
{"name":"product_name","type": "string","doc":"This is the name of the product"},
{"name":"cost","type": "float", "aliases":["price"]},
{"name":"discount","type": "float", "default":5}
]
}
Run Code Online (Sandbox Code Playgroud)
现在我的问题是为什么我们需要进化?我已经读过,我们可以default在架构中使用新字段; 但是如果我们在文件中添加新模式,那么早期的模式将被覆盖.我们不能为单个文件提供两个模式.
另一个问题是,什么是读写器模式以及它们如何帮助?
我想知道是否可以有一个可选的数组.让我们假设一个这样的架构:
{
"type": "record",
"name": "test_avro",
"fields" : [
{"name": "test_field_1", "type": "long"},
{"name": "subrecord", "type": [{
"type": "record",
"name": "subrecord_type",
"fields":[{"name":"field_1", "type":"long"}]
},"null"]
},
{"name": "simple_array",
"type":{
"type": "array",
"items": "string"
}
}
]
}
Run Code Online (Sandbox Code Playgroud)
尝试编写没有"simple_array"的avro记录会导致数据编写器中的NPE.对于subrecord,它很好,但是当我尝试将数组定义为可选时:
{"name": "simple_array",
"type":[{
"type": "array",
"items": "string"
}, "null"]
Run Code Online (Sandbox Code Playgroud)
它不会导致NPE,但会导致运行时异常:
AvroRuntimeException: Not an array schema: [{"type":"array","items":"string"},"null"]
Run Code Online (Sandbox Code Playgroud)
谢谢.
如果我使用模式版本1序列化对象,然后将模式更新为版本2(例如通过添加字段) - 我是否需要在稍后反序列化对象时使用模式版本1?理想情况下,我只想使用模式版本2,并且反序列化对象具有在最初序列化对象后添加到模式的字段的默认值.
也许一些代码会更好地解释......
schema1:
{"type": "record",
"name": "User",
"fields": [
{"name": "firstName", "type": "string"}
]}
Run Code Online (Sandbox Code Playgroud)
SCHEMA2:
{"type": "record",
"name": "User",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string", "default": ""}
]}
Run Code Online (Sandbox Code Playgroud)
使用通用的非代码生成方法:
// serialize
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
GenericDatumWriter writer = new GenericDatumWriter(schema1);
GenericRecord datum = new GenericData.Record(schema1);
datum.put("firstName", "Jack");
writer.write(datum, encoder);
encoder.flush();
out.close();
byte[] bytes = out.toByteArray();
// deserialize
// I would like to not have any reference to schema1 …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Python Avro库(https://pypi.python.org/pypi/avro)来读取由JAVA生成的AVRO文件.由于架构已嵌入avro文件中,为什么我需要指定架构文件?有没有办法自动提取它?
发现另一个叫做fastavro的软件包(https://pypi.python.org/pypi/fastavro)可以提取avro架构.手册是否在设计中指定了python arvo包中的模式文件?非常感谢你.
我正在评估 kinesis 作为 kafka 的替代品。我缺少的一件事是模式注册表等效解决方案。我特别需要:
处理上述 2 的选项是什么?我发现的唯一一件事是胶水目录,但似乎并没有
最后我也想使用 firehose(输出到 redshift),但据我所知,这是不可能的,需要编写自定义 lambda。
我在文件路径中的HDFS中有Avro格式的数据,如:/data/logs/[foldername]/[filename].avro.我想在所有这些日志文件上创建一个Hive表,即表单的所有文件/data/logs/*/*.(它们都基于相同的Avro架构.)
我用标志运行以下查询mapred.input.dir.recursive=true:
CREATE EXTERNAL TABLE default.testtable
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'hdfs://.../data/*/*'
TBLPROPERTIES (
'avro.schema.url'='hdfs://.../schema.avsc')
Run Code Online (Sandbox Code Playgroud)
该表最终为空,除非我更改LOCATION为更少嵌套,即'hdfs://.../data/[foldername]/'与某个foldername.对于较少嵌套的路径,这没有问题LOCATION.
我希望能够从所有这些不同的[foldername]文件夹中获取数据.如何在嵌套目录中进一步进行递归输入选择?
我已经浏览了avro的C文档, 我发现我只能获得avro输出文件.如何将序列化输出提供给缓冲区,以便我可以通过tcp套接字发送.任何帮助深表感谢.
我使用Apache avro架构与Kafka 0.0.8V.我在生产者/消费者端使用相同的模式.目前暂无任何变化的模式.但是当我尝试使用消息时,我在消费者处得到了一些例外.为什么我会收到此错误?
制片人
public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
BinaryEncoder encoder = null;
ByteArrayOutputStream out = null;
try {
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
out = new ByteArrayOutputStream();
encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(payload, encoder);
encoder.flush();
byte[] serializedBytes = out.toByteArray();
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);
producer.send(message);
}
Run Code Online (Sandbox Code Playgroud)
消费者
public void run() {
try {
ConsumerIterator<byte[], byte[]> itr = stream.iterator();
while (itr.hasNext()) {
byte[] data = itr.next().message();
Schema schema = …Run Code Online (Sandbox Code Playgroud) 我们使用kafka存储消息并推送大量消息(一分钟> 30k).我不确定它是否相关,但是作为kafka消息的生产者的代码是在jruby中.
序列化和反序列化消息也会对系统产生性能影响.
在序列化和反序列化的速度方面,有人可以帮助比较Avro与Protocol Buffer.
performance serialization protocol-buffers avro apache-kafka