如何将一个float数组(没有序列化/反序列化)从Scala(JeroMQ)传输到C(ZMQ)?

use*_*878 6 json distributed scala zeromq jeromq

目前,我正在使用JSON库来序列化发送方(JeroMQ)上的数据,并在接收方(C,ZMQ)进行反序列化.但是,在解析时,JSON库开始消耗大量内存,操作系统会终止进程.所以,我想按原样发送float数组,即不使用JSON.

现有的发件人代码在下面(syn0并且syn1Double数组).如果syn0并且syn1每个大约100 MB,则在解析接收到的数组时,该进程将被终止,即下面的代码段的最后一行:

import org.zeromq.ZMQ
import com.codahale.jerkson
socket.connect("tcp://localhost:5556")

socket.send(json.JSONObject(Map("syn0"->json.JSONArray(List.fromArray(syn0Global)))).toString())
println("SYN0 Request sent”)
val reply_syn0 = socket.recv(0)
println("Response received after syn0: " + new String(reply_syn0))
logInfo("Sending Syn1 request … , size : " + syn1Global.length )

socket.send(json.JSONObject(Map("syn1"->json.JSONArray(List.fromArray(syn1Global)))).toString())
println("SYN1 Request sent")
val reply_syn1 = socket.recv(0)

socket.send(json.JSONObject(Map("foldComplete"->"Done")).toString())
println("foldComplete sent")
//  Get the reply.
val reply_foldComplete = socket.recv(0)
val processedSynValuesJson = new String(reply_foldComplete)
val processedSynValues_jerkson =   jerkson.Json.parse[Map[String,List[Double]]](processedSynValuesJson)
Run Code Online (Sandbox Code Playgroud)

可以在不使用JSON的情况下传输这些数组吗?

这里我在两个C程序之间传输一个float数组:

//client.c
int main (void)
{
printf ("Connecting to hello world server…\n");
void *context = zmq_ctx_new ();
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");

int request_nbr;
float send_buffer[10];
float recv_buffer[10];

for(int i = 0; i < 10; i++)
    send_buffer[i] = i;

for (request_nbr = 0; request_nbr != 10; request_nbr++) {
    //char buffer [10];
    printf ("Sending Hello %d…\n", request_nbr);
    zmq_send (requester, send_buffer, 10*sizeof(float), 0);
    zmq_recv (requester, recv_buffer, 10*sizeof(float), 0);
    printf ("Received World %.3f\n", recv_buffer[5]);
}
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}

//server.c

int main (void)
{
//  Socket to talk to clients
void *context = zmq_ctx_new ();
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555");
assert (rc == 0);
float recv_buffer[10];
float send_buffer[10];
while (1) {
    //char buffer [10];
    zmq_recv (responder, recv_buffer, 10*sizeof(float), 0);
    printf ("Received Hello\n");
    for(int i = 0; i < 10; i++)
            send_buffer[i] = recv_buffer[i]+5;
    zmq_send (responder, send_buffer, 10*sizeof(float), 0);
}
return 0;
}
Run Code Online (Sandbox Code Playgroud)

最后,我尝试使用Scala做类似的尝试(下面是客户端代码):

def main(args: Array[String]) {
val context = ZMQ.context(1)
val socket = context.socket(ZMQ.REQ)

println("Connecting to hello world server…")
socket.connect ("tcp://localhost:5555")
val msg : Array[Float] = Array(1,2,3,4,5,6,7,8,9,10)
val bbuf = java.nio.ByteBuffer.allocate(4*msg.length)
bbuf.asFloatBuffer.put(java.nio.FloatBuffer.wrap(msg))


for (request_nbr <- 1 to 10)  {
    socket.sendByteBuffer(bbuf,0)

}
}
Run Code Online (Sandbox Code Playgroud)

Jas*_*son 3

您需要以某种形式或方式序列化数据 - 最终,您将在一侧获取内存中的结构,并指导另一侧如何重建该结构(使用两种单独的语言的奖励点,其中内存中的结构无论如何可能是不同的)。我建议您使用新的 JSON 库,因为这似乎是问题所在,但您可以使用更有效的协议。 Protocol Buffers在多种语言中享有良好的支持,这可能是我要开始的地方。