use*_*878 6 json distributed scala zeromq jeromq
目前,我正在使用JSON库来序列化发送方(JeroMQ)上的数据,并在接收方(C,ZMQ)进行反序列化.但是,在解析时,JSON库开始消耗大量内存,操作系统会终止进程.所以,我想按原样发送float数组,即不使用JSON.
现有的发件人代码在下面(syn0并且syn1是Double数组).如果syn0并且syn1每个大约100 MB,则在解析接收到的数组时,该进程将被终止,即下面的代码段的最后一行:
import org.zeromq.ZMQ
import com.codahale.jerkson
socket.connect("tcp://localhost:5556")
socket.send(json.JSONObject(Map("syn0"->json.JSONArray(List.fromArray(syn0Global)))).toString())
println("SYN0 Request sent”)
val reply_syn0 = socket.recv(0)
println("Response received after syn0: " + new String(reply_syn0))
logInfo("Sending Syn1 request … , size : " + syn1Global.length )
socket.send(json.JSONObject(Map("syn1"->json.JSONArray(List.fromArray(syn1Global)))).toString())
println("SYN1 Request sent")
val reply_syn1 = socket.recv(0)
socket.send(json.JSONObject(Map("foldComplete"->"Done")).toString())
println("foldComplete sent")
// Get the reply.
val reply_foldComplete = socket.recv(0)
val processedSynValuesJson = new String(reply_foldComplete)
val processedSynValues_jerkson = jerkson.Json.parse[Map[String,List[Double]]](processedSynValuesJson)
Run Code Online (Sandbox Code Playgroud)
可以在不使用JSON的情况下传输这些数组吗?
这里我在两个C程序之间传输一个float数组:
//client.c
int main (void)
{
printf ("Connecting to hello world server…\n");
void *context = zmq_ctx_new ();
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");
int request_nbr;
float send_buffer[10];
float recv_buffer[10];
for(int i = 0; i < 10; i++)
send_buffer[i] = i;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
//char buffer [10];
printf ("Sending Hello %d…\n", request_nbr);
zmq_send (requester, send_buffer, 10*sizeof(float), 0);
zmq_recv (requester, recv_buffer, 10*sizeof(float), 0);
printf ("Received World %.3f\n", recv_buffer[5]);
}
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}
//server.c
int main (void)
{
// Socket to talk to clients
void *context = zmq_ctx_new ();
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555");
assert (rc == 0);
float recv_buffer[10];
float send_buffer[10];
while (1) {
//char buffer [10];
zmq_recv (responder, recv_buffer, 10*sizeof(float), 0);
printf ("Received Hello\n");
for(int i = 0; i < 10; i++)
send_buffer[i] = recv_buffer[i]+5;
zmq_send (responder, send_buffer, 10*sizeof(float), 0);
}
return 0;
}
Run Code Online (Sandbox Code Playgroud)
最后,我尝试使用Scala做类似的尝试(下面是客户端代码):
def main(args: Array[String]) {
val context = ZMQ.context(1)
val socket = context.socket(ZMQ.REQ)
println("Connecting to hello world server…")
socket.connect ("tcp://localhost:5555")
val msg : Array[Float] = Array(1,2,3,4,5,6,7,8,9,10)
val bbuf = java.nio.ByteBuffer.allocate(4*msg.length)
bbuf.asFloatBuffer.put(java.nio.FloatBuffer.wrap(msg))
for (request_nbr <- 1 to 10) {
socket.sendByteBuffer(bbuf,0)
}
}
Run Code Online (Sandbox Code Playgroud)
您需要以某种形式或方式序列化数据 - 最终,您将在一侧获取内存中的结构,并指导另一侧如何重建该结构(使用两种单独的语言的奖励点,其中内存中的结构无论如何可能是不同的)。我建议您使用新的 JSON 库,因为这似乎是问题所在,但您可以使用更有效的协议。 Protocol Buffers在多种语言中享有良好的支持,这可能是我要开始的地方。
| 归档时间: |
|
| 查看次数: |
674 次 |
| 最近记录: |