zeromq我需要一种在 python 多处理进程之间每秒发送 300 条短消息的快速方法。每条消息都需要包含一个IDandtime.time()
msgpack似乎是在通过 发送之前序列化字典的最佳方法zeromq,并且方便地,msgpack有一个正是我需要的示例,除了它有一个datetime.datetime.now().
import datetime
import msgpack
useful_dict = {
"id": 1,
"created": datetime.datetime.now(),
}
def decode_datetime(obj):
if b'__datetime__' in obj:
obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
return obj
def encode_datetime(obj):
if isinstance(obj, datetime.datetime):
return {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
return obj
packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)
Run Code Online (Sandbox Code Playgroud)
问题是他们的示例不起作用,我收到此错误:
obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
KeyError: 'as_str'
Run Code Online (Sandbox Code Playgroud)
也许因为我使用的是python 3.4,但我不知道strptime有什么问题。将不胜感激您的帮助。
我正在尝试300 MB通过REP-socket发送大消息 ( ) 。
有多个客户端,每个客户端都通过REQ-socket连接到服务器。服务器为每个客户端创建一个专用套接字,等待请求(包含一个标识符)并使用ZMQ_SENDMORE以下两部分发送消息:
1 KB)300 MB)之后REP-socket 立即关闭。该Context()-instance然后在父线程关闭。ZMQ_LINGER套接字的时间保留为默认值(无限)。有时会发送元数据,但不会发送图像数据。
我跟踪了 ZeroMQ 中的调用,发现内部(windows)套接字在消息数据发送到网络之前已关闭。我认为zmq_term()只要队列中有未发送的消息就会阻塞。
作为一种解决方法,我将客户端更改为在收到数据作为确认后发送另一条请求消息。这很有效,但我不太确定我是否已经解决了根本问题。
ZeroMQ 版本是 4.0.4。我正在使用 C++ 绑定。服务器和客户端都在 Windows(7 和 10)上运行。
我有一个PUB服务器。它如何知道订阅了哪些过滤器,以便服务器知道它必须创建哪些数据?一旦没有客户端感兴趣,
服务器就不需要创建数据。SUB
假设可能的过滤器集很大(或无限),但订阅者在任何给定时间都只订阅其中的几个。
示例:假设SUB客户仅订阅纽约和巴黎的几个区号的天气源数据。那么PUB服务器就不必为世界上每个其他城市的每个其他区号创建天气数据,然后再次将其全部丢弃。
如何找出PUB服务器中所有订阅的过滤器?
如果没有简单的方法,我该如何用其他方法解决这个问题?
我正试图flask在这样的iPython笔记本中运行一个最简单的应用程序演示.
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello_world():.
return 'Hello World!'
if __name__ == '__main__':
app.run(d)
Run Code Online (Sandbox Code Playgroud)
我第一次运行它,一切都很好.然后我打断了牢房app.run().但是下次我运行时,笔记本会抛出一些错误信息:
An exception has occurred, use %tb to see the full traceback.
SystemExit: 1
Run Code Online (Sandbox Code Playgroud)
然后我%tb编辑并得到以下回溯:
SystemExit Traceback (most recent call last)
<ipython-input-7-a59dfe133898> in <module>()
----> 1 myapp.run(debug=True)
C:\Users\Lewis\AppData\Local\Enthought\Canopy\User\lib\site-packages\flask\app.pyc in run(self, host, port, debug, **options)
770 options.setdefault('use_debugger', self.debug)
771 try:
--> 772 run_simple(host, port, self, **options)
773 finally:
774 # reset the first request …Run Code Online (Sandbox Code Playgroud) 场景:
我们正在ZeroMQ(特别jeroMq)评估事件驱动机制.
应用程序分布在多个服务(发布者和订阅者都是服务)可以存在于同一个jvm或不同节点中,这取决于部署体系结构.
意见
为了玩游戏我使用jero mq 创建了一个pub/ subpattern inproc:作为传输(版本:0.3.5)
题
使用inproc:连同pub/sub可行吗?
尝试谷歌搜索,但找不到任何具体的,任何见解?
pub/ subwith的代码示例inproc:
使用jero mq(版本:0.3.5)的inproc pub sub的工作代码示例对以后访问此帖子的人有用.一个出版商出版的话题A和B,和两个用户接收A并B分别
/**
* @param args
*/
public static void main(String[] args) {
// The single ZMQ instance
final Context context = ZMQ.context(1);
ExecutorService executorService = Executors.newFixedThreadPool(3);
//Publisher
executorService.execute(new Runnable() {
@Override
public void run() { …Run Code Online (Sandbox Code Playgroud) 我已经构建了一个基于 python 编写的应用程序ZeroMQ,但现在我面临着性能问题。所以我决定使用 Golang 重写我的应用程序的一些模块。但是当我尝试在不同语言实现的套接字之间建立消息传递时,任何事情都不起作用。
到目前为止,我已经搜索过,但没有找到有关ZeroMQ使用不同语言的兼容性问题的任何信息。
所以问题是:我可以使用golang进行基于ZeroMQpython编写的客户端的服务器实现来连接它吗?
还是我必须只使用一种语言?
编辑:这是我正在尝试正常工作的典型服务器和客户端
服务器:
import zmqctx = zmq.Context()
sock = ctx.socket(zmq.REP)
sock.bind("tcp://*:57000")
msg = sock.recv()
Run Code Online (Sandbox Code Playgroud)
客户:
package main
import (
zmq "github.com/pebbe/zmq4"
)
func main() {
ctx, _ := zmq.NewContext()
sock, _ := ctx.NewSocket(zmq.REQ)
sock.Connect("tcp://localhost:57000")
sock.Send("simple message", 0)
}
Run Code Online (Sandbox Code Playgroud)
服务器卡在 sock.recv()
我来自mpi4py交互式调试代码的背景,因此每个不同 CPU 所经历的 python 实例显示在xterm窗口的不同实例中。按照下面链接中的建议,我已经能够使用诸如 的命令来执行我的代码$mpirun -np 4 xterm -e "ipython -i script.py",这是我从以下来源了解到的:debuggingmpi4py交互
以这种方式执行并行 python 代码意味着如果我插入断点 pdb.set_trace(),代码中与每个单个处理器的上下文相关的错误变得非常透明,这种方法极大地促进了猴子修补。
我现在已经从 转移mpi4py到SCOOP,我想知道是否有任何类似的方法可以在不同xterm实例中启动对应于不同 CPU 的 python 处理器?切换的原因是因为现在我正在使用一个 python 模块DEAP,该模块旨在与SCOOP.
我也想知道如果WakariIPcluster方法可以结合SCOOP或者mpi4py还有?
Note:我添加了标签,ZeroMQ因为我相信它SCOOP是建立在ZeroMQ.
我正在尝试在c#应用程序和分布式python服务器之间的TCP层上实现ZeroMQ模式.我有一个使用请求 - 回复模式的版本,在测试时看起来相对稳定.但是,在测试中,我调试了一些情况,我在收到回复之前意外地发送了多个请求,这显然是不可接受的.REQ/REPlocalhost
在实践中,网络可能会有大量丢弃的数据包,我怀疑我将丢弃大量的回复和/或无法发送请求.
1)有没有办法重置REQ/REP请求 - 回复套接字之间的连接?
一种REOUTER/DEALER模式会更有意义吗?由于这是我第一次使用ZeroMQ,我希望保持简单.
2)是否有一个良好的ZeroMQ机制来处理连接事件? 我一直在阅读"指南",有一些关于监控连接的提及,但没有例子.我找到了ZMonitor,但无法在c#中触发事件.
我试图了解分布式系统中队列的用例是什么。
以及它如何扩展以及如何确保它不是系统中的单点故障?
任何直接回答或对文档的引用都值得赞赏。
用例:
我知道队列是一个消息系统。并且它解耦了彼此通信的系统。但是,这是使用队列的唯一目的吗?
可扩展性:
队列如何针对大量数据进行扩展?既读又写。
可靠性:队列如何不成为系统中的单点故障?队列是否进行复制(类似于数据存储)?
我的问题没有指定到任何特定的队列服务器,如 Kafka 或 JMS。就一般而言。
一个简单的程序,用于计算数字平方并存储结果:
import time
from joblib import Parallel, delayed
import multiprocessing
array1 = [ 0 for i in range(100000) ]
def myfun(i):
return i**2
#### Simple loop ####
start_time = time.time()
for i in range(100000):
array1[i]=i**2
print( "Time for simple loop --- %s seconds ---" % ( time.time()
- start_time
)
)
#### Parallelized loop ####
start_time = time.time()
results = Parallel( n_jobs = -1,
verbose = 0,
backend = "threading"
)(
map( delayed( myfun ),
range( 100000 )
)
) …Run Code Online (Sandbox Code Playgroud)