标签: jeromq

jeromq生产准备好了吗?

我过去使用ZeroMQ通过jzmq库使用JVM应用程序.我计划在一个新项目上使用zeromq,其中一些服务是在JVM上实现的.我刚刚发现了jeromq,一个zeromq的纯java实现,我想主要使用它,因为它跟踪zeromq 3.x并且它消除了处理的麻烦jzmq.但是,我无法从回购页面判断它是否已准备就绪.有没有人有jeromq生产经验?

zeromq jeromq

17
推荐指数
1
解决办法
2950
查看次数

java.lang.ArrayIndexOutOfBoundsException:256,jeromq 0.3.6版本

我在多线程环境中使用Jeromq,如下所示.下面是我的代码,其中构造函数首先SocketManager连接到所有可用的套接字,然后我将它们放在方法中的liveSocketsByDatacentermap中connectToZMQSockets.之后,我在同一个构造函数中启动一个后台线程,该构造函数每30秒运行一次,它调用updateLiveSockets方法来ping所有那些已经存在于liveSocketsByDatacentermap中的套接字并更新liveSocketsByDatacenter映射,看看这些套接字是否存活.

并且getNextSocket()多个读取器线程同时调用方法以获取下一个可用的套接字,然后我们使用该套接字在其上发送数据.所以我的问题是我们在多线程环境中正确使用Jeromq吗?因为在我们尝试将数据发送到该实时套接字时,我们刚刚在生产环境中看到了这个堆栈跟踪的异常,所以我不确定它是否是一个错误或其他什么?

java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
Run Code Online (Sandbox Code Playgroud)

