小编use*_*197的帖子

消息包和日期时间

zeromq我需要一种在 python 多处理进程之间每秒发送 300 条短消息的快速方法。每条消息都需要包含一个IDandtime.time()

msgpack似乎是在通过 发送之前序列化字典的最佳方法zeromq,并且方便地,msgpack有一个正是我需要的示例,除了它有一个datetime.datetime.now().

import datetime

import msgpack

useful_dict = {
    "id": 1,
    "created": datetime.datetime.now(),
}

def decode_datetime(obj):
    if b'__datetime__' in obj:
        obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
    return obj

def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        return {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
    return obj


packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)
Run Code Online (Sandbox Code Playgroud)

问题是他们的示例不起作用,我收到此错误:

    obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
KeyError: 'as_str'
Run Code Online (Sandbox Code Playgroud)

也许因为我使用的是python 3.4,但我不知道strptime有什么问题。将不胜感激您的帮助。

python datetime zeromq python-3.x msgpack

5
推荐指数
1
解决办法
5464
查看次数

ZeroMQ:带有大消息的 REQ/REP

我正在尝试300 MB通过REP-socket发送大消息 ( ) 。
有多个客户端,每个客户端都通过REQ-socket连接到服务器。服务器为每个客户端创建一个专用套接字,等待请求(包含一个标识符)并使用ZMQ_SENDMORE以下两部分发送消息:

  1. 元数据 (~ 1 KB)
  2. 数据 (~ 300 MB)

之后REP-socket 立即关闭。该Context()-instance然后在父线程关闭。ZMQ_LINGER套接字的时间保留为默认值(无限)。有时会发送元数据,但不会发送图像数据。
我跟踪了 ZeroMQ 中的调用,发现内部(windows)套接字在消息数据发送到网络之前已关闭。我认为zmq_term()只要队列中有未发送的消息就会阻塞。
作为一种解决方法,我将客户端更改为在收到数据作为确认后发送另一条请求消息。这很有效,但我不太确定我是否已经解决了根本问题。

ZeroMQ 版本是 4.0.4。我正在使用 C++ 绑定。服务器和客户端都在 Windows(7 和 10)上运行。

c++ sockets zeromq

5
推荐指数
1
解决办法
995
查看次数

如何找出PUB服务器中所有订阅的过滤器?

我有一个PUB服务器。它如何知道订阅了哪些过滤器,以便服务器知道它必须创建哪些数据?一旦没有客户端感兴趣,

服务器就不需要创建数据。SUB

假设可能的过滤器集很大(或无限),但订阅者在任何给定时间都只订阅其中的几个。

示例:假设SUB客户仅订阅纽约和巴黎的几个区号的天气源数据。那么PUB服务器就不必为世界上每个其他城市的每个其他区号创建天气数据,然后再次将其全部丢弃。

如何找出PUB服务器中所有订阅的过滤器?

如果没有简单的方法,我该如何用其他方法解决这个问题?

zeromq

5
推荐指数
1
解决办法
94
查看次数

在iPythonNotebook中中断Flask应用程序会导致ZMQerror

我正试图flask在这样的iPython笔记本中运行一个最简单的应用程序演示.

from flask import Flask
app = Flask(__name__)

@app.route('/')
def hello_world():.
    return 'Hello World!'

if __name__ == '__main__':
    app.run(d)
Run Code Online (Sandbox Code Playgroud)

我第一次运行它,一切都很好.然后我打断了牢房app.run().但是下次我运行时,笔记本会抛出一些错误信息:

An exception has occurred, use %tb to see the full traceback.

SystemExit: 1
Run Code Online (Sandbox Code Playgroud)

然后我%tb编辑并得到以下回溯:

SystemExit                                Traceback (most recent call last)
<ipython-input-7-a59dfe133898> in <module>()
----> 1 myapp.run(debug=True)

C:\Users\Lewis\AppData\Local\Enthought\Canopy\User\lib\site-packages\flask\app.pyc in run(self, host, port, debug, **options)
    770         options.setdefault('use_debugger', self.debug)
    771         try:
--> 772             run_simple(host, port, self, **options)
    773         finally:
    774             # reset the first request …
Run Code Online (Sandbox Code Playgroud)

zeromq flask pyzmq ipython-notebook

5
推荐指数
1
解决办法
2140
查看次数

ZeroMQ,我们可以使用inproc:transport以及pub/sub消息传递模式

场景:

我们正在ZeroMQ(特别jeroMq)评估事件驱动机制.

应用程序分布在多个服务(发布者和订阅者都是服务)可以存在于同一个jvm或不同节点中,这取决于部署体系结构.

意见

为了玩游戏我使用jero mq 创建了一个pub/ subpattern inproc:作为传输(版本:0.3.5)

  1. 线程发布能够发布(看起来像发布,至少没有错误)
  2. 另一个线程中的订户没有收到任何东西.

使用inproc:连同pub/sub可行吗?

尝试谷歌搜索,但找不到任何具体的,任何见解?

pub/ subwith的代码示例inproc:

使用jero mq(版本:0.3.5)的inproc pub sub的工作代码示例对以后访问此帖子的人有用.一个出版商出版的话题AB,和两个用户接收AB分别

/**
 * @param args
 */
