The correct way to stop a custom logback async appender

tau*_*nen 5 java logback appender aws-sdk

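I created Amazon SQS and SNS logback appenders using Amazon's Java SDK. The basic appenders use the synchronous Java API, but I also created asynchronous versions of both by extending the ch.qos.logback.classic.AsyncAppender class.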

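Stopping the logback logger context does not work as expected with the asynchronous appenders. When the context is stopped, all asynchronous appenders try to flush their remaining events before exiting. The problem originates in the ch.qos.logback.core.AsyncAppenderBase#stop method, which interrupts the worker thread. The interrupt is triggered while the Amazon SDK is still processing the queued events, and it results in a com.amazonaws.AbortedException. In my tests the AbortedException occurred while the SDK was processing the response from the API, so the actual message went through, but that might not always be the case.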
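To make the mechanism concrete, here is a minimal plain-Java sketch of it, with no logback or AWS classes involved. The `send` method is a hypothetical stand-in that refuses to work on an interrupted thread, which is an assumption about how the SDK reacts rather than its actual code, and the worker only imitates a thread that is interrupted by `stop()` and then flushes its queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptDuringFlushDemo {

    // Hypothetical stand-in for an SDK call that aborts on an interrupted thread.
    static String send(String event) {
        return Thread.currentThread().isInterrupted() ? "aborted:" + event : "sent:" + event;
    }

    // Simulates a worker that is interrupted by stop() and then flushes its queue.
    static List<String> flushAfterInterrupt(List<String> events) {
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(events);
        final List<String> results = new ArrayList<>();
        final CountDownLatch started = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            started.countDown();
            // Busy with non-blocking work until the interrupt arrives; the flag stays set.
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield();
            }
            // Flush phase: every remaining event is sent on an interrupted thread.
            String event;
            while ((event = queue.poll()) != null) {
                results.add(send(event));
            }
        });
        worker.start();
        try {
            started.await();
            worker.interrupt(); // what stop() does to the worker thread
            worker.join();      // join makes the worker's results visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [aborted:e1, aborted:e2]
        System.out.println(flushAfterInterrupt(Arrays.asList("e1", "e2")));
    }
}
```

Nothing clears the interrupt flag between the interrupt and the flush, so every call made during the flush sees it.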

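Is it intended that logback interrupts the worker thread even though the worker should still process the remaining event queue? If so, how can I work around the AbortedException caused by the interrupt? I could override the whole stop method and remove the interrupt, but that would require copy-pasting most of the implementation.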

tau*_*nen 1

I finally figured out a solution. I don't think it is optimal, and it is far from simple, but it works.

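My first attempt was to use the asynchronous versions of the AWS SDK APIs with the executor provided by logback, because an internal executor would avoid the interrupt problem. That did not work out, because the work queue is shared, and in this case the queue has to be appender-specific in order to stop it properly. So I needed a separate executor for each appender.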

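First, I needed an executor for the AWS client. The catch with the executor is that the thread factory given to it must create daemon threads; otherwise it will block indefinitely if logback's JVM shutdown hook is used.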

public static ExecutorService newExecutor(Appender<?> appender, int threadPoolSize) {
    final String name = appender.getName();
    return Executors.newFixedThreadPool(threadPoolSize, new ThreadFactory() {

        private final AtomicInteger idx = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName(name + "-" + idx.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    });
}
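As a quick check of the thread factory, the following standalone sketch repeats the same idea with a plain name instead of an `Appender`, so it runs without logback on the classpath. `DaemonExecutorDemo`, `newDaemonExecutor` and `describeWorker` are names made up for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonExecutorDemo {

    // Same idea as newExecutor above, keyed by a plain name instead of an Appender.
    static ExecutorService newDaemonExecutor(final String name, int threadPoolSize) {
        final AtomicInteger idx = new AtomicInteger(1);
        return Executors.newFixedThreadPool(threadPoolSize, r -> {
            Thread thread = new Thread(r);
            thread.setName(name + "-" + idx.getAndIncrement());
            thread.setDaemon(true); // daemon threads do not keep the JVM alive
            return thread;
        });
    }

    // Runs one task on the pool and reports the worker thread's name and daemon flag.
    static String describeWorker(String name) {
        ExecutorService executor = newDaemonExecutor(name, 1);
        try {
            return executor.submit(() -> {
                Thread t = Thread.currentThread();
                return t.getName() + " daemon=" + t.isDaemon();
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "sqs-appender-1 daemon=true"
        System.out.println(describeWorker("sqs-appender"));
    }
}
```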

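The next question is how to stop the appender properly in the presence of interrupts. This requires handling the InterruptedException by retrying, because otherwise the executor would skip waiting for the queue to be flushed.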

public static void shutdown(Appender<?> appender, ExecutorService executor, long waitMillis) {
    executor.shutdown();
    boolean completed = awaitTermination(appender, executor, waitMillis);
    if (!completed) {
        appender.addWarn(format("Executor for %s did not shut down in %d milliseconds, " +
                                "logging events might have been discarded",
                                appender.getName(), waitMillis));
    }
}

private static boolean awaitTermination(Appender<?> appender, ExecutorService executor, long waitMillis) {
    long started = System.currentTimeMillis();
    try {
        return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie1) {
        // the worker loop is stopped by interrupt, but the remaining queue should still be handled
        long waited = System.currentTimeMillis() - started;
        if (waited < waitMillis) {
            try {
                return executor.awaitTermination(waitMillis - waited, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ie2) {
                appender.addError(format("Shut down of executor for %s was interrupted",
                                         appender.getName()));
            }
        }
        Thread.currentThread().interrupt();
    }
    return false;
}
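The retry can be tried out in isolation. The sketch below copies the retry logic without the logback status calls and calls it from a thread whose interrupt flag is already set, which is roughly the situation an appender is in when it is stopped on an interrupted worker thread. `ShutdownRetryDemo` and `shutdownWhileInterrupted` are illustrative names, and the 200 ms task only stands in for a pending send.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownRetryDemo {

    // Same retry logic as awaitTermination above, minus the logback status messages.
    static boolean awaitTermination(ExecutorService executor, long waitMillis) {
        long started = System.currentTimeMillis();
        try {
            return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie1) {
            // The first wait consumed the interrupt; wait again for the remaining time.
            long waited = System.currentTimeMillis() - started;
            if (waited < waitMillis) {
                try {
                    return executor.awaitTermination(waitMillis - waited, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ie2) {
                    // interrupted twice: give up and fall through
                }
            }
            Thread.currentThread().interrupt();
        }
        return false;
    }

    // Shuts an executor down from a thread whose interrupt flag is already set.
    static boolean shutdownWhileInterrupted() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicBoolean taskFinished = new AtomicBoolean(false);
        executor.execute(() -> {
            try {
                Thread.sleep(200); // stands in for a pending send
                taskFinished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.shutdown();
        Thread.currentThread().interrupt(); // simulate being stopped on an interrupted thread
        boolean completed = awaitTermination(executor, 5000);
        Thread.interrupted(); // clear any leftover flag so the demo does not leak it
        return completed && taskFinished.get();
    }

    public static void main(String[] args) {
        System.out.println(shutdownWhileInterrupted()); // prints true
    }
}
```

A single `awaitTermination` call would throw immediately here and the pending task would not be waited for. Note that a successful retry leaves the interrupt flag cleared, which is harmless when the calling thread is about to exit anyway.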

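A normal logback appender is expected to work synchronously, so that no logging events are lost even without a proper shutdown hook. That is a problem with the asynchronous AWS SDK API calls. I decided to use a countdown latch to give the appender blocking behavior.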

public class LoggingEventHandler<REQUEST extends AmazonWebServiceRequest, RESULT> implements AsyncHandler<REQUEST, RESULT> {

    private final ContextAware contextAware;
    private final CountDownLatch latch;
    private final String errorMessage;

    public LoggingEventHandler(ContextAware contextAware, CountDownLatch latch, String errorMessage) {
        this.contextAware = contextAware;
        this.latch = latch;
        this.errorMessage = errorMessage;
    }

    @Override
    public void onError(Exception exception) {
        contextAware.addWarn(errorMessage, exception);
        latch.countDown();
    }

    @Override
    public void onSuccess(REQUEST request, RESULT result) {
        latch.countDown();
    }
}

The waiting is then handled with the latch.

public static void awaitLatch(Appender<?> appender, CountDownLatch latch, long waitMillis) {
    if (latch.getCount() > 0) {
        try {
            boolean completed = latch.await(waitMillis, TimeUnit.MILLISECONDS);
            if (!completed) {
                appender.addWarn(format("Appender '%s' did not complete sending event in %d milliseconds, " +
                                        "the event might have been lost",
                                        appender.getName(), waitMillis));
            }
        } catch (InterruptedException ex) {
            appender.addWarn(format("Appender '%s' was interrupted, " +
                                    "a logging event might have been lost or shutdown was initiated",
                                    appender.getName()));
            Thread.currentThread().interrupt();
        }
    }
}
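The handler and the latch together turn an asynchronous call into a blocking one with a timeout. Here is a minimal sketch of that pattern using only the JDK. The `Callback` interface and the `sendAsync` method are hypothetical stand-ins for the SDK's `AsyncHandler` and async client, and the delay parameter only simulates network latency.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingAppendDemo {

    // Hypothetical stand-in for the SDK's AsyncHandler.
    interface Callback {
        void onSuccess(String result);
        void onError(Exception exception);
    }

    // Hypothetical asynchronous send that completes after delayMillis.
    static void sendAsync(ExecutorService executor, String message, long delayMillis, Callback callback) {
        executor.execute(() -> {
            try {
                Thread.sleep(delayMillis);
                callback.onSuccess("ok:" + message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                callback.onError(e);
            }
        });
    }

    // Blocks until the send completes or waitMillis elapses; returns true if it completed.
    static boolean appendBlocking(String message, long delayMillis, long waitMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final CountDownLatch latch = new CountDownLatch(1);
        sendAsync(executor, message, delayMillis, new Callback() {
            @Override
            public void onSuccess(String result) {
                latch.countDown();
            }

            @Override
            public void onError(Exception exception) {
                latch.countDown(); // release the caller on failure too
            }
        });
        try {
            return latch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // demo cleanup only; a real appender keeps its executor
        }
    }

    public static void main(String[] args) {
        System.out.println(appendBlocking("hello", 50, 2000));  // prints true
        System.out.println(appendBlocking("hello", 2000, 50));  // prints false
    }
}
```

Counting down in both callbacks matters: if only the success path released the latch, every failed send would hold the logging thread for the full timeout.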

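Then it is all bundled together. The following example is a simplified version of the actual implementation, showing only the parts relevant to this problem.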

public class SqsAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

    private AmazonSQSAsyncClient sqs;

    @Override
    public void start() {
        sqs = new AmazonSQSAsyncClient(
                getCredentials(),
                getClientConfiguration(),
                // daemon-thread executor from the helper above, so a shutdown hook cannot hang
                AppenderExecutors.newExecutor(this, getThreadPoolSize())
        );
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (sqs != null) {
            AppenderExecutors.shutdown(this, sqs.getExecutorService(), getMaxFlushTime());
            sqs.shutdown();
            sqs = null;
        }
    }

    @Override
    protected void append(final ILoggingEvent eventObject) {
        SendMessageRequest request = ...
        CountDownLatch latch = new CountDownLatch(1);
        sqs.sendMessageAsync(request, new LoggingEventHandler<SendMessageRequest, SendMessageResult>(this, latch, "Error"));
        AppenderExecutors.awaitLatch(this, latch, getMaxFlushTime());
    }
}

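All of this is needed to handle the following cases properly:

  • Flush the remaining event queue on logback context stop or shutdown hook when the asynchronous appender wrapper is used
  • Do not block indefinitely when logback's delaying shutdown hook is used
  • Provide blocking behavior when the asynchronous appender is not used
  • Survive the interrupt issued when the asynchronous appender is stopped, which caused all AWS SDK stream implementations to be interrupted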

The above is used in the open source project Logback Extensions, of which I am a maintainer.