Pis*_*olo · 10 · 标签: serialization, json, node.js, avro
我想序列化来自JSON对象的数据,并通过网络最终将其发送到Kafka。现在我在一个文件中有一个avro模式,它定义了日志记录系统发送到kafka时所需的字段:
{"namespace": "com.company.wr.messages",
"type": "record",
"name": "Log",
"fields": [
{"name": "timestamp", "type": "long"},
{"name": "source", "type": "string"},
{"name": "version", "type": "string"},
{"name": "ipAddress", "type": "string"},
{"name": "name", "type": "string"},
{"name": "level", "type": "string"},
{"name": "errorCode", "type": "string"},
{"name": "message", "type": "string"}
]
}
Run Code Online (Sandbox Code Playground)
我正在使用节点包 'avro-schema',我也尝试过其他包,但当时没有一个工作得很好;我只需要在 node.js 中以 avro 格式进行序列化。
用avsc:
// Serialize a log record to Avro binary using the `avsc` package.
const avro = require('avsc');

// Build an Avro type directly from the inline schema definition.
const logType = avro.parse({
  namespace: 'com.company.wr.messages',
  type: 'record',
  name: 'Log',
  fields: [
    { name: 'timestamp', type: 'long' },
    { name: 'source', type: 'string' },
    { name: 'version', type: 'string' },
    { name: 'ipAddress', type: 'string' },
    { name: 'name', type: 'string' },
    { name: 'level', type: 'string' },
    { name: 'errorCode', type: 'string' },
    { name: 'message', type: 'string' },
  ],
});

// A sample log record matching the schema above.
const obj = {
  timestamp: 2313213,
  source: 'src',
  version: '1.0',
  ipAddress: '0.0.0.0',
  name: 'foo',
  level: 'INFO',
  errorCode: '',
  message: '',
};

// Its corresponding Avro binary encoding as a Buffer.
const buf = logType.toBuffer(obj);
Run Code Online (Sandbox Code Playground)
您可以在此处找到有关各种编码方法的更多信息.
以下是我们针对类似用例所做的示例,其中我们将 Avro 记录发送到另一个队列 (Amazon Kinesis),并根据您的架构进行了调整。我们将其与 node-avro-io 0.2.0 和 stream-to-array 2.0.2 一起使用。
// Encode a log record with node-avro-io and collect the resulting
// Avro container-file bytes into a single Buffer for a Kafka producer.
const avro = require('node-avro-io');
const toArray = require('stream-to-array');

// Avro schema describing a single log entry.
const schema = {
  "namespace": "com.company.wr.messages",
  "type": "record",
  "name": "Log",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "source", "type": "string"},
    {"name": "version", "type": "string"},
    {"name": "ipAddress", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "level", "type": "string"},
    {"name": "errorCode", "type": "string"},
    {"name": "message", "type": "string"}
  ]
};

// The writer is a readable stream of encoded bytes ("snappy" codec).
const writer = new avro.DataFile.Writer(schema, "snappy");

// Drain the writer stream into an array of chunks, then concatenate.
// FIX: the original callback silently ignored `err`; handle it first.
toArray(writer, function (err, chunks) {
  if (err) {
    console.error('Avro encoding failed:', err);
    return;
  }
  const dataBuffer = Buffer.concat(chunks);
  // Send dataBuffer to Kafka here
});

// A sample record matching the schema above.
const record = {
  "timestamp": 123,
  "source": "example.com",
  "version": "HTTP 1.1",
  "ipAddress": "123.123.123.123",
  "name": "Jim",
  "level": "INFO",
  "errorCode": "200",
  "message": "foo"
};

// Append the record and end the stream so toArray's callback fires.
writer.append(record).end();
Run Code Online (Sandbox Code Playground)
在撰写本文时,node-avro-io 的示例都是在文件系统上序列化/反序列化 Avro 文件。此示例使用 stream-to-array 包,作为从基于流的 node-avro-io 包中获取 Buffer 的快捷方式。然后可以将该 Buffer 作为消息,通过 Kafka 生产者发送到您的队列。
其他一些 node.js 包(例如 avronode 和 Collective 的 node-avro)是 C++ 库的包装器。我使用这些软件包并没有取得那么大的成功。这里有一个关于 node-avro 所需的 Avro C++ 库安装说明的 tl;dr(为其构建 .deb 包),它可能对任何 C++ 包装器包都有帮助。
# Build a .deb package of the Avro C++ library (release 1.7.7), as
# required by C++-wrapper node packages such as node-avro.
sudo apt-get install -y libboost-all-dev cmake checkinstall
# FIX: the original read "ssh clone", which is not a valid command.
git clone git@github.com:apache/avro.git
cd avro
git checkout release-1.7.7
cd lang/c++
cmake -G "Unix Makefiles"
# Build the package without installing it (--install=no).
sudo checkinstall -y \
--install=no \
--pkgname="avro-cpp" \
--pkgrelease="1.7.7" \
--maintainer="me@example.com" \
--addso=yes
Run Code Online (Sandbox Code Playground)
对于 Collective 的 node-avro,在 Ubuntu 14.04 上我必须从 bin/install-and-run-tests 脚本中删除 export CXXFLAGS="-fcxx-exceptions" 这一行。