在节点js中使用Avro序列化数据

Pis*_*olo 10 serialization json node.js avro

我想把来自 JSON 对象的数据序列化,并通过网络发送出去,最终目的地是 Kafka。现在我在一个文件里有一个 Avro 模式,它定义了日志系统发送到 Kafka 时所需的字段:

{
  "type": "record",
  "name": "Log",
  "namespace": "com.company.wr.messages",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "source", "type": "string"},
    {"name": "version", "type": "string"},
    {"name": "ipAddress", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "level", "type": "string"},
    {"name": "errorCode", "type": "string"},
    {"name": "message", "type": "string"}
  ]
}
Run Code Online (Sandbox Code Playgroud)

我正在使用节点包'avro-schema',我尝试了其他的但是当时没有一个工作得很好,我只需要从节点js以avro方式序列化.

mtt*_*tth 5

avsc:

// Serialize a log record with the `avsc` package.
var avro = require('avsc');

// Build an Avro type object from the record schema.
var logType = avro.parse({
  namespace: 'com.company.wr.messages',
  type: 'record',
  name: 'Log',
  fields: [
    { name: 'timestamp', type: 'long' },
    { name: 'source', type: 'string' },
    { name: 'version', type: 'string' },
    { name: 'ipAddress', type: 'string' },
    { name: 'name', type: 'string' },
    { name: 'level', type: 'string' },
    { name: 'errorCode', type: 'string' },
    { name: 'message', type: 'string' }
  ]
});

// A sample record matching the schema above.
var sampleRecord = {
  timestamp: 2313213,
  source: 'src',
  version: '1.0',
  ipAddress: '0.0.0.0',
  name: 'foo',
  level: 'INFO',
  errorCode: '',
  message: ''
};

// The record's Avro binary encoding, as a Buffer.
var buf = logType.toBuffer(sampleRecord);
Run Code Online (Sandbox Code Playgroud)

您可以在此处找到有关各种编码方法的更多信息.


Mik*_*tra 2

以下是我们针对类似用例所做的示例,其中我们将 Avro 记录发送到另一个队列 (Amazon Kinesis),并根据您的架构进行了调整。我们将其与 node-avro-io 0.2.0 和 stream-to-array 2.0.2 一起使用。

// Serialize a log record with node-avro-io, collecting the output
// stream into a single Buffer via stream-to-array.
var avro = require('node-avro-io');
var toArray = require('stream-to-array');

// Avro schema for the Log record.
var logSchema = {
    "type": "record",
    "name": "Log",
    "namespace": "com.company.wr.messages",
    "fields": [
        {"name": "timestamp", "type": "long"},
        {"name": "source", "type": "string"},
        {"name": "version", "type": "string"},
        {"name": "ipAddress", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "level", "type": "string"},
        {"name": "errorCode", "type": "string"},
        {"name": "message", "type": "string"}
    ]
};

// Writer emits snappy-compressed Avro container-file bytes as a stream.
var writer = new avro.DataFile.Writer(logSchema, "snappy");

// Gather every emitted chunk, then concatenate into one Buffer.
toArray(writer, function (err, chunks) {
    var payload = Buffer.concat(chunks);
    // Send payload to Kafka here
});

// One record conforming to the schema.
var logRecord = {
    "timestamp": 123,
    "source": "example.com",
    "version": "HTTP 1.1",
    "ipAddress": "123.123.123.123",
    "name": "Jim",
    "level": "INFO",
    "errorCode": "200",
    "message": "foo"
};

// Append the record and close the stream, triggering the callback above.
writer.append(logRecord).end();
Run Code Online (Sandbox Code Playgroud)

在撰写本文时,node-avro-io 的示例用于在文件系统上序列化/反序列化 Avro 文件。此示例使用stream-to-array包作为获取Buffer基于流的node-avro-io包的快捷方式。可以Buffer作为 Kafka 生产者中的消息发送到您的队列。

其他一些 node.js 包(例如 avronode 和Collective 的 node-avro)是 C++ 库的包装器。我使用这些软件包并没有取得那么大的成功。这里有一个关于 node-avro 的 Avro C++ 库安装说明的 tl:dr(为其构建 .deb 包)。它可能对任何 C++ 包装器包有帮助。

# Build Apache Avro's C++ library and package it as a .deb (avro-cpp),
# for use by C++-wrapper Node packages such as Collective's node-avro.
sudo apt-get install -y libboost-all-dev cmake checkinstall
# Fix: the original said `ssh clone`, which is not a command — use `git clone`.
git clone git@github.com:apache/avro.git
cd avro
git checkout release-1.7.7
cd lang/c++
cmake -G "Unix Makefiles"
# Build the package without installing it (--install=no).
sudo checkinstall -y \
    --install=no \
    --pkgname="avro-cpp" \
    --pkgrelease="1.7.7" \
    --maintainer="me@example.com" \
    --addso=yes
Run Code Online (Sandbox Code Playgroud)

对于 Collective 的 node-avro,在 Ubuntu 14.04 上,我必须从其 bin/install-and-run-tests 脚本中删除 export CXXFLAGS="-fcxx-exceptions" 这一行。