小编Wap*_*iti的帖子

在Python中的进程之间共享许多队列

我知道multiprocessing.Manager()它是如何用于创建共享对象的,特别是可以在worker之间共享的队列.有这个问题,这个问题,这个问题甚至是我自己的一个问题.

但是,我需要定义很多队列,每个队列都链接一对特定的进程.假设每对进程及其链接队列由变量标识key.

当我需要放置和获取数据时,我想使用字典来访问我的队列.我无法做到这一点.我尝试过很多东西.随着multiprocessing进口为mp:

for key in all_keys: DICT[key] = mp.Queue在多处理模块导入的配置文件中定义一个dict (调用它multi.py)不会返回错误,但是队列DICT[key]之间没有共享队列,每个队列似乎都有自己的队列副本,因此没有通信发生.

如果我尝试在定义DICT进程并启动它们的主多处理函数的开头定义,比如

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()
Run Code Online (Sandbox Code Playgroud)

我收到了错误

RuntimeError: Queue objects should only be shared between processes through
 inheritance
Run Code Online (Sandbox Code Playgroud)

改为

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()
Run Code Online (Sandbox Code Playgroud)

只会让事情变得更糟.multi.py在主函数的头部而不是在main函数内部尝试类似的定义会返回类似的错误.

必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列.有任何想法吗?

编辑

这是该程序的基本架构:

1-加载第一个模块,它定义一些变量,导入multi,启动multi.main()和加载另一个模块,该模块启动一系列模块加载和代码执行.与此同时...

2- …

python queue multiprocessing python-multiprocessing

12
推荐指数
1
解决办法
1万
查看次数

使用while循环进行基本多处理

我是multiprocessingpython中的新软件包,对于那些了解更多内容的人来说,我的困惑可能很容易.我一直在阅读有关并发的内容,并且已经搜索了其他类似的问题并且一无所获.(仅供参考我希望使用multithreading,因为GIL将我的应用程序有很多慢下来.)

我在事件的框架内思考.我希望有多个进程在运行,等待事件发生.如果事件发生,它将被分配给特定进程,该进程运行然后返回其空闲状态.可能有更好的方法来做到这一点,但我的理由是我应该生成所有进程一次并使它们无限期地打开,而不是每次事件发生时创建然后关闭进程.速度对我来说是一个问题,我的事件每秒可能发生数千次.

我想出了以下玩具示例,其意图是将偶数发送到一个进程,将奇数发送到另一个进程.两个进程都是相同的,它们只是将数字附加到列表中.

from multiprocessing import Process, Queue, Pipe

slist=['even','odd']

Q={}
Q['even'] = Queue()
Q['odd'] = Queue()

ev,od = [],[]

Q['even'].put(ev)
Q['odd'].put(od)

P={}
P['even'] = Pipe()
P['odd'] = Pipe()



def add_num(s):
    """ The worker function, invoked in a process. The results are placed in
        a list that's pushed to a queue."""
#    while True :
    if not P[s][1].recv():
        print s,'- do nothing'

    else:            
        d = Q[s].get()
        print d
        d.append(P[s][1].recv())
        Q[s].put(d)
        print Q[s].get()
        P[s][0].send(False)
        print …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing while-loop python-2.7 python-multiprocessing

10
推荐指数
1
解决办法
1万
查看次数

在ZeroMQ中调用recv_pyobj()时如何添加主题过滤器?

ZeroMQ提供了很好的文档,说明如何使用主题过滤器设置pub-sub模式,如api文档中所述.为方便起见, ZeroMQ还提供方法socket.send_json()socket.send_pyobj()(和recv对应方).

在pub-sub示例中,主题过滤器(字符串)附加到消息的开头(也是字符串).使用内置序列化时,有没有办法设置主题过滤器?如果我发送dictClass使用send_pyobj()我不能在它前面附加一个字符串.

python zeromq pyzmq

7
推荐指数
1
解决办法
1239
查看次数

无需远程创建 git 子模块

我想sub在我的main存储库中有一个存储库,它可以从与我的main. 所以我想要一个子模块。但我想在本地创建子模块,然后将其推送到远程。它还不存在于任何地方。

我见过的所有示例都涉及将远程存储库拉入子模块,这就是子模块的创建方式

git submodule add git@github.com:url_to/awesome_submodule.git path_to_awesome_submodule
Run Code Online (Sandbox Code Playgroud)

有没有办法创建子模块,进行一些提交,然后将它们推送到有问题的远程存储库?

git version-control github repository git-subrepo

6
推荐指数
1
解决办法
1108
查看次数

如何在 zmq 中处理同一端口上的多个发布者?

