如何使用CZMQ-4.0.2新zsock API创建发布/订阅架构?

use*_*869 8 c c++ zeromq

我想使用CZMQ-4.0.2创建一个发布/订阅架构,但我无法理解如何使用新的zsockAPI.

有人能指出我使用新API的一些例子吗?

Jak*_*orz 8

tldr;

示例位于网站的底部

小解释

我假设您要求CZMQ具体使用,而不是如何使用ZeroMQ套接字,以及PUB/SUB模式的怪癖.

使用CZMQ时,您无需担心上下文,而是在内部完成.zsock_newfunctions系列返回指向zsock_tsocket的不透明标识符.你需要记住zsock_destroy(&socket)在完成它时打电话,以避免内存泄漏.

在大多数常见用法中,您无需担心连接和绑定,因为zsock_new_XXX这样做.要知道采取了什么行动,您可以查看手册.

//  Create a PUB socket. Default action is bind.
CZMQ_EXPORT zsock_t *
    zsock_new_pub (const char *endpoint);
//  Create a SUB socket, and optionally subscribe to some prefix string. Default
//  action is connect.
CZMQ_EXPORT zsock_t *
    zsock_new_sub (const char *endpoint, const char *subscribe);
Run Code Online (Sandbox Code Playgroud)

如果您计划进行一些不寻常的绑定/连接,可以添加前缀endpoint.@表示绑定,>连接.

zsock_t *sock = zsock_new_push("@ipc://test");
Run Code Online (Sandbox Code Playgroud)

现在,发送邮件,你可以使用大量的方法(zsock_send,zmsg_send,zstr_send,zstr_sendx,zstr_sendf,zframe_send),最通用的是zsock_send.它有像原型一样的printf,你需要传递一条消息的图片.此字符串中的每个字符表示消息中的单个帧(或更多帧,因为您还可以传递另一个消息).它在这里描述:

//  Send a 'picture' message to the socket (or actor). The picture is a
//  string that defines the type of each frame. This makes it easy to send
//  a complex multiframe message in one call. The picture can contain any
//  of these characters, each corresponding to one or two arguments:
//
//      i = int (signed)
//      1 = uint8_t
//      2 = uint16_t
//      4 = uint32_t
//      8 = uint64_t
//      s = char *
//      b = byte *, size_t (2 arguments)
//      c = zchunk_t *
//      f = zframe_t *
//      h = zhashx_t *
//      U = zuuid_t *
//      p = void * (sends the pointer value, only meaningful over inproc)
//      m = zmsg_t * (sends all frames in the zmsg)
//      z = sends zero-sized frame (0 arguments)
//      u = uint (deprecated)
//
//  Note that s, b, c, and f are encoded the same way and the choice is
//  offered as a convenience to the sender, which may or may not already
//  have data in a zchunk or zframe. Does not change or take ownership of
//  any arguments. Returns 0 if successful, -1 if sending failed for any
//  reason.
CZMQ_EXPORT int
zsock_send (void *self, const char *picture, ...);
Run Code Online (Sandbox Code Playgroud)

一个可能不清楚的是void *self,它实际上是我们zsock_t *从zsock_new返回的.在原型中,它被声明为void *因为此函数也接受zactor_t *.

重要的是:Does not change or take ownership of any arguments..您需要在发送后释放/销毁数据.

接收看起来非常相似.它就像sscanf,并zsock_recv创建对象,所以再次,你需要照顾记忆.

ZeroMQ和CZMQ之间的行为差​​异很大,是LINGER套接字选项.对于ZeroMQ,它是无限的(-1),其中CZMQ的默认值为0(无阻塞).因此,无论何时您zsock_send跟进zsock_destroy,您的信息都可能无法发送.可以使用zsock_set_linger或全局为套接字单独设置Linger值zsys_set_linger.

发布者的示例

#include <czmq.h>

int main(int argc, char ** argv) {
  zsock_t *socket = zsock_new_pub("ipc://example.sock");
  assert(socket);

  while(!zsys_interrupted) {
    zsys_info("Publishing");
    zsock_send(socket, "sss", "TOPIC", "MESSAGE PART", "ANOTHER");
    zclock_sleep(1000);
  }

  zsock_destroy(&socket);
  return 0;
}
Run Code Online (Sandbox Code Playgroud)

订阅者的示例

#include <czmq.h>

int main(int argc, char ** argv) {
  zsock_t *socket = zsock_new_sub("ipc://example.sock", "TOPIC");
  assert(socket);

  char *topic;
  char *frame;
  zmsg_t *msg;
  int rc = zsock_recv(socket, "sm", &topic, &msg);
  assert(rc == 0);

  zsys_info("Recv on %s", topic);
  while(frame = zmsg_popstr(msg)) {
    zsys_info("> %s", frame);
    free(frame);
  }
  free(topic);
  zmsg_destroy(&msg);

  zsock_destroy(&socket);
  return 0;
}
Run Code Online (Sandbox Code Playgroud)