我正在尝试使用Avro架构将Json字符串转换为通用Java对象.
以下是我的代码.
String json = "{\"foo\": 30.1, \"bar\": 60.2}";
String schemaLines = "{\"type\":\"record\",\"name\":\"FooBar\",\"namespace\":\"com.foo.bar\",\"fields\":[{\"name\":\"foo\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"bar\",\"type\":[\"null\",\"double\"],\"default\":null}]}";
InputStream input = new ByteArrayInputStream(json.getBytes());
DataInputStream din = new DataInputStream(input);
Schema schema = Schema.parse(schemaLines);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object datum = reader.read(null, decoder);
Run Code Online (Sandbox Code Playgroud)
我得到"org.apache.avro.AvroTypeException:期望的start-union.得到VALUE_NUMBER_FLOAT"异常.
如果模式中没有联合,则相同的代码可以工作.有人可以解释并给我一个解决方案.
我实际上是在尝试使用Avro序列化包含日期的对象,并且反序列化日期与预期值不匹配(使用avro 1.7.2和1.7.1进行测试).这是我正在序列化的类:
import java.text.SimpleDateFormat;
import java.util.Date;
public class Dummy {
private Date date;
private SimpleDateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
public Dummy() {
}
public void setDate(Date date) {
this.date = date;
}
public Date getDate() {
return date;
}
@Override
public String toString() {
return df.format(date);
}
}
Run Code Online (Sandbox Code Playgroud)
用于序列化/反序列化的代码:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Date;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
public class AvroSerialization { …Run Code Online (Sandbox Code Playgroud) 在python 2.7中,使用Avro,我想将一个对象编码为一个字节数组.
我发现的所有示例都写入文件.
我尝试过使用io.BytesIO(),但这给出了:
AttributeError: '_io.BytesIO' object has no attribute 'write_long'
Run Code Online (Sandbox Code Playgroud)
使用io.BytesIO的示例
def avro_encode(raw, schema):
writer = DatumWriter(schema)
avro_buffer = io.BytesIO()
writer.write(raw, avro_buffer)
return avro_buffer.getvalue()
Run Code Online (Sandbox Code Playgroud) 我正在使用python编写一个spark作业.但是,我需要阅读一大堆avro文件.
这是我在Spark的示例文件夹中找到的最接近的解决方案.但是,您需要使用spark-submit提交此python脚本.在spark-submit的命令行中,您可以指定驱动程序类,在这种情况下,将定位您的所有avrokey,avrovalue类.
avro_rdd = sc.newAPIHadoopFile(
path,
"org.apache.avro.mapreduce.AvroKeyInputFormat",
"org.apache.avro.mapred.AvroKey",
"org.apache.hadoop.io.NullWritable",
keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
conf=conf)
Run Code Online (Sandbox Code Playgroud)
在我的情况下,我需要在Python脚本中运行所有内容,我已经尝试创建一个环境变量来包含jar文件,手指交叉Python会将jar添加到路径但显然它不是,它给了我意想不到的类错误.
os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"
Run Code Online (Sandbox Code Playgroud)
任何人都可以帮我如何在一个python脚本中读取avro文件?
寻找一个关于如何Map<String, Object>在Java或Scala中读取/ 写入镶木地板文件的简明示例?
这是预期的结构,com.fasterxml.jackson.databind.ObjectMapper 在Java中用作序列化器(即使用镶木地板寻找等效物):
public static Map<String, Object> read(InputStream inputStream) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(inputStream, new TypeReference<Map<String, Object>>() {
});
}
public static void write(OutputStream outputStream, Map<String, Object> map) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.writeValue(outputStream, map);
}
Run Code Online (Sandbox Code Playgroud) 有没有使用模式转换方式的Avro从消息卡夫卡与火花到数据帧?用户记录的模式文件:
{
"fields": [
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
],
"name": "user",
"type": "record"
}
Run Code Online (Sandbox Code Playgroud)
来自SqlNetworkWordCount示例和Kafka,Spark和Avro的代码片段- 第3部分,生成和使用Avro消息来读取消息.
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
...
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show() …Run Code Online (Sandbox Code Playgroud) 我找不到用C#反序列化Apache Avro文件的方法.Avro文件是Microsoft Azure Event Hub中的存档功能生成的文件.
使用Java,我可以使用Apache的Avro Tools将文件转换为JSON:
java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json
Run Code Online (Sandbox Code Playgroud)
使用NuGet包Microsoft.Hadoop.Avro我能提取SequenceNumber,Offset和EnqueuedTimeUtc,但因为我不知道该用什么类型Body的异常被抛出.我试过Dictionary<string, object>和其他类型.
static void Main(string[] args)
{
var fileName = "...";
using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
{
using (var reader = AvroContainer.CreateReader<EventData>(stream))
{
using (var streamReader = new SequentialReader<EventData>(reader))
{
var record = streamReader.Objects.FirstOrDefault();
}
}
}
}
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{ …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Confluent kafka-avro-console-consumer,但是如何将Schema Registry的参数传递给它?
我有两个问题:
是否可以使用相同的阅读器并解析用两个兼容的模式编写的记录,例如,Schema V2只有一个额外的可选字段Schema V1,我想让读者理解这两个?我认为这里的答案是否定的,但如果是,我该怎么做?
我曾尝试用Schema V1它编写记录并阅读它Schema V2,但是我收到以下错误:
org.apache.avro.AvroTypeException:找到了foo,期待foo
我用过avro-1.7.3和:
writer = new GenericDatumWriter<GenericData.Record>(SchemaV1);
reader = new GenericDatumReader<GenericData.Record>(SchemaV2, SchemaV1);
Run Code Online (Sandbox Code Playgroud)
以下是两个模式的示例(我也尝试过添加命名空间,但没有运气).
架构V1:
{
"name": "foo",
"type": "record",
"fields": [{
"name": "products",
"type": {
"type": "array",
"items": {
"name": "product",
"type": "record",
"fields": [{
"name": "a1",
"type": "string"
}, {
"name": "a2",
"type": {"type": "fixed", "name": "a3", "size": 1}
}, {
"name": "a4",
"type": "int"
}, {
"name": "a5",
"type": "int"
}]
}
}
}] …Run Code Online (Sandbox Code Playgroud) 在我们的一个项目中,我们使用Kafka和AVRO在应用程序之间传输数据.数据被添加到AVRO对象,对象被二进制编码以写入Kafka.我们使用二进制编码,因为与其他格式相比,它通常被称为最小表示.
数据通常是JSON字符串,当它保存在文件中时,它使用最多10 Mb的磁盘.但是,当文件被压缩(.zip)时,它只使用几个KB.我们担心在Kafka中存储这样的数据,因此在写入Kafka主题之前尝试压缩.
当测量二进制编码消息的长度(即字节数组的长度)时,它与数据串的长度成比例.所以我假设二进制编码没有减少任何大小.
有人能告诉我二进制编码压缩数据吗?如果没有,我该如何应用压缩?
谢谢!
avro ×10
java ×3
apache-kafka ×2
apache-spark ×2
scala ×2
azure ×1
c# ×1
confluent ×1
datetime ×1
hadoop ×1
json ×1
parquet ×1
pyspark ×1
python ×1
python-2.7 ×1