tob*_*bik 5 java multithreading
我有一个数据生成器,它在一个单独的线程中运行,并将生成的数据推送到PipedOutputStream其中PipedInputStream.此输入流的引用通过公共API公开,以便任何客户端都可以使用它.的PipedInputStream包含一个有限的缓冲器,如果充分,块中的数据产生器.基本上,作为客户端从输入流中读取数据时,由所产生的新的数据的数据生产者.
问题是数据生成器可能会失败并抛出异常.但是当消费者在一个单独的线程中运行时,没有很好的方法来获取客户端的异常.
我所做的是捕获该异常并关闭输入流.这将在IOException客户端产生消息"管道关闭"但我真的想给客户端背后的真正原因.
这是我的API的粗略代码:
public InputStream getData() {
final PipedInputStream inputStream = new PipedInputStream(config.getPipeBufferSize());
final PipedOutputStream outputStream = new PipedOutputStream(inputStream);
Thread thread = new Thread(() -> {
try {
// Start producing the data and push it into output stream.
// The production my fail and throw an Exception with the reason
} catch (Exception e) {
try {
// What to do here?
outputStream.close();
inputStream.close();
} catch (IOException e1) {
}
}
});
thread.start();
return inputStream;
}
Run Code Online (Sandbox Code Playgroud)
我有两个想法如何解决这个问题:
IOException,客户端可以向API询问原因.close()方法.然后,IOException流引发的可能包含该原因作为消息.有更好的想法吗?
巧合的是,我刚刚编写了类似的代码来允许 GZip 压缩流。您不需要扩展 PipedInputStream,只需FilterInputStream即可并返回包装版本,例如
final PipedInputStream in = new PipedInputStream();
final InputStreamWithFinalExceptionCheck inWithException = new InputStreamWithFinalExceptionCheck(in);
final PipedOutputStream out = new PipedOutputStream(in);
Thread thread = new Thread(() -> {
try {
// Start producing the data and push it into output stream.
// The production my fail and throw an Exception with the reason
} catch (final IOException e) {
inWithException.fail(e);
} finally {
inWithException.countDown();
}
});
thread.start();
return inWithException;
Run Code Online (Sandbox Code Playgroud)
然后 InputStreamWithFinalExceptionCheck 只是
private static final class InputStreamWithFinalExceptionCheck extends FilterInputStream {
private final AtomicReference<IOException> exception = new AtomicReference<>(null);
private final CountDownLatch complete = new CountDownLatch(1);
public InputStreamWithFinalExceptionCheck(final InputStream stream) {
super(stream);
}
@Override
public void close() throws IOException {
try {
complete.await();
final IOException e = exception.get();
if (e != null) {
throw e;
}
} catch (final InterruptedException e) {
throw new IOException("Interrupted while waiting for synchronised closure");
} finally {
stream.close();
}
}
public void fail(final IOException e) {
exception.set(Preconditions.checkNotNull(e));
}
public void countDown() {complete.countDown();}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
853 次 |
| 最近记录: |