public static void main(String[] args) {

    // The single ZMQ instance
    final Context context = ZMQ.context(1);

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    //Publisher
    executorService.execute(new Runnable() {

        @Override
        public void run() { …
Run Code Online (Sandbox Code Playgroud)

java event-driven-design publish-subscribe zeromq jeromq

5
推荐指数
1
解决办法
1601
查看次数

用不同语言编写的 ZeroMQ 套接字的兼容性

我已经构建了一个基于 python 编写的应用程序ZeroMQ,但现在我面临着性能问题。所以我决定使用 Golang 重写我的应用程序的一些模块。但是当我尝试在不同语言实现的套接字之间建立消息传递时,任何事情都不起作用。

到目前为止,我已经搜索过,但没有找到有关ZeroMQ使用不同语言的兼容性问题的任何信息。

所以问题是:我可以使用golang进行基于ZeroMQpython编写的客户端的服务器实现来连接它吗?
还是我必须只使用一种语言?

编辑:这是我正在尝试正常工作的典型服务器和客户端

服务器:

import zmqctx = zmq.Context()
sock = ctx.socket(zmq.REP)
sock.bind("tcp://*:57000")
msg = sock.recv()
Run Code Online (Sandbox Code Playgroud)

客户:

package main

import (
    zmq "github.com/pebbe/zmq4"
)

func main() {

    ctx, _ := zmq.NewContext()
    sock, _ := ctx.NewSocket(zmq.REQ)

    sock.Connect("tcp://localhost:57000")
    sock.Send("simple message", 0)
}
Run Code Online (Sandbox Code Playgroud)

服务器卡在 sock.recv()

python go zeromq

5
推荐指数
1
解决办法
773
查看次数

以交互方式调试并行 python 和 SCOOP

我来自mpi4py交互式调试代码的背景,因此每个不同 CPU 所经历的 python 实例显示在xterm窗口的不同实例中。按照下面链接中的建议,我已经能够使用诸如 的命令来执行我的代码$mpirun -np 4 xterm -e "ipython -i script.py",这是我从以下来源了解到的:debuggingmpi4py交互

以这种方式执行并行 python 代码意味着如果我插入断点 pdb.set_trace(),代码中与每个单个处理器的上下文相关的错误变得非常透明,这种方法极大地促进了猴子修补。

我现在已经从 转移mpi4pySCOOP,我想知道是否有任何类似的方法可以在不同xterm实例中启动对应于不同 CPU 的 python 处理器?切换的原因是因为现在我正在使用一个 python 模块DEAP,该模块旨在与SCOOP.

我也想知道如果WakariIPcluster方法可以结合SCOOP或者mpi4py还有?

Note:我添加了标签,ZeroMQ因为我相信它SCOOP是建立在ZeroMQ.

python debugging zeromq mpi4py

5
推荐指数
0
解决办法
292
查看次数

如何保护ZeroMQ请求回复模式以防止潜在的消息丢失?

我正在尝试在c#应用程序和分布式python服务器之间的TCP层上实现ZeroMQ模式.我有一个使用请求 - 回复模式的版本,在测试时看起来相对稳定.但是,在测试中,我调试了一些情况,我在收到回复之前意外地发送了多个请求,这显然是不可接受的.REQ/REPlocalhost

在实践中,网络可能会有大量丢弃的数据包,我怀疑我将丢弃大量的回复和/或无法发送请求.

1)有没有办法重置REQ/REP请求 - 回复套接字之间的连接
一种REOUTER/DEALER模式会更有意义吗?由于这是我第一次使用ZeroMQ,我希望保持简单.

2)是否有一个良好的ZeroMQ机制来处理连接事件? 我一直在阅读"指南",有一些关于监控连接的提及,但没有例子.我找到了ZMonitor,但无法在c#中触发事件.

c# python network-programming connectivity zeromq

5
推荐指数
1
解决办法
2427
查看次数

如何理解队列在分布式系统中的作用?

我试图了解分布式系统中队列的用例是什么。
以及它如何扩展以及如何确保它不是系统中的单点故障?

任何直接回答或对文档的引用都值得赞赏。

  1. 用例:
    我知道队列是一个消息系统。并且它解耦了彼此通信的系统。但是,这是使用队列的唯一目的吗?

  2. 可扩展性:
    队列如何针对大量数据进行扩展?既读又写。

  3. 可靠性:队列如何不成为系统中的单点故障?队列是否进行复制(类似于数据存储)?

我的问题没有指定到任何特定的队列服务器,如 Kafka 或 JMS。就一般而言。

message-queue distributed-system

5
推荐指数
1
解决办法
566
查看次数

为什么以下简单的并行化代码比Python中的简单循环慢得多?

一个简单的程序,用于计算数字平方并存储结果:

    import time
    from joblib import Parallel, delayed
    import multiprocessing

    array1 = [ 0 for i in range(100000) ]

    def myfun(i):
        return i**2

    #### Simple loop ####
    start_time = time.time()

    for i in range(100000):
        array1[i]=i**2

    print( "Time for simple loop         --- %s seconds ---" % (  time.time()
                                                               - start_time
                                                                 )
            )
    #### Parallelized loop ####
    start_time = time.time()
    results = Parallel( n_jobs  = -1,
                        verbose =  0,
                        backend = "threading"
                        )(
                        map( delayed( myfun ),
                             range( 100000 )
                             )
                        ) …
Run Code Online (Sandbox Code Playgroud)

python arrays parallel-processing function

5
推荐指数
2
解决办法
685
查看次数