Aja*_*her 5 android zeromq jzmq
我正在尝试使用XPUB和XSUB来实现,如下图所示。我已经仔细阅读了所提供的示例,但在Java中无法获得XPUB和XSUB的示例。在这里,他们给出了一个用C编写的示例,该示例有点复杂,因为我是ZeroMQ的新手。
我正在尝试使用jni包装版本在android中使用它。请帮助我找到一个示例,说明如何使用Java在ZeroMQ中使用代理实现此Pub-Sub网络。
目前,我指的是http://zguide.zeromq.org/page:all
我尝试将其移植如下。 Subscriber.java
公共类订户扩展了线程实现Runnable {
private static final String TAG = "Subscriber";
private Context ctx;
public Subscriber(ZMQ.Context z_context) {
this.ctx = z_context;
}
@Override
public void run() {
super.run();
ZMQ.Socket mulServiceSubscriber = ctx.socket(ZMQ.SUB);
mulServiceSubscriber.connect("tcp://localhost:6001");
mulServiceSubscriber.subscribe("A".getBytes());
mulServiceSubscriber.subscribe("B".getBytes());
while (true) {
Log.d(TAG, "Subscriber loop started..");
String content = new String(mulServiceSubscriber.recv(0));
Log.d(TAG, "Subscriber Received : "+content);
}
}
Run Code Online (Sandbox Code Playgroud)
}
Publisher.java
公共类Publisher扩展了Thread实现Runnable {
private static final String TAG = "Publisher";
private Context ctx;
public Publisher(ZMQ.Context z_context) {
this.ctx = z_context;
}
@Override
public void run() {
super.run();
ZMQ.Socket publisher = ctx.socket(ZMQ.PUB);
publisher.connect("tcp://localhost:6000");
while (true) {
Log.d(TAG, "Publisher loop started..");
publisher.send(("A Hello " + new Random(100).nextInt()).getBytes() , 0);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
}
XListener.java(现在是一个简单的转发器)
公共类XListener扩展Thread实现Runnable {
private static final String TAG = null;
private Socket publisherX;
private Context ctx;
private Socket subscriberX;
public XListener(ZMQ.Context ctx, ZMQ.Socket subscriberX,
ZMQ.Socket publisherX) {
this.ctx = ctx;
this.subscriberX = subscriberX;
this.publisherX = publisherX;
}
@Override
public void run() {
super.run();
while (true) {
Log.d(TAG, "XListener loop started..");
String msg = new String(subscriberX.recvStr());
Log.v(TAG, "Listener Received: " +"MSG :"+msg);
publisherX.send(msg.getBytes(), 0);
}
}
Run Code Online (Sandbox Code Playgroud)
}
在应用程序中main()
私人void main(){
ZMQ.Context ctx = ZMQ.context(1);
ZMQ.Socket subscriberX = ctx.socket(ZMQ.XSUB);
subscriberX.bind("tcp://*:6000");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ZMQ.Socket publisherX = ctx.socket(ZMQ.XPUB);
publisherX.bind("tcp://*:6001");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
new XListener(ctx, subscriberX, publisherX).start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
new XSender(ctx, subscriberX, publisherX).start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
new Subscriber(ctx).start();
new Publisher(ctx).start();
}
Run Code Online (Sandbox Code Playgroud)
使用该代码,我无法收听XSUB。移植espresso.c时,我无法在ZMQ的Java绑定中找到任何包装。如何实现一个简单的代理,或者我缺少什么?
哇我正在回答我自己的问题。我错过了添加从publisherX到subscriberX的转发器。这是缺少的代码。现在XSUB和XPUB可以发送和获取数据了。
公共类 XSender 扩展 Thread 实现 Runnable {
private static final String TAG = null;
private Socket publisherX;
private Context ctx;
private Socket subscriberX;
public XSender(ZMQ.Context ctx, ZMQ.Socket subscriberX,
ZMQ.Socket publisherX) {
this.ctx = ctx;
this.subscriberX = subscriberX;
this.publisherX = publisherX;
}
@Override
public void run() {
super.run();
while (true) {
// Read envelope with address
Log.d(TAG, "XListener loop started..");
String msg = new String(subscriberX.recv(0));
Log.v(TAG, "Listener Received: " +"MSG :"+msg);
publisherX.send(msg.getBytes(), 0);
}
}
Run Code Online (Sandbox Code Playgroud)
}
| 归档时间: |
|
| 查看次数: |
2602 次 |
| 最近记录: |