我在多线程环境中使用Jeromq,如下所示.下面是我的代码,其中构造函数首先SocketManager连接到所有可用的套接字,然后我将它们放在方法中的liveSocketsByDatacentermap中connectToZMQSockets.之后,我在同一个构造函数中启动一个后台线程,该构造函数每30秒运行一次,它调用updateLiveSockets方法来ping所有那些已经存在于liveSocketsByDatacentermap中的套接字并更新liveSocketsByDatacenter映射,看看这些套接字是否存活.
并且getNextSocket()多个读取器线程同时调用方法以获取下一个可用的套接字,然后我们使用该套接字在其上发送数据.所以我的问题是我们在多线程环境中正确使用Jeromq吗?因为在我们尝试将数据发送到该实时套接字时,我们刚刚在生产环境中看到了这个堆栈跟踪的异常,所以我不确定它是否是一个错误或其他什么?
java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
Run Code Online (Sandbox Code Playgroud)
以下是我的代码:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
private static class Holder {
private …Run Code Online (Sandbox Code Playgroud) 使用ZeroMQ .Context和.Socket实例,我能够推送/拉取消息
,例如在我的代码下方为队列设置:
ZMQ.Context context = ZMQ.context(1);
// Socket to send messages on
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");
// Send messages
sender.send("0", 0);
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.connect("tcp://localhost:5557");
// receive messages
String string = new String(receiver.recv(0)).trim();
Run Code Online (Sandbox Code Playgroud)
Q1:如何在队列中实现主用/备用模式?
我的意思是将为一个主机和端口创建2个队列,如果一个队列(活动)发生故障,另一个(即备用)队列将立即启动以侦听/拉取消息.
任何实现它的示例或指导都会更有帮助.
Q2:是否有内置类来执行此类任务?
使用下面的简单hello-world请求 - 回复示例,在程序结束时关闭上下文失败:它只是挂起ctx.close()或者抛出以下异常:
Exception in thread "reaper-1" java.lang.NullPointerException
at zmq.Ctx.destroy_socket(Ctx.java:327)
at zmq.ZObject.destroy_socket(ZObject.java:144)
at zmq.SocketBase.check_destroy(SocketBase.java:938)
at zmq.SocketBase.start_reaping(SocketBase.java:753)
at zmq.Reaper.process_reap(Reaper.java:133)
at zmq.ZObject.process_command(ZObject.java:114)
at zmq.Reaper.in_event(Reaper.java:90)
at zmq.Poller.run(Poller.java:233)
at java.lang.Thread.run(Thread.java:724)
Run Code Online (Sandbox Code Playgroud)
无论哪种方式,程序都不会停止.
这是代码(请注意,套接字在创建它们的线程中都是关闭的):
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
public class App {
public static void main(String[] args) throws InterruptedException {
final ZContext ctx = new ZContext();
final Thread t1 = new Thread() {
@Override
public void run() {
ZMQ.Socket socket = ctx.createSocket(ZMQ.REQ);
socket.connect("inproc://test");
System.err.format("[Thread %s] socket connected%n", Thread.currentThread().getId());
socket.send("hello");
System.err.format("[Thread %s] …Run Code Online (Sandbox Code Playgroud) 我想知道如何正确关闭JeroMQ,到目前为止我知道三种方法都有它们的优点和缺点,我不知道哪一个是最好的.
情况:
我目前的方法:
线程A.
static ZContext CONTEXT = new ZContext();
Thread thread;
public void start() {
thread = new Thread(new B()).start();
}
public void stop() {
thread.stopping = true;
thread.join();
}
Run Code Online (Sandbox Code Playgroud)
线程B.
boolean stopping = false;
ZMQ.Socket socket;
public void run() {
socket = CONTEXT.createSocket(ROUTER);
... // socket setup
socket.setReceiveTimeout(10);
while (!stopping) {
socket.recv();
}
if (NUM_SOCKETS >= 1) {
CONTEXT.destroySocket(socket);
} else {
CONTEXT.destroy();
}
}
Run Code Online (Sandbox Code Playgroud)
这很有用.关机10ms对我来说没有问题,但是当没有收到消息时,我会不必要地增加CPU负载.目前我更喜欢这个.
第二种方法在两个线程之间共享套接字:
线程A.
static ZContext CONTEXT = new ZContext();
ZMQ.Socket socket; …Run Code Online (Sandbox Code Playgroud) 简单的问题.为什么在java上"移植"zmq并称之为JeroMQ是个好主意?
目前,我正在使用JSON库来序列化发送方(JeroMQ)上的数据,并在接收方(C,ZMQ)进行反序列化.但是,在解析时,JSON库开始消耗大量内存,操作系统会终止进程.所以,我想按原样发送float数组,即不使用JSON.
现有的发件人代码在下面(syn0并且syn1是Double数组).如果syn0并且syn1每个大约100 MB,则在解析接收到的数组时,该进程将被终止,即下面的代码段的最后一行:
import org.zeromq.ZMQ
import com.codahale.jerkson
socket.connect("tcp://localhost:5556")
socket.send(json.JSONObject(Map("syn0"->json.JSONArray(List.fromArray(syn0Global)))).toString())
println("SYN0 Request sent”)
val reply_syn0 = socket.recv(0)
println("Response received after syn0: " + new String(reply_syn0))
logInfo("Sending Syn1 request … , size : " + syn1Global.length )
socket.send(json.JSONObject(Map("syn1"->json.JSONArray(List.fromArray(syn1Global)))).toString())
println("SYN1 Request sent")
val reply_syn1 = socket.recv(0)
socket.send(json.JSONObject(Map("foldComplete"->"Done")).toString())
println("foldComplete sent")
// Get the reply.
val reply_foldComplete = socket.recv(0)
val processedSynValuesJson = new String(reply_foldComplete)
val processedSynValues_jerkson = jerkson.Json.parse[Map[String,List[Double]]](processedSynValuesJson)
Run Code Online (Sandbox Code Playgroud)
可以在不使用JSON的情况下传输这些数组吗?
这里我在两个C程序之间传输一个float数组:
//client.c
int main (void)
{ …Run Code Online (Sandbox Code Playgroud) 我想使用我发现的这个库,它是zeromq的纯java端口(不是包装器).我正在尝试测试它,虽然它声称有一些好的数字,我正在执行的测试结果相当差,它甚至在本地执行(客户端和服务在同一台机器上).我确定这是我做错了.需要约.5秒执行此10.000消息循环.
我所做的只是采用Hello world示例并删除了暂停和sysouts.这是代码:
服务器:
package guide;
import org.jeromq.ZMQ;
public class hwserver{
public static void main(String[] args) throws Exception{
// Prepare our context and socket
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REP);
System.out.println("Binding hello world server");
socket.bind ("tcp://*:5555");
while (true) {
byte[] reply = socket.recv(0);
String requestString = "Hello" ;
byte[] request = requestString.getBytes();
socket.send(request, 0);
}
}
}
Run Code Online (Sandbox Code Playgroud)
客户端:
package guide;
import org.jeromq.ZMQ;
public class hwclient{
public static void main(String[] args){
ZMQ.Context context = ZMQ.context(1); …Run Code Online (Sandbox Code Playgroud) 场景:
我们正在ZeroMQ(特别jeroMq)评估事件驱动机制.
应用程序分布在多个服务(发布者和订阅者都是服务)可以存在于同一个jvm或不同节点中,这取决于部署体系结构.
意见
为了玩游戏我使用jero mq 创建了一个pub/ subpattern inproc:作为传输(版本:0.3.5)
题
使用inproc:连同pub/sub可行吗?
尝试谷歌搜索,但找不到任何具体的,任何见解?
pub/ subwith的代码示例inproc:
使用jero mq(版本:0.3.5)的inproc pub sub的工作代码示例对以后访问此帖子的人有用.一个出版商出版的话题A和B,和两个用户接收A并B分别
/**
* @param args
*/
public static void main(String[] args) {
// The single ZMQ instance
final Context context = ZMQ.context(1);
ExecutorService executorService = Executors.newFixedThreadPool(3);
//Publisher
executorService.execute(new Runnable() {
@Override
public void run() { …Run Code Online (Sandbox Code Playgroud) 我正在尝试将 jeromq 用于 android 项目。我需要连接到另一个经销商设备。这是我的代码:
ZContext zcontext = new ZContext(1);
ZMQ.Socket zsocket = zcontext.createSocket(ZMQ.DEALER);
String identity = "S61_phone";
zsocket.setIdentity(identity.getBytes(ZMQ.CHARSET));
zsocket.connect("tcp://my_other_device_ip_and_port_here");
zsocket.send("test",0);
Run Code Online (Sandbox Code Playgroud)
在 connect 调用时,发生错误:
2019-09-26 16:59:53.033 18347-18379/?E/AndroidRuntime:致命异常:Thread-4 进程:com.flir.flironeexampleapplication,PID:18347 java.lang.NoSuchMethodError:没有虚拟方法clear()Ljava/nio/ByteBuffer;在类 Ljava/nio/ByteBuffer 中;或其超类(“java.nio.ByteBuffer”的声明出现在 /system/framework/core-oj.jar 中)位于 zmq.Signaler.send(Signaler.java:97) 处 zmq.Mailbox.send(Mailbox.java :71)在zmq.Ctx.sendCommand(Ctx.java:517)在zmq.ZObject.sendCommand(ZObject.java:382)在zmq.ZObject.sendPlug(ZObject.java:185)在zmq.ZObject.sendPlug(ZObject) .java:175) 在 zmq.Own.launchChild(Own.java:115) 在 zmq.SocketBase.addEndpoint(SocketBase.java:590) 在 zmq.SocketBase.connect(SocketBase.java:582) 在 org.zeromq.ZMQ $Socket.connect(ZMQ.java:2531) 位于 com.flir.flironeexampleapplication.GLPreviewActivity.onDeviceConnected(GLPreviewActivity.java:115) 位于 com.flir.flironesdk.EmbeddedDevice$4.run(EmbeddedDevice.java:512) 位于 java.lang .Thread.run(Thread.java:764)
我的环境:
知道问题是什么吗?多谢。
jeromq 是 libzmq 的 Java 实现。我有一个从 jeromq 源创建的 .jar 文件。但是,我无法从 MATLAB 调用 jeromq 中的类。我已经使用过 addjavaclasspath,addjavalibrarypath但仍然无法让它工作。有没有人在 MATLAB 中有一个简单的工作示例?