与独立消费者同时处理单个InputStream

jde*_*lop 7 java concurrency inputstream stream

我需要生成N个消费者线程,它们同时处理相同的InputStream,例如 - 以某种方式转换它,计算校验和或数字签名等.这些消费者不依赖于彼此,并且所有消费者都使用第三方库,它接受InputStream作为数据来源.

所以我能做的是 - 创建一些InputStream的实现,这将是

  • 从"父"流中读取数据块
  • 解锁消费者
  • 等到每个消费者阅读整个块
  • 读下一个块

虽然看起来很简单,但是当某些消费者死亡时,它可能引发各种问题,例如活锁,实现所有的InputStream方法,使用障碍/锁存器控制消费者自己的叉/连接等.

一位好友告诉我,这是半个小时的实施,它是我的晚上.

我更喜欢使用足够成熟的东西(谷歌搜索没有结果因此,我的google-fu还不够好?)或者不打扰并将整个"源"流复制到临时文件中并将其用作数据来源.后一种解决方案似乎更可靠,但最终可能会创建千兆字节文件(例如处理流式音频时).

Mar*_*nik 3

在我看来,您至少应该有某种缓冲,以便不同的消费者可以以不同的速度在流中移动,而不会让当前最慢的消费者不断地陷入困境。这基本上保证了最坏情况下的性能并且几乎没有并发的好处。

例如,您可以用迄今为止使用过的消费者标记每个块,然后删除完全用完的消费者。也许这可以通过每个消费者持有对其尚未使用的每个块的引用来实现,这将允许 GC 自动处理已使用的块。生产者可能会保留一个WeakReference块的列表,以便它可以掌握尚未使用的块的数量,并以此为基础进行限制。

我还在考虑InputStream每个线程有一个单独的实例,它在内部与 Producer 进行通信InputStream。这样,您就可以轻松解决活锁危险:try ... finally { is.close(); }--垂死的消费者关闭自己的输入流。这将传达给生产者。

我对使用ArrayBlockingQueue每个消费者有一些想法。确保所有消费者都得到适当的喂养而不使生产者阻塞或忙等待会有些困难。