这个问题之前已经被问过,在这里。我有完全一样的问题。我想从一堆不同的进程进行发布,并每次都使用相同的端口。

我尝试了答案中提出的解决方案,但这对我不起作用。我收到错误

    File "/usr/local/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/akay/afk/multi.py", line 18, in to_zmq
    socket.connect("tcp://*:%s" % port)
  File "zmq/backend/cython/socket.pyx", line 478, in zmq.backend.cython.socket.Socket.connect (zmq/backend/cython/socket.c:4308)
ZMQError: Invalid argument
Run Code Online (Sandbox Code Playgroud)

我的代码是这样的,基本上直接取自 zmq 文档中示例

# Socket to talk to server
port = '5556'
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Listening for stream...", m
socket.bind("tcp://localhost:%s" % port) #change connect to bind, as per answer above
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
Run Code Online (Sandbox Code Playgroud)

我正在使用 python 2.7 …

python sockets port zeromq pyzmq

5
推荐指数
1
解决办法
5580
查看次数

从快速修复消息中获取字段

我正在使用带有 python 的 quickfix。查看此处的文档页面告诉我们如何获取字段。假设 a message = fix.message(with quickfix as fix) 来自交易对手。我可以通过调用获得 35 (MsgType) 字段

message.getHeader().getField(fix.MsgType())

例如,它返回 35=X.

我的问题是:是否有任何方法可以返回X?或者我是否必须切片所有内容(例如35=X[3:],返回X)并因此知道所有字符串的长度?

python quickfix fix-protocol

5
推荐指数
1
解决办法
4338
查看次数

quickfix:如何从消息中获取符号(标志55)?

我正在使用Python API运行QuickFix,并使用FIX4.2连接到TT FIX适配器

我正在登录并发送针对两种工具的市场数据请求。这样就可以正常工作,并且可以从仪器中获得预期的数据。我可以从消息中获取各种信息。

但是,我在获取符号(标志55)字段时遇到麻烦。

    import quickfix as fix

    def fromApp(self, message, sessionID):

        ID = fix.Symbol()
        message.getField(ID)
        print ID
Run Code Online (Sandbox Code Playgroud)

这适用于我收到的第一条消息[初始市场数据快照(标志35 = W)]。一旦开始进行增量刷新(标志35 = X),就无法再获取“符号”字段。每条到达的消息都会导致“未找到字段”错误。

这使我感到困惑,因为在日志中,无论消息类型是W还是X,符号字段始终存在。

认为符号在刷新消息的标题中,我尝试get.Field(ID)在35 = W和get.Header().getField(ID)35 = X时使用,但是这不起作用。

有人可以帮我弄清楚这里发生了什么吗?我希望能够明确告诉我的计算机正在查看什么仪器。

谢谢

python quickfix fix-protocol

2
推荐指数
1
解决办法
1314
查看次数

Quickfix 无法读取重复组

我在 Windows 中使用 Quickfix 和 python 绑定。我过去曾能够提出市场数据请求。我最近更换了不同的 API 提供商(Cunningham,又名 CTS),并且遇到了很多问题。然而,至少其中之一似乎是 Quickfix 内部的。这让我很困惑。

\n\n

当我发送市场数据请求时,我会收到回复。这是典型的 35=W 消息,市场快照。

\n\n

Quickfix 正在拒绝此消息,因为标签 269 出现多次!

\n\n

当然,标签269是MDEntryType,它应该出现多次。另请注意,定义了标签 268 NoMDEntries,它表示该组中有 21 个条目。

\n\n

我认为这是 QuickFix 的内部原因,因为 QuickFix 正在生成错误消息并将其发送回 CTS。此外,此错误会在消息传递给函数之前中止消息fromApp。(我知道,因为我的解析器在调用时将自己应用于消息,fromApp甚至没有收到此消息)。

\n\n

有任何想法吗?消息如下。

\n\n

(编辑 - 我已经关闭了配置文件中的数据字典 - 它与此有什么关系吗?)

\n\n

<20140915-22:39:11.953,FIX.4.2:XXXXX->CTS,传入>\n (8=FIX.4.2 \xe2\x98\xba 9=836 \xe2\x98\xba 35=W \xe2\x98 \xba 34=4 \xe2\x98\xba 49=CTS \xe2\x98\xba 56=XXXXX \xe2\x98\xba 52=20140915-22:39:11.963 \xe2\x98\xba 48=XDLCM\nE_F ZN (Z14) \xe2\x98\xba 387=2559 \xe2\x98\xba 965=2 \xe2\x98\xba 268=21 \xe2\x98\xba 269=0 \xe2\x98\xba 270=124156250 \xe2 \x98\xba …

python quickfix fix-protocol

1
推荐指数
1
解决办法
2038
查看次数