标签: avro

Json String to Java Object Avro

我正在尝试使用Avro架构将Json字符串转换为通用Java对象.

以下是我的代码.

String json = "{\"foo\": 30.1, \"bar\": 60.2}";
String schemaLines = "{\"type\":\"record\",\"name\":\"FooBar\",\"namespace\":\"com.foo.bar\",\"fields\":[{\"name\":\"foo\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"bar\",\"type\":[\"null\",\"double\"],\"default\":null}]}";

InputStream input = new ByteArrayInputStream(json.getBytes());
DataInputStream din = new DataInputStream(input);

Schema schema = Schema.parse(schemaLines);

Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object datum = reader.read(null, decoder);
Run Code Online (Sandbox Code Playgroud)

我得到"org.apache.avro.AvroTypeException:期望的start-union.得到VALUE_NUMBER_FLOAT"异常.

如果模式中没有联合,则相同的代码可以工作.有人可以解释并给我一个解决方案.

java json avro

14
推荐指数
4
解决办法
3万
查看次数

如何使用Java中的AVRO序列化Date

我实际上是在尝试使用Avro序列化包含日期的对象,并且反序列化日期与预期值不匹配(使用avro 1.7.2和1.7.1进行测试).这是我正在序列化的类:

import java.text.SimpleDateFormat;
import java.util.Date;

public class Dummy {
    private Date date;
    private SimpleDateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");

    public Dummy() {
    }

    public void setDate(Date date) {
        this.date = date;
    }

    public Date getDate() {
        return date;
    }

    @Override
    public String toString() {
        return df.format(date);
    }
}
Run Code Online (Sandbox Code Playgroud)

用于序列化/反序列化的代码:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Date;

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;

public class AvroSerialization { …
Run Code Online (Sandbox Code Playgroud)

java serialization datetime avro

13
推荐指数
2
解决办法
2万
查看次数

使用Avro将对象编码为Python中的字节数组

在python 2.7中,使用Avro,我想将一个对象编码为一个字节数组.

我发现的所有示例都写入文件.

我尝试过使用io.BytesIO(),但这给出了:

AttributeError: '_io.BytesIO' object has no attribute 'write_long'
Run Code Online (Sandbox Code Playgroud)

使用io.BytesIO的示例

def avro_encode(raw, schema):
    writer = DatumWriter(schema)
    avro_buffer = io.BytesIO()
    writer.write(raw, avro_buffer)
    return avro_buffer.getvalue()
Run Code Online (Sandbox Code Playgroud)

python-2.7 avro

13
推荐指数
1
解决办法
9366
查看次数

如何在PySpark中阅读Avro文件

我正在使用python编写一个spark作业.但是,我需要阅读一大堆avro文件.

是我在Spark的示例文件夹中找到的最接近的解决方案.但是,您需要使用spark-submit提交此python脚本.在spark-submit的命令行中,您可以指定驱动程序类,在这种情况下,将定位您的所有avrokey,avrovalue类.

avro_rdd = sc.newAPIHadoopFile(
        path,
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
        conf=conf)
Run Code Online (Sandbox Code Playgroud)

在我的情况下,我需要在Python脚本中运行所有内容,我已经尝试创建一个环境变量来包含jar文件,手指交叉Python会将jar添加到路径但显然它不是,它给了我意想不到的类错误.

os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"
Run Code Online (Sandbox Code Playgroud)

任何人都可以帮我如何在一个python脚本中读取avro文件?

python avro apache-spark pyspark

13
推荐指数
2
解决办法
2万
查看次数

如何在Java或Scala中读取和写入来自/到镶木地板文件的Map <String,Object>?

寻找一个关于如何Map<String, Object>在Java或Scala中读取/ 写入镶木地板文件的简明示例?

这是预期的结构,com.fasterxml.jackson.databind.ObjectMapper 在Java中用作序列化器(即使用镶木地板寻找等效物):

public static Map<String, Object> read(InputStream inputStream) throws IOException {
    ObjectMapper objectMapper = new ObjectMapper();

    return objectMapper.readValue(inputStream, new TypeReference<Map<String, Object>>() {

    });
}

public static void write(OutputStream outputStream, Map<String, Object> map) throws IOException {
    ObjectMapper objectMapper = new ObjectMapper();

    objectMapper.writeValue(outputStream, map);        
}
Run Code Online (Sandbox Code Playgroud)

java scala avro parquet

13
推荐指数
1
解决办法
9232
查看次数

使用模式将带有Spark的AVRO消息转换为DataFrame

有没有使用模式转换方式从消息?用户记录的模式文件:

{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}
Run Code Online (Sandbox Code Playgroud)

来自SqlNetworkWordCount示例Kafka,Spark和Avro的代码片段- 第3部分,生成和使用Avro消息来读取消息.

object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()

