PipedInputStream和PipedOutputStream中的异常传播

tob*_*bik 5 java multithreading

我有一个数据生成器,它在一个单独的线程中运行,并将生成的数据推送到PipedOutputStream其中PipedInputStream.此输入流的引用通过公共API公开,以便任何客户端都可以使用它.的PipedInputStream包含一个有限的缓冲器,如果充分,块中的数据产生器.基本上,作为客户端从输入流中读取数据时,由所产生的新的数据的数据生产者.

问题是数据生成器可能会失败并抛出异常.但是当消费者在一个单独的线程中运行时,没有很好的方法来获取客户端的异常.

我所做的是捕获该异常并关闭输入流.这将在IOException客户端产生消息"管道关闭"但我真的想给客户端背后的真正原因.

这是我的API的粗略代码:

public InputStream getData() {
    final PipedInputStream inputStream = new PipedInputStream(config.getPipeBufferSize());
    final PipedOutputStream outputStream = new PipedOutputStream(inputStream);

    Thread thread = new Thread(() -> {
        try {
          // Start producing the data and push it into output stream.
          // The production my fail and throw an Exception with the reason
        } catch (Exception e) {
            try {
                // What to do here?
                outputStream.close();
                inputStream.close();
            } catch (IOException e1) {
            }
        }
    });
    thread.start();

    return inputStream;
}
Run Code Online (Sandbox Code Playgroud)

我有两个想法如何解决这个问题:

  1. 将异常存储在父对象中,并通过API将其公开给客户端.I. e.如果读取失败IOException,客户端可以向API询问原因.
  2. 扩展/重新实现管道流,以便我可以将原因传递给close()方法.然后,IOException流引发的可能包含该原因作为消息.

有更好的想法吗?

Jon*_*man 4

巧合的是,我刚刚编写了类似的代码来允许 GZip 压缩流。您不需要扩展 PipedInputStream,只需FilterInputStream即可并返回包装版本,例如

final PipedInputStream in = new PipedInputStream();
final InputStreamWithFinalExceptionCheck inWithException = new InputStreamWithFinalExceptionCheck(in);
final PipedOutputStream out = new PipedOutputStream(in);
Thread thread = new Thread(() -> {
    try {
      // Start producing the data and push it into output stream.
      // The production my fail and throw an Exception with the reason
    } catch (final IOException e) {
        inWithException.fail(e);
    } finally {
        inWithException.countDown();
    }
});
thread.start();
return inWithException;
Run Code Online (Sandbox Code Playgroud)

然后 InputStreamWithFinalExceptionCheck 只是

private static final class InputStreamWithFinalExceptionCheck extends FilterInputStream {
    private final AtomicReference<IOException> exception = new AtomicReference<>(null);
    private final CountDownLatch complete = new CountDownLatch(1);

    public InputStreamWithFinalExceptionCheck(final InputStream stream) {
        super(stream);
    }

    @Override
    public void close() throws IOException {
        try {
            complete.await();
            final IOException e = exception.get();
            if (e != null) {
                throw e;
            }
        } catch (final InterruptedException e) {
            throw new IOException("Interrupted while waiting for synchronised closure");
        } finally {
            stream.close();
        }
    }

    public void fail(final IOException e) {
        exception.set(Preconditions.checkNotNull(e));
    }

    public void countDown() {complete.countDown();}
}
Run Code Online (Sandbox Code Playgroud)

  • 如果扩展 FilterInputStream,则可以使事情变得简单得多。它为您完成了大部分工作。 (2认同)