Java中并发管道的策略

bri*_*gge 25 java concurrency performance multithreading

考虑以下shell脚本:

gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz 
Run Code Online (Sandbox Code Playgroud)

This has three processes working in parallel to decompress a stream, modify it, and re-compress it. Running time I can see my user time is about twice that of my real time, which indicates the program is effectively working in parallel.

我试图通过将每个任务放在它自己的线程中来用Java创建相同的程序.不幸的是,多线程Java程序仅比上述示例的单线程版本30%.我尝试过使用ExchangerConcurrentLinkedQueue.ConcurrentLinkedQueue链接队列会引起很多争用,尽管所有三个线程通常都处于忙碌状态.交换器的争用较少,但更复杂,并且似乎不会让最慢的工作者在100%的时间内运行.

我试图找出一个纯Java解决方案来解决这个问题,而不是看一个字节代码编织框架或基于JNI的MPI.

大多数并发性研究和API都关注分而治之的算法,使每个节点都能正常工作并且不依赖于先前的计算.另一种并发方法是管道方法,其中每个工作人员完成一些工作并将数据传递给下一个工作人员.

我并不是想找到最有效的方法来获取gzip文件,而是我正在研究如何有效地分解管道中的任务,以便将运行时间减少到最慢的任务.

10米行文件的当前时间如下:

Testing via shell

real    0m31.848s
user    0m58.946s
sys     0m1.694s

Testing SerialTest

real    0m59.997s
user    0m59.263s
sys     0m1.121s

Testing ParallelExchangerTest

real    0m41.573s
user    1m3.436s
sys     0m1.830s

Testing ConcurrentQueueTest

real    0m44.626s
user    1m24.231s
sys     0m10.856s
Run Code Online (Sandbox Code Playgroud)

我通过实时测量具有10米行测试数据的四核系统,为Java提供了10%的改进.目前的消息来源可在Bitbucket上获得.

cle*_*tus 14

首先,这个过程只会和最慢的一样快.如果时间分解是:

  • gunzip:1秒
  • sed:5秒
  • gzip:1秒

通过多线程,你将在最好的 5秒而不是7秒完成.

其次,不是使用您正在使用的队列,而是尝试复制您正在复制和使用的功能PipedInputStream以及PipedOutputStream将进程链接在一起.

编辑:使用Java并发工具处理相关任务有几种方法.将它划分为线程.首先创建一个公共基类:

public interface Worker {
  public run(InputStream in, OutputStream out);
}
Run Code Online (Sandbox Code Playgroud)

此接口的作用是表示处理输入和生成输出的任意作业.将这些链接在一起,你就有了一条管道.你也可以抽象出样板.为此,我们需要一个类:

public class UnitOfWork implements Runnable {
  private final InputStream in;
  private final OutputStream out;
  private final Worker worker;

  public UnitOfWork(InputStream in, OutputStream out, Worker worker) {
    if (in == null) {
      throw new NullPointerException("in is null");
    }
    if (out == null) {
      throw new NullPointerException("out is null");
    }
    if (worker == null) {
      throw new NullPointerException("worker is null");
    }
    this.in = in;
    this.out = out;
    this.worker = worker;
  }

  public final void run() {
    worker.run(in, out);
  }
}
Run Code Online (Sandbox Code Playgroud)

所以,例如,UnzipPART:

public class Unzip implements Worker {
  protected void run(InputStream in, OutputStream out) {
    ...
  }
}
Run Code Online (Sandbox Code Playgroud)

等等SedZip.然后将它绑定在一起的是:

public static void pipe(InputStream in, OutputStream out, Worker... workers) {
  if (workers.length == 0) {
    throw new IllegalArgumentException("no workers");
  }
  OutputStream last = null;
  List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length);
  PipedOutputStream last = null;
  for (int i=0; i<workers.length-2; i++) {
    PipedOutputStream out = new PipedOutputStream();
    work.add(new UnitOfWork(
      last == null ? in, new PipedInputStream(last), out, workers[i]);
    last = out;
  }
  work.add(new UnitOfWork(new PipedInputStream(last),
    out, workers[workers.length-1);
  ExecutorService exec = Executors.newFixedThreadPool(work.size());
  for (UnitOfWork w : work) {
    exec.submit(w);
  }
  exec.shutdown();
  try {
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  } catch (InterruptedExxception e) {
    // do whatever
  }
}
Run Code Online (Sandbox Code Playgroud)

我不确定你能做得多好,每个工作都有最少的代码.然后你的代码变成:

public static processFile(String inputName, String outputName) {
  pipe(new FileInputStream(inputFile), new FileOutputStream(outputFile),
    new Zip(), new Sed(), new Unzip());
}
Run Code Online (Sandbox Code Playgroud)

  • 在我对多线程加密进行的一些测试中,我做了一些非常相似的事情,但是当我做了自己的缓冲实现时,性能的真正提高了.管道流和缓冲流已经被缓冲,但缓冲区大小会引入大量开销,除非缓冲区与变换算法的任何对齐.所以,如果zip在1k字节上工作,使用该维度的自定义缓冲区来在压缩之前提供数据,如果sed使用一次完全获取128行的行,这种东西大大提高了速度,减少了争用和开销(和复杂性,otoh ......). (3认同)

chi*_*aya 6

我个人验证了所花费的时间,看起来阅读时间不到10%,阅读加上处理时间不到30%.所以我采用了ParallelExchangerTest(代码中性能最佳)并将其修改为只有2个线程,第一个线程执行读取和替换,第二个线程执行写入.

以下是要比较的数字(在我的机器上运行ubuntu和1gb ram的Intel双核(不是core2))

>通过shell测试

真实0m41.601s

用户0m58.604s

sys 0m1.032s

>测试ParallelExchangerTest

真正的1m55.424s

用户2m14.160s

sys 0m4.768s

> ParallelExchangerTestMod(2个线程)

真正的1m35.524s

用户1m55.319s

sys 0m3.580s

我知道字符串处理需要更长的时间,所以我用matcher.replaceAll替换line.repalce,我得到了这个数字

> ParallelExchangerTestMod_Regex(2个线程)

真实1m12.781s

用户1m33.382s

sys 0m2.916s

现在我向前迈出了一步,而不是一次读取一行,我读了各种大小的char []缓冲区并定时,(使用正则表达式搜索/替换),我得到了这些数字

>测试ParallelExchangerTestMod_Regex_Buff(一次处理100个字节)

真实1m13.804s

用户1m32.494s

sys 0m2.676s

>测试ParallelExchangerTestMod_Regex_Buff(时间处理500字节)

真正的1m6.286s

用户1m29.334s

sys 0m2.324s

>测试ParallelExchangerTestMod_Regex_Buff(时间处理800字节)

真实1m12.309s

用户1m33.910s

sys 0m2.476s

看起来500字节对于数据大小是最佳的.

我在这里分叉并获得了我的更改副本

https://bitbucket.org/chinmaya/java-concurrent_response/