我可以,
{
"type": "record",
"name": "Foo",
"fields": [
{"name": "bar", "type": {
"type": "record",
"name": "Bar",
"fields": [ ]
}}
]
}
Run Code Online (Sandbox Code Playgroud)
并且工作正常,但假设我想将模式拆分为两个文件,例如:
{
"type": "record",
"name": "Foo",
"fields": [
{"name": "bar", "type": "Bar"}
]
}
{
"type": "record",
"name": "Bar",
"fields": [ ]
}
Run Code Online (Sandbox Code Playgroud)
Avro有能力这样做吗?
Apache Avro提供紧凑,快速的二进制数据格式,丰富的数据结构用于序列化.但是,它需要用户为需要序列化的对象定义模式(在JSON中).
在某些情况下,这是不可能的(例如:该Java对象的类具有一些成员,其类型是外部库中的外部Java类).因此,我想知道有一个工具可以从对象的.class文件中获取信息,并为该对象生成Avro架构(如Gson使用对象的.class信息将某些对象转换为JSON字符串).
在一个宠物项目(cassandra,spark,hadoop,kafka)上工作我需要一个数据序列化框架.检查常见的三个框架 - 即Thrift,Avro和Protocolbuffers - 我注意到它们中的大多数似乎都死了,最多每年有2个小版本.
这让我有两个假设:
如果有人能给我一些暗示我的假设,欢迎任何意见.
我已经为 100 GB 大小的 522 个 gzip 文件运行了以下代码,解压缩后,它将是大约 320 GB 数据和 protobuf 格式的数据,并将输出写入 GCS。我已经使用 n1 标准机器和区域进行输入,输出都得到了照顾,工作花费了我大约 17 美元,这是半小时的数据,所以我真的很需要在这里做一些成本优化。
我从下面的查询中得到的成本
SELECT l.value AS JobID, ROUND(SUM(cost),3) AS JobCost
FROM `PROJECT.gcp_billing_data.gcp_billing_export_v1_{}` bill,
UNNEST(bill.labels) l
WHERE service.description = 'Cloud Dataflow' and l.key = 'goog-dataflow-job-id' and
extract(date from _PARTITIONTIME) > "2020-12-31"
GROUP BY 1
Run Code Online (Sandbox Code Playgroud)
完整代码
import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import csv
import base64
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict …Run Code Online (Sandbox Code Playgroud) python protocol-buffers avro google-cloud-dataflow apache-beam
我想使用Avro序列化我的Kafka消息的数据,并希望将它与Avro模式存储库一起使用,因此我不必在每条消息中都包含模式.
将Avro与Kafka一起使用似乎是一件很受欢迎的事情,很多博客/ Stack Overflow问题/用户组等参考都会发送带有消息的Schema ID,但我找不到它应该去的实际示例.
我认为它应该放在某处的Kafka消息标题中,但我找不到一个明显的地方.如果它在Avro消息中,则必须根据模式对其进行解码以获取消息内容并显示需要解码的模式,这有明显的问题.
我正在使用C#客户端,但任何语言的示例都会很棒.消息类包含以下字段:
public MessageMetadata Meta { get; set; }
public byte MagicNumber { get; set; }
public byte Attribute { get; set; }
public byte[] Key { get; set; }
public byte[] Value { get; set; }
Run Code Online (Sandbox Code Playgroud)
但这些似乎没有.MessageMetaData只有Offset和PartitionId.
那么,Avro Schema Id应该去哪里?
Avro序列化在Hadoop用户中很受欢迎,但很难找到示例.
有谁可以帮我这个示例代码?我最感兴趣的是使用Reflect API来读/写文件并使用Union和Null注释.
public class Reflect {
public class Packet {
int cost;
@Nullable TimeStamp stamp;
public Packet(int cost, TimeStamp stamp){
this.cost = cost;
this.stamp = stamp;
}
}
public class TimeStamp {
int hour = 0;
int second = 0;
public TimeStamp(int hour, int second){
this.hour = hour;
this.second = second;
}
}
public static void main(String[] args) throws IOException {
TimeStamp stamp;
Packet packet;
stamp = new TimeStamp(12, 34);
packet = new Packet(9, stamp);
write(file, packet);
packet = …Run Code Online (Sandbox Code Playgroud) 如果有人熟悉Apache Avro的Java实现,那么这在黑暗中就是一个镜头.
我的高级目标是通过网络传输一些avro数据系列(例如,让我们说HTTP,但是特定协议对于此目的并不重要).在我的上下文中我有一个HttpServletResponse我需要以某种方式编写这些数据.
我最初尝试将数据写为avro容器文件的虚拟版本(假设"response"的类型为HttpServletResponse):
response.setContentType("application/octet-stream");
response.setHeader("Content-transfer-encoding", "binary");
ServletOutputStream outStream = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(outStream);
Schema someSchema = Schema.parse(".....some valid avro schema....");
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("somefield", someData);
...
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(someSchema, bos);
fileWriter.append(someRecord);
fileWriter.close();
bos.flush();
Run Code Online (Sandbox Code Playgroud)
这一切都很好,但是事实证明Avro并没有提供一种方法来读取除实际文件之外的容器文件:DataFileReader只有两个构造函数:
public DataFileReader(File file, DatumReader<D> reader);
Run Code Online (Sandbox Code Playgroud)
和
public DataFileReader(SeekableInput sin, DatumReader<D> reader);
Run Code Online (Sandbox Code Playgroud)
其中SeekableInput是一些特定于avro的自定义表单,其创建也最终从文件中读取.现在给出,除非有某种方法以某种方式将InputStream强制转换为文件(http://stackoverflow.com/questions/578305/create-a-java-file-object-or-equivalent-using-a-byte- array-in-memory-without-a表明没有,我也试过查看Java文档),如果OutputStream另一端的阅读器收到该avro容器文件,这种方法将无效(我不确定为什么他们允许一个人将avro二进制容器文件输出到任意OutputStream,而没有提供从另一端的相应InputStream读取它们的方法,但这不是重点.似乎容器文件阅读器的实现需要具体文件提供的"可搜索"功能.
好的,所以看起来这种方法看起来不像我想做的那样.如何创建模仿avro容器文件的JSON响应?
public static Schema WRAPPER_SCHEMA = Schema.parse(
"{\"type\": \"record\", " +
"\"name\": \"AvroContainer\", " +
"\"doc\": \"a JSON avro container …Run Code Online (Sandbox Code Playgroud) 我编写了一个Avro架构,其中一些字段**需要**类型,String但Avro已经生成了那些类型的字段CharSequence.
我无法找到任何方法告诉Avro制作那些类型的字段String.
我试着用
"fields": [
{
"name":"startTime",
"type":"string",
"avro.java.stringImpl":"String"
},
{
"name":"endTime",
"type":"string",
"avro.java.string":"String"
}
]
Run Code Online (Sandbox Code Playgroud)
但对于这两个领域,Avro正在生成类型的字段CharSequence.
有没有其他方法来制作这些类型的字段String?
在尝试编写avro时,我收到以下错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 35.0 failed 1 times, most recent failure: Lost task 7.0 in stage 35.0 (TID 110, localhost): java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.avro.mapred.AvroWrapper
Run Code Online (Sandbox Code Playgroud)
我使用以下3条记录读取了avro文件:
avro_rdd = sc.newAPIHadoopFile(
"threerecords.avro",
"org.apache.avro.mapreduce.AvroKeyInputFormat",
"org.apache.avro.mapred.AvroKey",
"org.apache.hadoop.io.NullWritable",
keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
conf=None)
output = avro_rdd.map(lambda x: x[0]).collect()
Run Code Online (Sandbox Code Playgroud)
然后我尝试写出一条记录(avro中保存的输出):
conf = {"avro.schema.input.key": reduce(lambda x, y: x + y, sc.textFile("myschema.avsc", 1).collect())}
sc.parallelize([output[0]]).map(lambda x: (x, None)).saveAsNewAPIHadoopFile(
"output.avro",
"org.apache.avro.mapreduce.AvroKeyOutputFormat",
"org.apache.avro.mapred.AvroKey",
"org.apache.hadoop.io.NullWritable",
keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
conf=conf)
Run Code Online (Sandbox Code Playgroud)
如何解决这个错误/写出一个单独的avro记录?我知道我的架构是正确的,因为它来自avro本身.
我需要使用Confluent kafka-avro-serializerMaven工件.从官方指南,我应该将此存储库添加到我的Maven pom
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
Run Code Online (Sandbox Code Playgroud)
问题是,当我得到以下响应时,URL http://packages.confluent.io/maven/似乎不起作用
<Error>
<Code>NoSuchKey</Code>
<Message>The specified key does not exist.</Message>
<Key>maven/</Key>
<RequestId>15E287D11E5D4DFA</RequestId>
<HostId>
QVr9lCF0y3SrQoa1Z0jDWtmxD3eJz1gAEdivauojVJ+Bexb2gB6JsMpnXc+JjF95i082hgSLJSM=
</HostId>
</Error>
Run Code Online (Sandbox Code Playgroud)
实际上Maven没有找到神器
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
你知道问题是什么吗?谢谢
avro ×10
java ×4
apache-kafka ×2
python ×2
apache ×1
apache-beam ×1
apache-spark ×1
confluent ×1
hadoop ×1
jsonschema ×1
maven ×1
reflection ×1
thrift ×1