Java中golang通道的等价物

Nip*_*dar 16 java concurrency multithreading go blockingqueue

我有一个要求,我需要从一组阻塞队列中读取.阻塞队列由我正在使用的库创建.我的代码必须从队列中读取.我不想为每个阻塞队列创建一个读者线程.相反,我想使用单个线程(或者可能最多使用2/3线程)轮询它们的数据可用性.由于某些阻塞队列可能长时间没有数据,而其中一些阻塞队列可能会获得数据突发.轮询具有较小超时的队列将起作用,但这根本不高效,因为它仍然需要在所有队列上保持循环,即使其中一些队列长时间没有数据.基本上,我正在寻找一个选择/ epoll(用于套接字)类型的阻塞队列机制.任何线索都非常感谢.

尽管如此,在Go中执行此操作非常简单.下面的代码模拟了与channel和goroutines相同的内容:

package main

import "fmt"
import "time"
import "math/rand"

func sendMessage(sc chan string) {
    var i int

    for {
        i =  rand.Intn(10)
        for ; i >= 0 ; i-- {
            sc <- fmt.Sprintf("Order number %d",rand.Intn(100))
        }
        i = 1000 + rand.Intn(32000);
        time.Sleep(time.Duration(i) * time.Millisecond)
    }
}

func sendNum(c chan int) {
    var i int 
    for  {
        i = rand.Intn(16);
        for ; i >=  0; i-- {
            time.Sleep(20 * time.Millisecond)
            c <- rand.Intn(65534)
        }
        i = 1000 + rand.Intn(24000);
        time.Sleep(time.Duration(i) * time.Millisecond)
    }
}

func main() {
    msgchan := make(chan string, 32)
    numchan := make(chan int, 32)
    i := 0
    for ; i < 8 ; i++ {
        go sendNum(numchan)
        go sendMessage(msgchan)
    }
    for {
        select {
        case msg := <- msgchan:
            fmt.Printf("Worked on  %s\n", msg)
        case x := <- numchan:
            fmt.Printf("I got %d \n", x)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Ric*_*777 10

我建议你研究一下使用JCSP库.相当于Go的select被称为Alternative.您只需要一个消耗线程,如果它打开它们,则不需要轮询传入的通道Alternative.因此,这将是多路复用源数据的有效方式.

如果您能够使用JCSP频道替换BlockingQueues,它将会有很大帮助.信道的行为基本相同,但在信道端共享的扇出或扇入方面提供了更大程度的灵活性,特别是信道的使用Alternative.

对于使用示例,这里是公平的多路复用器.此示例演示了一个将来自其输入通道阵列的流量与其单个输出通道进行相当多路复用的过程.无论竞争对手的热情如何,都不会缺乏输入渠道.

import org.jcsp.lang.*;

public class FairPlex implements CSProcess {

   private final AltingChannelInput[] in;
   private final ChannelOutput out;

   public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) {
     this.in = in;
     this.out = out;
   }

   public void run () {

     final Alternative alt = new Alternative (in);

     while (true) {
       final int index = alt.fairSelect ();
       out.write (in[index].read ());
     }
   }
 }
Run Code Online (Sandbox Code Playgroud)

请注意,如果priSelect在上面使用过,如果较低索引的频道不断要求服务,则较高索引的频道将会缺乏.或者代替fairSelect,select可以使用,但不能进行饥饿分析.select只有在饥饿不成问题时才应使用该机制.

摆脱僵局

与Go一样,使用通道的Java程序必须设计为不会死锁.在Java中实现低级并发原语很难做到正确,你需要一些可靠的东西.幸运的是,Alternative已经通过正式分析以及JCSP渠道进行了验证.这使它成为可靠的可靠选择.

为了澄清一点点困惑,目前的JCSP版本在Maven回购中是1.1-rc5,而不是网站所说的.


Ale*_*dov 1

唯一的方法是用功能更强大的类的对象替换标准队列,当数据插入空队列时通知消费者。这个类仍然可以实现BlockingQueue接口,因此另一方(生产者)看不到任何区别。诀窍是put操作还应该提出一个标志并通知消费者。消费者在轮询所有线程后,清除标志并调用Object.wait().