为消费者-生产者使用 ExecutorService 和 PipedReader/PipedWriter(或 PipedInputStream/PipedOutputStream)的 Java 示例

Era*_*dan 0 java producer-consumer executorservice java.util.concurrent

我正在寻找一个简单的生产者 - Java 中的消费者实现,不想重新发明轮子

我找不到同时使用新并发包和 Piped 类的示例

是否有为此使用PipedInputStream和新的 Java 并发包的示例?

有没有更好的方法而不使用 Piped 类来完成这样的任务?

tru*_*ity 5

对于您的任务,仅使用单个线程并BufferedOutputStream在您从数据库中读取时使用 a 写入文件可能就足够了。

如果您想更好地控制缓冲区大小和写入文件的块大小,您可以执行以下操作:

class Producer implements Runnable {

    private final OutputStream out;
    private final SomeDBClass db;

    public Producer( OutputStream out, SomeDBClass db ){
        this.out = out;
        this.db = db;
    }

    public void run(){
        // If you're writing to a text file you might want to wrap
        // out in a Writer instead of using `write` directly.
        while( db has more data ){
            out.write( the data );
        }
        out.flush();
        out.close();
    }
}

class Consumer implements Runnable {

    private final InputStream in;
    private final OutputStream out;
    public static final int CHUNKSIZE=512;

    public Consumer( InputStream in, OutputStream out ){
        this.out = out;
        this.in = in;
    }

    public void run(){
        byte[] chunk = new byte[CHUNKSIZE];

        for( int bytesRead; -1 != (bytesRead = in.read(chunk,0,CHUNKSIZE) );;){
            out.write(chunk, 0, bytesRead);
        }
        out.close();
    }
}
Run Code Online (Sandbox Code Playgroud)

在调用代码中:

FileOutputStream toFile = // Open the stream to a file
SomeDBClass db = // Set up the db connection
PipedInputStream pi = new PipedInputStream(); // Optionally specify a size
PipedOutputStream po = new PipedOutputStream( pi );

ExecutorService exec = Executors.newFixedThreadPool(2);
exec.submit( new Producer( po, db ) );
exec.submit( new Consumer( pi, toFile ) );
exec.shutdown();
Run Code Online (Sandbox Code Playgroud)
  • 还可以捕获可能抛出的任何异常。

请注意,如果这就是您所做的一切,那么使用ExecutorService. 当您有很多任务(太多而无法同时在线程中启动所有任务)时,Executors 很有用。这里只有两个线程必须同时运行,因此Thread#start直接调用会减少开销。