我正在尝试为flume-ng编写一个自定义接收器.我查看了现有的接收器和文档并对其进行了编码.但是,应该接收事件的'process()'方法总是以null结尾.我正在做Event event = channel.take(); 但事件为空.我在日志中看到,当事件仍在通道中时,会重复调用此方法.
有人能指出我正确的方向吗?
小智 5
这是一个过程函数的框架......如果你没有得到一个回滚的事件,请将状态更改为BACKOFF.如果没有,则提交并将状态设置为READY.无论如何,您总是关闭交易.
Status status = null;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
Event event = channel.take();
if (event != null && validEvent(event.getBody()) >= 0) {
# make some printing
}
transaction.commit();
status = Status.READY;
} catch (Throwable ex) {
transaction.rollback();
status = Status.BACKOFF;
logger.error("Failed to deliver event. Exception follows.", ex);
throw new EventDeliveryException("Failed to deliver event: " + ex);
} finally {
transaction.close();
}
return status;
Run Code Online (Sandbox Code Playgroud)
我相信这会奏效:).