java线程同步问题,如何实现observable线程

Rn2*_*2dy 2 java multithreading

问题是:C类控制B类是否允许B再次将A添加到queueOfA,但是如何确保当A的A通知C并在B的queueOfA变空之前立即更新,因为B类运行得如此之快以至于可能会删除所有的A,因此在C进行更新之前变为空队列,这可能会导致C的运行方法结束.

请帮帮我!

class A extends Observable implements Runnable{
    //...
    void run(){
     //...
     if ( goBackToTheQueueAgain == true ){
         setChanged();
         notifyObservers();//notify C that to let B add itself back
     }
    //...
}

class B extends implements Runnable{
    Queue<A> queueOfA;// initialy, queueOfA contains several A's
    //...
    void run(){
       //...
       A retrieved = queueOFA.remove();
       Thread aThread = new Thread(retrieved);
       aThread.start();
       //...
    }
 }

 class C implements Observer, Runnable {
    B b; //initially class C contains an instance of B 
    //...
    void update(Observable o, Object arg){
       if(o instanceof A){
         A a = (A) o;
         b.queueOfA.add(a);
       }
    }
    //...
    void run(){
       //...
       Thread bThread = new Thread(b);
       bThread.start();
       while ( b.queueOfA.isEmpty() == false ){
          // if queueOfA is not empty loop for ever
       }
      //..
    }
 }
Run Code Online (Sandbox Code Playgroud)

mdm*_*dma 5

问题不在于您必须尽快通知更新 - 您无法保证.例如,假设您在队列中只有一个元素可以开始.这将从队列中删除,然后队列为空,但该元素仍在处理中.A线程可以随意使用 - 即使队列为空,C也不应该终止.

修复是C应该等到所有A完全处理完毕.也就是说,A在他们的run方法结束时没有将A放回队列.这可以使用倒计时锁存器完成.最初将锁存器设置为队列的大小(A的数量),并在每次完全处理A时递减该锁存器.只有当此锁存器变为零时,C才会退出.

C看起来像这样

CountdownLatch latch;

void run(){
   //...
   this.latch = new CountDownLatch(queueOfA.size());
   Thread bThread = new Thread(b);
   bThread.start();
   latch.await();
  //.. catch InterruptedException etc..
}

void notifyDone(A a) {
    this.latch.countDown();
}
Run Code Online (Sandbox Code Playgroud)

A的运行方法是

void run(){
     //...
     C c = ...; // passed in from somewhere
     if ( goBackToTheQueueAgain == true ){
         setChanged();
         notifyObservers();//notify C that to let B add itself back
     }
     else
         c.notifyDone(this);
}
Run Code Online (Sandbox Code Playgroud)

A需要对C的引用,以便它可以直接通知它已完成.

就个人而言,我不会在这里使用Observer/Observable,因为您的通知具有特定含义 - 该项目可以重新排队.观察者模式最适合通知状态的变化,其中该状态可从被观察的实例获得.这不是这种情况,并且这种抽象解耦没有真正的收益.如果你想保持它抽象而不是将A耦合到C,你可以引入一个额外的接口,例如

interface QueueHandler<T>
{
    void requeue(T t);
    void completed(T t);
}
Run Code Online (Sandbox Code Playgroud)

并在C上实现它,并将其传递给每个A而不是当前的Observer接口.但同样地,A和C类是紧密耦合的,所以你也可以将C传递给A并让它直接通知.