  df.show() …
Run Code Online (Sandbox Code Playgroud)

scala avro apache-kafka apache-spark spark-streaming

13
推荐指数
1
解决办法
1万
查看次数

使用C#反序列化Avro文件

我找不到用C#反序列化Apache Avro文件的方法.Avro文件是Microsoft Azure Event Hub中的存档功能生成的文件.

使用Java,我可以使用Apache的Avro Tools将文件转换为JSON:

java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json
Run Code Online (Sandbox Code Playgroud)

使用NuGet包Microsoft.Hadoop.Avro我能提取SequenceNumber,OffsetEnqueuedTimeUtc,但因为我不知道该用什么类型Body的异常被抛出.我试过Dictionary<string, object>和其他类型.

static void Main(string[] args)
{
    var fileName = "...";

    using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
    {
        using (var reader = AvroContainer.CreateReader<EventData>(stream))
        {
            using (var streamReader = new SequentialReader<EventData>(reader))
            {
                var record = streamReader.Objects.FirstOrDefault();
            }
        }
    }
}

[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{ …
Run Code Online (Sandbox Code Playgroud)

c# hadoop azure avro

13
推荐指数
3
解决办法
1万
查看次数

使用Kafka Avro Console Consumer时如何传递特定Schema注册表的参数?

我正在尝试使用Confluent kafka-avro-console-consumer,但是如何将Schema Registry的参数传递给它?

avro apache-kafka confluent confluent-schema-registry

13
推荐指数
1
解决办法
5010
查看次数

Avro架构演变

我有两个问题:

  1. 是否可以使用相同的阅读器并解析用两个兼容的模式编写的记录,例如,Schema V2只有一个额外的可选字段Schema V1,我想让读者理解这两个?我认为这里的答案是否定的,但如果是,我该怎么做?

  2. 我曾尝试用Schema V1它编写记录并阅读它Schema V2,但是我收到以下错误:

    org.apache.avro.AvroTypeException:找到了foo,期待foo

我用过avro-1.7.3和:

   writer = new GenericDatumWriter<GenericData.Record>(SchemaV1);
   reader = new GenericDatumReader<GenericData.Record>(SchemaV2, SchemaV1);
Run Code Online (Sandbox Code Playgroud)

以下是两个模式的示例(我也尝试过添加命名空间,但没有运气).

架构V1:

{
"name": "foo",
"type": "record",
"fields": [{
    "name": "products",
    "type": {
        "type": "array",
        "items": {
            "name": "product",
            "type": "record",
            "fields": [{
                "name": "a1",
                "type": "string"
            }, {
                "name": "a2",
                "type": {"type": "fixed", "name": "a3", "size": 1}
            }, {
                "name": "a4",
                "type": "int"
            }, {
                "name": "a5",
                "type": "int"
            }]
        }
    }
}] …
Run Code Online (Sandbox Code Playgroud)

avro

12
推荐指数
1
解决办法
1万
查看次数

AVRO的二进制编码压缩数据吗?

在我们的一个项目中,我们使用Kafka和AVRO在应用程序之间传输数据.数据被添加到AVRO对象,对象被二进制编码以写入Kafka.我们使用二进制编码,因为与其他格式相比,它通常被称为最小表示.

数据通常是JSON字符串,当它保存在文件中时,它使用最多10 Mb的磁盘.但是,当文件被压缩(.zip)时,它只使用几个KB.我们担心在Kafka中存储这样的数据,因此在写入Kafka主题之前尝试压缩.

当测量二进制编码消息的长度(即字节数组的长度)时,它与数据串的长度成比例.所以我假设二进制编码没有减少任何大小.

有人能告诉我二进制编码压缩数据吗?如果没有,我该如何应用压缩?

谢谢!

avro

12
推荐指数
2
解决办法
1万
查看次数