我有一个客户端,其代码我无法更改 - 但我想(重新)使用ZeroMQ套接字编写.
客户端使用TCP原始UDP套接字和原始套接字.
我知道我可以ZMQ_ROUTER_RAW用于原始TCP套接字,但原始UDP数据流呢?
我们看到ZeroMQ上有一个奇怪且无法解释的现象,Windows 7通过TCP发送消息.(或者inproc,因为ZeroMQ在Windows内部使用TCP进行信令).
这种现象是前500条消息越来越慢,延迟越来越慢.然后,除了由CPU /网络争用引起的峰值之外,延迟下降和消息一直快速到达.
这里描述了这个问题:https://github.com/zeromq/libzmq/issues/1608
它始终是500条消息.如果我们发送没有延迟,那么消息被批处理,所以我们看到这个现象延伸了几千个发送.如果我们在发送之间延迟,我们会更清楚地看到图表.即使在发送之间延迟多达50-100毫秒也不会改变事情.
邮件大小也无关紧要.我已经测试了10字节消息和10K消息,结果相同.
最大延迟始终为2毫秒(2,000 usec).
在Linux机器上,我们没有看到这种现象.
我们想要做的是消除这个初始曲线,因此消息与正常的低延迟(大约20-100 usec)保持新的连接.
更新:该问题不会在Windows 10和8上显示.它似乎只发生在Windows 7上.
使用ZeroMQ .Context和.Socket实例,我能够推送/拉取消息
,例如在我的代码下方为队列设置:
ZMQ.Context context = ZMQ.context(1);
// Socket to send messages on
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");
// Send messages
sender.send("0", 0);
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.connect("tcp://localhost:5557");
// receive messages
String string = new String(receiver.recv(0)).trim();
Run Code Online (Sandbox Code Playgroud)
Q1:如何在队列中实现主用/备用模式?
我的意思是将为一个主机和端口创建2个队列,如果一个队列(活动)发生故障,另一个(即备用)队列将立即启动以侦听/拉取消息.
任何实现它的示例或指导都会更有帮助.
Q2:是否有内置类来执行此类任务?
是否有某种规范或其他解释描述了正常情况,在这种情况下,对于每种通信类型,您可以预期ZeroMQ套接字上发送的消息不会被(所有)侦听进程接收?
例如,我有一个实验程序,它基本上假设PUB套接字的所有订阅者都接收在该套接字上发送的所有消息(在初始化握手之后).我想了解这种假设可能是错误的情况.谢谢,对不起,如果已经问过这个问题.
我已经阅读了0MQ指南,并且我理解了基本的套接字类型:PUSH/ PULL,REQ/ REP和PUB/ SUB.
关于ROUTER/ DEALER和X--sockets(例如,XSUB/ XPUB,XREQ/ XREP)我很困惑.
这些套接字类型有哪些用例?
我最近遇到一个要求,即我有一个.fit()训练有素的scikit-learn SVC分类器实例并且需要.predict()很多实例。
有没有办法.predict()通过任何scikit-learn内置工具仅并行化此方法?
from sklearn import svm
data_train = [[0,2,3],[1,2,3],[4,2,3]]
targets_train = [0,1,0]
clf = svm.SVC(kernel='rbf', degree=3, C=10, gamma=0.3, probability=True)
clf.fit(data_train, targets_train)
# this can be very large (~ a million records)
to_be_predicted = [[1,3,4]]
clf.predict(to_be_predicted)
Run Code Online (Sandbox Code Playgroud)
如果有人确实知道解决方案,如果您能分享它,我会非常高兴。
我编译了python包装器,nanomsg我想为包创建一个python安装程序.
可以通过运行来创建包
python setup.py bdist --format=wininst
Run Code Online (Sandbox Code Playgroud)
但是,我想nanomsg.dll/nanomsg.so包含在安装程序/包中,但我没有找到有关此问题的任何文档.
阅读关于线程安全的ZeroMQ常见问题时偶然发现.
我的多线程程序在ZeroMQ库中的奇怪位置不断崩溃.我究竟做错了什么?
ZeroMQ套接字不是线程安全的."指南"中对此进行了详细介绍.
简短的版本是不应该在线程之间共享套接字.我们建议为每个线程创建一个专用套接字.
对于每个线程的专用套接字不可行的情况,当且仅当每个线程在访问套接字之前执行完整的内存屏障时,才可以共享套接字.大多数语言都支持Mutex或Spinlock,它将代表您执行完整的内存屏障.
我的多线程程序在ZeroMQ库中的奇怪位置不断崩溃.
我究竟做错了什么?
以下是我的以下代码:
Celluloid::ZMQ.init
module Scp
module DataStore
class DataSocket
include Celluloid::ZMQ
def pull_socket(socket)
@read_socket = Socket::Pull.new.tap do |read_socket|
## IPC socket
read_socket.connect(socket)
end
end
def push_socket(socket)
@write_socket = Socket::Push.new.tap do |write_socket|
## IPC socket
write_socket.connect(socket)
end
end
def run
pull_socket and push_socket and loopify!
end
def loopify!
loop {
async.evaluate_response(read_socket.read_multipart)
}
end
def evaluate_response(data)
return_response(message_id,routing,Parser.parser(data))
end
def return_response(message_id,routing,object)
data = object.to_response
write_socket.send([message_id,routing,data])
end
end
end
end
DataSocket.new.run
Run Code Online (Sandbox Code Playgroud)
现在,有几件我不清楚的事情:
1) 假设 …
我使用SciPy fmin_bfgs()优化接收下一个警告NeuralNetwork.根据Backpropagation算法,一切都应该清晰简单.
1前馈培训示例.
2计算每个单元的误差项.
3累积梯度(对于第一个例子,我正在跳过正则化项).
Starting Loss: 7.26524579601
Check gradient: 2.02493576268
Warning: Desired error not necessarily achieved due to precision loss.
Current function value: 5.741300
Iterations: 3
Function evaluations: 104
Gradient evaluations: 92
Trained Loss: 5.74130012926
Run Code Online (Sandbox Code Playgroud)
我刚刚用MATLAB完成了同样的任务,它成功执行fmin了优化功能,但无法理解,我在Python实现中错过了什么.正如你所看到的,甚至scipy.optimize.check_grad回报太大的价值.
def feed_forward(x, theta1, theta2):
hidden_dot = np.dot(add_bias(x), np.transpose(theta1))
hidden_p = sigmoid(hidden_dot)
p = sigmoid(np.dot(add_bias(hidden_p), np.transpose(theta2)))
return hidden_dot, hidden_p, p
def cost(thetas, x, y, hidden, lam):
theta1, theta2 = get_theta_from(thetas, x, y, hidden)
_, _, …Run Code Online (Sandbox Code Playgroud) 我有一个很大的句子列表(约 7 百万个),我想从中提取名词。
我使用joblib库来并行化提取过程,如下所示:
import spacy
from tqdm import tqdm
from joblib import Parallel, delayed
nlp = spacy.load('en_core_web_sm')
class nouns:
def get_nouns(self, text):
doc = nlp(u"{}".format(text))
return [token.text for token in doc if token.tag_ in ['NN', 'NNP', 'NNS', 'NNPS']]
def parallelize(self, sentences):
results = Parallel(n_jobs=1)(delayed(self.get_nouns)(sent) for sent in tqdm(sentences))
return results
if __name__ == '__main__':
sentences = ['we went to the school yesterday',
'The weather is really cold',
'Can we catch the dog?',
'How old are you John?', …Run Code Online (Sandbox Code Playgroud) zeromq ×6
python ×4
sockets ×2
celluloid ×1
concurrency ×1
distribute ×1
java ×1
jeromq ×1
joblib ×1
low-latency ×1
nanomsg ×1
python-3.x ×1
ruby ×1
scikit-learn ×1
scipy ×1
setuptools ×1
spacy ×1
tcp ×1
udp ×1
winsock ×1