以下是我的代码:

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
    private final ZContext ctx = new ZContext();

    private static class Holder {
        private …
Run Code Online (Sandbox Code Playgroud)

java multithreading thread-safety zeromq jeromq

10
推荐指数
1
解决办法
560
查看次数

如何在JeroMQ中实现活动和备用队列作业处理系统?

使用ZeroMQ .Context.Socket实例,我能够推送/拉取消息
,例如在我的代码下方为队列设置:

 ZMQ.Context context = ZMQ.context(1);

 //  Socket to send messages on
 ZMQ.Socket sender = context.socket(ZMQ.PUSH);
 sender.bind("tcp://*:5557");

 // Send messages
 sender.send("0", 0);

 ZMQ.Socket receiver = context.socket(ZMQ.PULL);
 receiver.connect("tcp://localhost:5557");

 // receive messages
 String string = new String(receiver.recv(0)).trim();
Run Code Online (Sandbox Code Playgroud)

我的问题是:

Q1:如何在队列中实现主用/备用模式?

我的意思是将为一个主机和端口创建2个队列,如果一个队列(活动)发生故障,另一个(即备用)队列将立即启动以侦听/拉取消息.

任何实现它的示例或指导都会更有帮助.

Q2:是否有内置类来执行此类任务?

java sockets zeromq jeromq

9
推荐指数
1
解决办法
281
查看次数

jeromq:关闭上下文失败

编辑:解决了自己,见下文(虽然我不确定我是否偶然发现了一个错误)

使用下面的简单hello-world请求 - 回复示例,在程序结束时关闭上下文失败:它只是挂起ctx.close()或者抛出以下异常:

Exception in thread "reaper-1" java.lang.NullPointerException
    at zmq.Ctx.destroy_socket(Ctx.java:327)
    at zmq.ZObject.destroy_socket(ZObject.java:144)
    at zmq.SocketBase.check_destroy(SocketBase.java:938)
    at zmq.SocketBase.start_reaping(SocketBase.java:753)
    at zmq.Reaper.process_reap(Reaper.java:133)
    at zmq.ZObject.process_command(ZObject.java:114)
    at zmq.Reaper.in_event(Reaper.java:90)
    at zmq.Poller.run(Poller.java:233)
    at java.lang.Thread.run(Thread.java:724)
Run Code Online (Sandbox Code Playgroud)

无论哪种方式,程序都不会停止.

这是代码(请注意,套接字在创建它们的线程中都是关闭的):

import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class App {
    public static void main(String[] args) throws InterruptedException {
        final ZContext ctx = new ZContext();

        final Thread t1 = new Thread() {
            @Override
            public void run() {
                ZMQ.Socket socket = ctx.createSocket(ZMQ.REQ);
                socket.connect("inproc://test");
                System.err.format("[Thread %s] socket connected%n", Thread.currentThread().getId());
                socket.send("hello");
                System.err.format("[Thread %s] …
Run Code Online (Sandbox Code Playgroud)

java zeromq jeromq

7
推荐指数
1
解决办法
2220
查看次数

JeroMQ正确关闭

我想知道如何正确关闭JeroMQ,到目前为止我知道三种方法都有它们的优点和缺点,我不知道哪一个是最好的.

情况:

  • 线程A:拥有上下文,应提供启动/停止方法
  • 线程B:实际的侦听器线程

我目前的方法:

线程A.

static ZContext CONTEXT = new ZContext();
Thread thread;

public void start() {
    thread = new Thread(new B()).start();
}

public void stop() {
    thread.stopping = true;
    thread.join();
}
Run Code Online (Sandbox Code Playgroud)

线程B.

boolean stopping = false;
ZMQ.Socket socket;

public void run() {
    socket = CONTEXT.createSocket(ROUTER);
    ... // socket setup
    socket.setReceiveTimeout(10);

    while (!stopping) {
        socket.recv();
    }

    if (NUM_SOCKETS >= 1) {
        CONTEXT.destroySocket(socket);
    } else {
        CONTEXT.destroy();
    }
}
Run Code Online (Sandbox Code Playgroud)

这很有用.关机10ms对我来说没有问题,但是当没有收到消息时,我会不必要地增加CPU负载.目前我更喜欢这个.


第二种方法在两个线程之间共享套接字:

线程A.

static ZContext CONTEXT = new ZContext();
ZMQ.Socket socket; …
Run Code Online (Sandbox Code Playgroud)

sockets recv zeromq jeromq

7
推荐指数
1
解决办法
1957
查看次数

如果可以使用JZMQ,为什么需要JeroMQ?

简单的问题.为什么在java上"移植"zmq并称之为JeroMQ是个好主意?

zeromq jzmq jeromq

6
推荐指数
1
解决办法
3897
查看次数

如何将一个float数组(没有序列化/反序列化)从Scala(JeroMQ)传输到C(ZMQ)?

目前,我正在使用JSON库来序列化发送方(JeroMQ)上的数据,并在接收方(C,ZMQ)进行反序列化.但是,在解析时,JSON库开始消耗大量内存,操作系统会终止进程.所以,我想按原样发送float数组,即不使用JSON.

现有的发件人代码在下面(syn0并且syn1Double数组).如果syn0并且syn1每个大约100 MB,则在解析接收到的数组时,该进程将被终止,即下面的代码段的最后一行:

import org.zeromq.ZMQ
import com.codahale.jerkson
socket.connect("tcp://localhost:5556")

socket.send(json.JSONObject(Map("syn0"->json.JSONArray(List.fromArray(syn0Global)))).toString())
println("SYN0 Request sent”)
val reply_syn0 = socket.recv(0)
println("Response received after syn0: " + new String(reply_syn0))
logInfo("Sending Syn1 request … , size : " + syn1Global.length )

socket.send(json.JSONObject(Map("syn1"->json.JSONArray(List.fromArray(syn1Global)))).toString())
println("SYN1 Request sent")
val reply_syn1 = socket.recv(0)

socket.send(json.JSONObject(Map("foldComplete"->"Done")).toString())
println("foldComplete sent")
//  Get the reply.
val reply_foldComplete = socket.recv(0)
val processedSynValuesJson = new String(reply_foldComplete)
val processedSynValues_jerkson =   jerkson.Json.parse[Map[String,List[Double]]](processedSynValuesJson)
Run Code Online (Sandbox Code Playgroud)

可以在不使用JSON的情况下传输这些数组吗?

这里我在两个C程序之间传输一个float数组:

//client.c
int main (void)
{ …
Run Code Online (Sandbox Code Playgroud)

json distributed scala zeromq jeromq

6
推荐指数
1
解决办法
674
查看次数

为什么这个JeroMQ(ZeroMQ端口)基准测试如此之慢?

我想使用我发现的这个库,它是zeromq的纯java端口(不是包装器).我正在尝试测试它,虽然它声称有一些好的数字,我正在执行的测试结果相当差,它甚至在本地执行(客户端和服务在同一台机器上).我确定这是我做错了.需要约.5秒执行此10.000消息循环.

我所做的只是采用Hello world示例并删除了暂停和sysouts.这是代码:

服务器:

package guide;

import org.jeromq.ZMQ;

public class hwserver{
    public static void main(String[] args) throws Exception{

        //  Prepare our context and socket
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket socket = context.socket(ZMQ.REP);

        System.out.println("Binding hello world server");
        socket.bind ("tcp://*:5555");        

        while (true) {                  
            byte[] reply = socket.recv(0);
            String requestString = "Hello" ;
            byte[] request = requestString.getBytes();              
            socket.send(request, 0);            
        }              
    }
}
Run Code Online (Sandbox Code Playgroud)

客户端:

package guide;

import org.jeromq.ZMQ;

public class hwclient{
    public static void main(String[] args){
        ZMQ.Context context = ZMQ.context(1); …
Run Code Online (Sandbox Code Playgroud)

benchmarking messaging zeromq jeromq

5
推荐指数
1
解决办法
3477
查看次数

ZeroMQ,我们可以使用inproc:transport以及pub/sub消息传递模式

场景:

我们正在ZeroMQ(特别jeroMq)评估事件驱动机制.

应用程序分布在多个服务(发布者和订阅者都是服务)可以存在于同一个jvm或不同节点中,这取决于部署体系结构.

意见

为了玩游戏我使用jero mq 创建了一个pub/ subpattern inproc:作为传输(版本:0.3.5)

  1. 线程发布能够发布(看起来像发布,至少没有错误)
  2. 另一个线程中的订户没有收到任何东西.

使用inproc:连同pub/sub可行吗?

尝试谷歌搜索,但找不到任何具体的,任何见解?

pub/ subwith的代码示例inproc:

使用jero mq(版本:0.3.5)的inproc pub sub的工作代码示例对以后访问此帖子的人有用.一个出版商出版的话题AB,和两个用户接收AB分别

/**
 * @param args
 */
public static void main(String[] args) {

    // The single ZMQ instance
    final Context context = ZMQ.context(1);

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    //Publisher
    executorService.execute(new Runnable() {

        @Override
        public void run() { …
Run Code Online (Sandbox Code Playgroud)

java event-driven-design publish-subscribe zeromq jeromq

5
推荐指数
1
解决办法
1601
查看次数

Android 上的 Jeromq:没有虚拟方法clear()Ljava/nio/ByteBuffer

我正在尝试将 jeromq 用于 android 项目。我需要连接到另一个经销商设备。这是我的代码:

ZContext zcontext = new ZContext(1);
ZMQ.Socket zsocket = zcontext.createSocket(ZMQ.DEALER);
String identity = "S61_phone";
zsocket.setIdentity(identity.getBytes(ZMQ.CHARSET));
zsocket.connect("tcp://my_other_device_ip_and_port_here");
zsocket.send("test",0);
Run Code Online (Sandbox Code Playgroud)

在 connect 调用时,发生错误:

2019-09-26 16:59:53.033 18347-18379/?E/AndroidRuntime:致命异常:Thread-4 进程:com.flir.flironeexampleapplication,PID:18347 java.lang.NoSuchMethodError:没有虚拟方法clear()Ljava/nio/ByteBuffer;在类 Ljava/nio/ByteBuffer 中;或其超类(“java.nio.ByteBuffer”的声明出现在 /system/framework/core-oj.jar 中)位于 zmq.Signaler.send(Signaler.java:97) 处 zmq.Mailbox.send(Mailbox.java :71)在zmq.Ctx.sendCommand(Ctx.java:517)在zmq.ZObject.sendCommand(ZObject.java:382)在zmq.ZObject.sendPlug(ZObject.java:185)在zmq.ZObject.sendPlug(ZObject) .java:175) 在 zmq.Own.launchChild(Own.java:115) 在 zmq.SocketBase.addEndpoint(SocketBase.java:590) 在 zmq.SocketBase.connect(SocketBase.java:582) 在 org.zeromq.ZMQ $Socket.connect(ZMQ.java:2531) 位于 com.flir.flironeexampleapplication.GLPreviewActivity.onDeviceConnected(GLPreviewActivity.java:115) 位于 com.flir.flironesdk.EmbeddedDevice$4.run(EmbeddedDevice.java:512) 位于 java.lang .Thread.run(Thread.java:764)

我的环境:

  • 杰罗姆克 0.5.2
  • 执行代码的目标设备是Android 8.1 (Oreo)
  • 使用 JDK 1.8.0_66 / ndk r10e 在 Android Studio 中编译

知道问题是什么吗?多谢。

java android zeromq jeromq

4
推荐指数
1
解决办法
3256
查看次数

如何在MATLAB中使用jeromq

jeromq 是 libzmq 的 Java 实现。我有一个从 jeromq 源创建的 .jar 文件。但是,我无法从 MATLAB 调用 jeromq 中的类。我已经使用过 addjavaclasspathaddjavalibrarypath但仍然无法让它工作。有没有人在 MATLAB 中有一个简单的工作示例?

matlab jeromq

2
推荐指数
1
解决办法
2407
查看次数