我知道multiprocessing.Manager()它是如何用于创建共享对象的,特别是可以在worker之间共享的队列.有这个问题,这个问题,这个问题甚至是我自己的一个问题.
但是,我需要定义很多队列,每个队列都链接一对特定的进程.假设每对进程及其链接队列由变量标识key.
当我需要放置和获取数据时,我想使用字典来访问我的队列.我无法做到这一点.我尝试过很多东西.随着multiprocessing进口为mp:
for key in all_keys: DICT[key] = mp.Queue在多处理模块导入的配置文件中定义一个dict (调用它multi.py)不会返回错误,但是队列DICT[key]之间没有共享队列,每个队列似乎都有自己的队列副本,因此没有通信发生.
如果我尝试在定义DICT进程并启动它们的主多处理函数的开头定义,比如
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Queue()
Run Code Online (Sandbox Code Playgroud)
我收到了错误
RuntimeError: Queue objects should only be shared between processes through
inheritance
Run Code Online (Sandbox Code Playgroud)
改为
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Manager().Queue()
Run Code Online (Sandbox Code Playgroud)
只会让事情变得更糟.multi.py在主函数的头部而不是在main函数内部尝试类似的定义会返回类似的错误.
必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列.有任何想法吗?
编辑
这是该程序的基本架构:
1-加载第一个模块,它定义一些变量,导入multi,启动multi.main()和加载另一个模块,该模块启动一系列模块加载和代码执行.与此同时...
2- …
我是multiprocessingpython中的新软件包,对于那些了解更多内容的人来说,我的困惑可能很容易.我一直在阅读有关并发的内容,并且已经搜索了其他类似的问题并且一无所获.(仅供参考我不希望使用multithreading,因为GIL将我的应用程序有很多慢下来.)
我在事件的框架内思考.我希望有多个进程在运行,等待事件发生.如果事件发生,它将被分配给特定进程,该进程运行然后返回其空闲状态.可能有更好的方法来做到这一点,但我的理由是我应该生成所有进程一次并使它们无限期地打开,而不是每次事件发生时创建然后关闭进程.速度对我来说是一个问题,我的事件每秒可能发生数千次.
我想出了以下玩具示例,其意图是将偶数发送到一个进程,将奇数发送到另一个进程.两个进程都是相同的,它们只是将数字附加到列表中.
from multiprocessing import Process, Queue, Pipe
slist=['even','odd']
Q={}
Q['even'] = Queue()
Q['odd'] = Queue()
ev,od = [],[]
Q['even'].put(ev)
Q['odd'].put(od)
P={}
P['even'] = Pipe()
P['odd'] = Pipe()
def add_num(s):
""" The worker function, invoked in a process. The results are placed in
a list that's pushed to a queue."""
# while True :
if not P[s][1].recv():
print s,'- do nothing'
else:
d = Q[s].get()
print d
d.append(P[s][1].recv())
Q[s].put(d)
print Q[s].get()
P[s][0].send(False)
print …Run Code Online (Sandbox Code Playgroud) python multiprocessing while-loop python-2.7 python-multiprocessing
我想sub在我的main存储库中有一个存储库,它可以从与我的main. 所以我想要一个子模块。但我想在本地创建子模块,然后将其推送到远程。它还不存在于任何地方。
我见过的所有示例都涉及将远程存储库拉入子模块,这就是子模块的创建方式
git submodule add git@github.com:url_to/awesome_submodule.git path_to_awesome_submodule
Run Code Online (Sandbox Code Playgroud)
有没有办法创建子模块,进行一些提交,然后将它们推送到有问题的远程存储库?
这个问题之前已经被问过,在这里。我有完全一样的问题。我想从一堆不同的进程进行发布,并每次都使用相同的端口。
我尝试了答案中提出的解决方案,但这对我不起作用。我收到错误
File "/usr/local/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/local/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/akay/afk/multi.py", line 18, in to_zmq
socket.connect("tcp://*:%s" % port)
File "zmq/backend/cython/socket.pyx", line 478, in zmq.backend.cython.socket.Socket.connect (zmq/backend/cython/socket.c:4308)
ZMQError: Invalid argument
Run Code Online (Sandbox Code Playgroud)
# Socket to talk to server
port = '5556'
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Listening for stream...", m
socket.bind("tcp://localhost:%s" % port) #change connect to bind, as per answer above
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
Run Code Online (Sandbox Code Playgroud)
我正在使用 python 2.7 …
我正在使用带有 python 的 quickfix。查看此处的文档页面告诉我们如何获取字段。假设 a message = fix.message(with quickfix as fix) 来自交易对手。我可以通过调用获得 35 (MsgType) 字段
message.getHeader().getField(fix.MsgType())
例如,它返回 35=X.
我的问题是:是否有任何方法可以返回X?或者我是否必须切片所有内容(例如35=X[3:],返回X)并因此知道所有字符串的长度?
我正在使用Python API运行QuickFix,并使用FIX4.2连接到TT FIX适配器
我正在登录并发送针对两种工具的市场数据请求。这样就可以正常工作,并且可以从仪器中获得预期的数据。我可以从消息中获取各种信息。
但是,我在获取符号(标志55)字段时遇到麻烦。
import quickfix as fix
def fromApp(self, message, sessionID):
ID = fix.Symbol()
message.getField(ID)
print ID
Run Code Online (Sandbox Code Playgroud)
这适用于我收到的第一条消息[初始市场数据快照(标志35 = W)]。一旦开始进行增量刷新(标志35 = X),就无法再获取“符号”字段。每条到达的消息都会导致“未找到字段”错误。
这使我感到困惑,因为在日志中,无论消息类型是W还是X,符号字段始终存在。
认为符号在刷新消息的标题中,我尝试get.Field(ID)在35 = W和get.Header().getField(ID)35 = X时使用,但是这不起作用。
有人可以帮我弄清楚这里发生了什么吗?我希望能够明确告诉我的计算机正在查看什么仪器。
谢谢
我在 Windows 中使用 Quickfix 和 python 绑定。我过去曾能够提出市场数据请求。我最近更换了不同的 API 提供商(Cunningham,又名 CTS),并且遇到了很多问题。然而,至少其中之一似乎是 Quickfix 内部的。这让我很困惑。
\n\n当我发送市场数据请求时,我会收到回复。这是典型的 35=W 消息,市场快照。
\n\nQuickfix 正在拒绝此消息,因为标签 269 出现多次!
\n\n当然,标签269是MDEntryType,它应该出现多次。另请注意,定义了标签 268 NoMDEntries,它表示该组中有 21 个条目。
\n\n我认为这是 QuickFix 的内部原因,因为 QuickFix 正在生成错误消息并将其发送回 CTS。此外,此错误会在消息传递给函数之前中止消息fromApp。(我知道,因为我的解析器在调用时将自己应用于消息,fromApp甚至没有收到此消息)。
有任何想法吗?消息如下。
\n\n(编辑 - 我已经关闭了配置文件中的数据字典 - 它与此有什么关系吗?)
\n\n<20140915-22:39:11.953,FIX.4.2:XXXXX->CTS,传入>\n (8=FIX.4.2 \xe2\x98\xba 9=836 \xe2\x98\xba 35=W \xe2\x98 \xba 34=4 \xe2\x98\xba 49=CTS \xe2\x98\xba 56=XXXXX \xe2\x98\xba 52=20140915-22:39:11.963 \xe2\x98\xba 48=XDLCM\nE_F ZN (Z14) \xe2\x98\xba 387=2559 \xe2\x98\xba 965=2 \xe2\x98\xba 268=21 \xe2\x98\xba 269=0 \xe2\x98\xba 270=124156250 \xe2 \x98\xba …
python ×7
fix-protocol ×3
quickfix ×3
pyzmq ×2
zeromq ×2
git ×1
git-subrepo ×1
github ×1
port ×1
python-2.7 ×1
queue ×1
repository ×1
sockets ×1
while-loop ×1