Dar*_*ord 6 c# azure avro azure-eventhub
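Is there any C# sample code for reading Azure Event Hub archive files (Avro format)?
I am trying to use the Microsoft.Hadoop.Avro library. I dumped the schema with the Java avro tools, which produced the following: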
{
  "type": "record",
  "name": "EventData",
  "namespace": "Microsoft.ServiceBus.Messaging",
  "fields": [
    {"name": "SequenceNumber", "type": "long"},
    {"name": "Offset", "type": "string"},
    {"name": "EnqueuedTimeUtc", "type": "string"},
    {"name": "SystemProperties", "type": {"type": "map", "values": ["long", "double", "string", "bytes"]}},
    {"name": "Properties", "type": {"type": "map", "values": ["long", "double", "string", "bytes", "null"]}},
    {"name": "Body", "type": ["null", "bytes"]}
  ]
}
However, when I try to deserialize the file to read the data back, like this:
using (var reader = AvroContainer.CreateReader<EventData>(stream))
{
    using (var streamReader = new SequentialReader<EventData>(reader))
    {
        foreach (EventData dta in streamReader.Objects)
        {
            //stuff here
        }
    }
}
It does not work when I pass the actual EventData type used on the producer side, so I tried creating a special class marked with DataContract attributes, as follows:
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber { get; set; }

    [DataMember(Name = "Offset")]
    public string Offset { get; set; }

    [DataMember(Name = "EnqueuedTimeUtc")]
    public string EnqueuedTimeUtc { get; set; }

    [DataMember(Name = "Body")]
    public ArraySegment<byte> Body { get; set; }

    //[DataMember(Name = "SystemProperties")]
    //public SystemPropertiesCollection SystemProperties { get; set; }

    //[DataMember(Name = "Properties")]
    //public IDictionary<string, object> Properties { get; set; }
}
That fails with the following error:
System.Runtime.Serialization.SerializationException occurred
Message=Cannot match the union schema.
Doesn't Microsoft have any sample code for the use case of reading Avro archive files with C#?
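I tried both the Microsoft.Hadoop.Avro library and the Apache Avro C# library, and they appeared to have the same problem. When I tried to read just SequenceNumber, Offset, and EnqueuedTimeUtc, both returned the same garbled data, which looked like the codec and schema definition data. Here is what I found. I was downloading the blob into a memory stream and then trying to deserialize from there. The problem was that the deserializer did not account for the header and schema stored in the file and tried to deserialize from the very beginning of the stream.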
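To make the header issue concrete: per the Avro specification, an object container file begins with the four magic bytes 'O', 'b', 'j', 0x01, followed by file metadata (including the avro.schema and avro.codec entries) and a sync marker, and only then the data blocks. A decoder that treats byte 0 as the first record will therefore read metadata as if it were field values. The following is a minimal sketch, not part of either library, that checks whether a downloaded blob looks like a container file before it is handed to a reader. The names AvroContainerHeader and StartsWithMagic are hypothetical and exist only for this illustration.

```csharp
using System;
using System.IO;

public static class AvroContainerHeader
{
    // Magic bytes that open every Avro object container file: 'O', 'b', 'j', 0x01.
    private static readonly byte[] Magic = { 0x4F, 0x62, 0x6A, 0x01 };

    // Returns true when the stream starts (at its current position) with the
    // Avro container magic bytes. The position is restored afterwards so that
    // a container-aware reader can still consume the header itself.
    public static bool StartsWithMagic(Stream stream)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        if (!stream.CanSeek) throw new ArgumentException("Stream must be seekable.", nameof(stream));

        long start = stream.Position;
        var buffer = new byte[Magic.Length];
        int read = 0;

        // Stream.Read may return fewer bytes than requested, so loop until
        // the buffer is full or the stream ends.
        while (read < buffer.Length)
        {
            int n = stream.Read(buffer, read, buffer.Length - read);
            if (n == 0) break;
            read += n;
        }

        stream.Position = start;

        if (read != Magic.Length) return false;
        for (int i = 0; i < Magic.Length; i++)
        {
            if (buffer[i] != Magic[i]) return false;
        }
        return true;
    }
}
```

If this check returns true for the memory stream, the data is a container file and should go to a reader that parses the header and embedded schema, rather than to a decoder that expects a bare record at the start of the stream.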
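What worked to solve this was to use the Apache Avro C# library: run its gen tool to create a C# class from the dumped JSON schema, then use a DataFileReader, which can read the container file from the stream.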
using (var dataFileReader = Avro.File.DataFileReader<EventData>.OpenReader(stream, evtSample.Schema))
Here evtSample is an instance of the generated EventData class, and evtSample.Schema exposes its schema.
Next I will see whether I can do the same with the Microsoft.Hadoop.Avro library.
By the way, this is the C# class generated by the Apache Avro gen tool:
using System.Collections.Generic;
using Avro;
using Avro.Specific;

public partial class EventData : ISpecificRecord
{
    public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""EventData"",""namespace"":""Microsoft.ServiceBus.Messaging"",""fields"":[{""name"":""SequenceNumber"",""type"":""long""},{""name"":""Offset"",""type"":""string""},{""name"":""EnqueuedTimeUtc"",""type"":""string""},{""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},{""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"",""null""]}},{""name"":""Body"",""type"":[""null"",""bytes""]}]}");
    private long _SequenceNumber;
    private string _Offset;
    private string _EnqueuedTimeUtc;
    private IDictionary<string, System.Object> _SystemProperties;
    private IDictionary<string, System.Object> _Properties;
    private byte[] _Body;

    public virtual Schema Schema
    {
        get
        {
            return EventData._SCHEMA;
        }
    }

    public long SequenceNumber
    {
        get
        {
            return this._SequenceNumber;
        }
        set
        {
            this._SequenceNumber = value;
        }
    }

    public string Offset
    {
        get
        {
            return this._Offset;
        }
        set
        {
            this._Offset = value;
        }
    }

    public string EnqueuedTimeUtc
    {
        get
        {
            return this._EnqueuedTimeUtc;
        }
        set
        {
            this._EnqueuedTimeUtc = value;
        }
    }

    public IDictionary<string, System.Object> SystemProperties
    {
        get
        {
            return this._SystemProperties;
        }
        set
        {
            this._SystemProperties = value;
        }
    }

    public IDictionary<string, System.Object> Properties
    {
        get
        {
            return this._Properties;
        }
        set
        {
            this._Properties = value;
        }
    }

    public byte[] Body
    {
        get
        {
            return this._Body;
        }
        set
        {
            this._Body = value;
        }
    }

    // Field positions follow the order of the "fields" array in the schema.
    public virtual object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return this.SequenceNumber;
            case 1: return this.Offset;
            case 2: return this.EnqueuedTimeUtc;
            case 3: return this.SystemProperties;
            case 4: return this.Properties;
            case 5: return this.Body;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        };
    }

    public virtual void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: this.SequenceNumber = (System.Int64)fieldValue; break;
            case 1: this.Offset = (System.String)fieldValue; break;
            case 2: this.EnqueuedTimeUtc = (System.String)fieldValue; break;
            case 3: this.SystemProperties = (IDictionary<string, System.Object>)fieldValue; break;
            case 4: this.Properties = (IDictionary<string, System.Object>)fieldValue; break;
            case 5: this.Body = (System.Byte[])fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        };
    }
}