使用Java在线程之间管道数据

Jus*_*Y17 7 java multithreading semaphore piping

我正在编写一个模仿电影院的多线程应用程序.涉及的每个人都是自己的线程,并发必须完全由信号量完成.我唯一的问题是如何基本上链接线程,以便他们可以通信(例如通过管道).

例如:

客户[1]是一个线程,获取一个信号量,让它走向票房.现在,客户[1]必须告诉票房代理他们想要看电影"X".然后BoxOfficeAgent [1]也是一个线程,必须检查以确保电影未满,并且要么卖票,要么告诉客户[1]选择另一部电影.

如何在保持与信号量的并发性的同时来回传递数据?

另外,我可以在java.util.concurrent中使用的唯一类是Semaphore类.

Bru*_*eis 8

在线程之间来回传递数据的一种简单方法是使用BlockingQueue<E>位于包中的接口的实现java.util.concurrent.

此接口具有使用不同行为向集合添加元素的方法:

  • add(E):如果可能,添加,否则抛出异常
  • boolean offer(E):如果已添加元素,则返回true,否则返回false
  • boolean offer(E, long, TimeUnit):尝试添加元素,等待指定的时间
  • put(E):阻塞调用线程,直到添加元素

它还定义了具有类似行为的元素检索方法:

  • take():阻止直到有一个元素可用
  • poll(long, TimeUnit):检索元素或返回null

我最常用的实现方案是ArrayBlockingQueue,LinkedBlockingQueueSynchronousQueue.

第一个,ArrayBlockingQueue具有固定大小,由传递给其构造函数的参数定义.

第二,LinkedBlockingQueue具有极小的规模.它将始终接受任何元素,即,offer将立即返回true,add永远不会抛出异常.

第三个,对我来说最有趣的一个SynchronousQueue,就是管道.您可以将其视为大小为0的队列.它永远不会保留一个元素:如果某个其他线程试图从中检索元素,则此队列只接受元素.相反,如果有另一个线程试图推送它,则检索操作将仅返回一个元素.

为了完成与信号量完全同步作业要求,你可以从我给你的关于SynchronousQueue的描述中获得灵感,并写一些非常相似的东西:

class Pipe<E> {
  private E e;

  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}
Run Code Online (Sandbox Code Playgroud)

请注意,此类提供了与我所描述的有关SynchronousQueue的类似行为.

一旦put(E)调用了这些方法,它就会获取写入信号量,该信号量将保留为空,以便对同一方法的另一次调用将在其第一行阻塞.然后,此方法存储对传递的对象的引用,并释放读取的信号量.此版本将使调用该take()方法的任何线程都可以继续.

take()然后,该方法的第一步自然是获取读取信号量,以便禁止任何其他线程同时检索该元素.在检索到元素并将其保存在局部变量之后(练习:如果该行,E e = this.e被删除会发生什么?),该方法释放写信号量,以便put(E)可以通过以下方式再次调用该方法:任何线程,并返回已保存在局部变量中的内容.

作为一种重要的话,观察到传递的对象的引用保存在一个私有字段,并且这些方法take()put(E)都是最终的.这是至关重要的,而且常常被遗漏.如果这些方法不是最终的(或更糟糕的是,字段不是私有的),继承类将能够改变合同的行为take()put(E)破坏合同.

最后,您可以take()通过使用try {} finally {}如下方式避免在方法中声明局部变量:

class Pipe<E> {
  // ...
  public final E take() {
    try {
      read.acquire();
      return e;
    } finally {
      write.release();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

在这里,这个例子的重点是如果只是为了显示它的使用try/finally在没有经验的开发人员中被忽视.显然,在这种情况下,没有真正的收获.

哦,该死的,我大部分时间都为你完成了作业.在报复中 - 并且为了测试您对信号量的了解 - 为什么不实现BlockingQueue合约定义的其他一些方法呢?例如,您可以实现一个offer(E)方法和一个take(E, long, TimeUnit)!

祝好运.