joh*_*ohn 10 java multithreading thread-safety zeromq jeromq
我在多线程环境中使用Jeromq,如下所示.下面是我的代码,其中构造函数首先SocketManager连接到所有可用的套接字,然后我将它们放在方法中的liveSocketsByDatacentermap中connectToZMQSockets.之后,我在同一个构造函数中启动一个后台线程,该构造函数每30秒运行一次,它调用updateLiveSockets方法来ping所有那些已经存在于liveSocketsByDatacentermap中的套接字并更新liveSocketsByDatacenter映射,看看这些套接字是否存活.
并且getNextSocket()多个读取器线程同时调用方法以获取下一个可用的套接字,然后我们使用该套接字在其上发送数据.所以我的问题是我们在多线程环境中正确使用Jeromq吗?因为在我们尝试将数据发送到该实时套接字时,我们刚刚在生产环境中看到了这个堆栈跟踪的异常,所以我不确定它是否是一个错误或其他什么?
java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
Run Code Online (Sandbox Code Playgroud)
以下是我的代码:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 30 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
Run Code Online (Sandbox Code Playgroud)
以下是我如何从多个读者线程同时使用类的getNextSocket()方法SocketManager:
// this method will be called from multiple threads
public boolean sendAsync(final long addr, final byte[] reco) {
Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
return sendAsync(addr, reco, liveSockets.get().getSocket(), false);
}
public boolean sendAsync(final long addr, final byte[] reco, final Socket socket,
final boolean messageA) {
ZMsg msg = new ZMsg();
msg.add(reco);
boolean sent = msg.send(socket);
msg.destroy();
retryHolder.put(addr, reco);
return sent;
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean sent = sendAsync(address, encodedRecords, socket, true);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// ...
return sent;
}
Run Code Online (Sandbox Code Playgroud)
我认为这不是正确的.这似乎getNextSocket()可以返回0MQ socket来thread A.同时,计时器线程可以访问0MQ socket它以对其进行ping操作.在这种情况下thread A,计时器线程正在变异0MQ socket,这将导致问题.那么解决这个问题的最佳和有效方法是什么?
注意: SocketHolder是一个不可变类
更新:
我刚注意到同样的问题发生在我的另一个盒子上,ArrayIndexOutOfBoundsException但是这次它的71行号在"YQueue"文件中.唯一一致的是256总是.所以应该有一些与256相关的东西,我无法弄清楚256这是什么?
java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.backPos(YQueue.java:71)
at zmq.YPipe.write(YPipe.java:51)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
Run Code Online (Sandbox Code Playgroud)
虽然 ZeroMQ 文档和 Pieter HINTJENS 的优秀著作“Code Connected. Volume 1”没有忘记尽可能提醒这一事实,但在线程之间返回甚至共享 ZeroMQ 套接字实例的想法时常出现。当然,类实例的方法可能会在其内部方法和属性中提供几乎“隐藏”的功能,但适当的设计工作应该毫无例外、没有任何借口地防止任何此类副作用。
如果有定量事实的合理支持,共享可能是通用实例的一种方式zmq.Context(),但是清晰的分布式系统设计可能存在于真正的多代理方案上,其中每个代理都运行自己的Context()引擎,经过微调根据配置和性能偏好的相应组合。
切勿共享 ZeroMQ 套接字。确实,从来没有。即使最新的发展开始承诺在不久的将来在这个方向上会发生一些变化。用共享来污染任何高性能、低延迟的分布式系统设计都是一个坏习惯。不共享是该领域的最佳设计原则。
是的,解决此问题的最佳且有效的方法是永远不要共享 ZeroMQ 套接字。
这意味着永远不要返回任何对象,其属性是 ZeroMQ 套接字(您主动构建并从类方法中以大量方式返回该对象.connect(){...}。在您的情况下,所有类方法似乎都被保留private,这可能会融合以下问题:允许“其他线程”接触类私有套接字实例,但同样的原则也必须在所有属性级别上得到认可,这样才能有效。最后,这种“熔断”得到了捷径并被 违反了,这
public static SocketManager getInstance()混杂
地为任何外部请求者提供直接访问共享 ZeroMQ 套接字的类私有实例的机会。
如果某些文档几乎在每一章中都明确警告不要共享这些内容,那么人们就不应该共享这些内容。
因此,重新设计方法,以便SocketManager获得更多的功能,因为它是类方法,它将执行嵌入的必备功能,以便明确防止任何外部世界线程接触不可共享的实例,如 ZeroMQ 出版物中所述。
接下来是资源清单:您的代码似乎每 30 秒重新检查一次所有感兴趣的数据中心的当前状态。这实际上每分钟创建两次新的 List 对象。虽然您可能推测性地让java Garbage Collector 来整理所有的thrash,但不会从任何地方进一步引用,但这对于从之前的重新检查运行中嵌入到 List-s 中的 ZeroMQ 相关对象来说不是一个好主意。ZeroMQ 对象仍然从Zcontext()ZeroMQContext()核心工厂实例化 I/O 线程内部引用,它也可以被视为 ZeroMQ 套接字库存资源管理器。因此,所有new创建的套接字实例不仅从java- 端获得外部句柄,而且还从(Z)Context(). 到目前为止,一切都很好。但是,在代码中的任何地方都看不到任何方法,该方法可以取消对象实例中的任何和所有 ZeroMQ 套接字,这些套接字已与java- 端解除关联,但仍然从(Z)Context()- 端引用。对分配的资源进行显式资源退役是一种公平的设计实践,对于有限或受到其他约束的资源来说更是如此。对于 { "cheap" | 执行此操作的方式可能有所不同。“昂贵”}-此类资源管理处理的维护成本(将 ZeroMQ 套接字实例作为一些轻量级“消耗品/一次性”处理的成本非常昂贵……但那是另一个故事了)。
因此,还添加一组适当的资源重用/资源拆除方法,这将使new创建的套接字总量回到您的控制之下(您的代码负责域内有多少套接字处理程序(Z)Context()) -of-resources-control 可能会被创建,并且必须保持被管理——无论是否有意)。
有人可能会反对,自动检测和(可能会很好地延迟)垃圾收集可能会带来一些“承诺”,但是,您的代码仍然负责适当的资源管理,甚至 LMAX 人员也永远不会获得如此勇敢的性能,如果他们依赖于来自标准GC的“承诺”。你的问题比 LMAX 顶级性能所面临的问题要严重得多。您的代码(到目前为止已发布)对 ZeroMQ 相关资源没有任何.close()作用.term()。在消费不受控制(分布式需求)的生态系统中,这是完全不可能的做法。你必须保护你的船不被超载超过你知道它可以安全处理和动态卸载每个箱子的极限,而这些箱子在“对岸”没有收件人。
这是队长(你的代码设计师)的责任。
如果没有明确告诉最低级别(ZeroMQ -floor)负责库存管理的水手Context()有些箱子要卸下,问题仍然是你的。标准gc命令链不会“自动”执行此操作,无论“承诺”看起来如何,它都不会。因此,要明确您的 ZeroMQ 资源管理,评估执行这些步骤的返回代码,并适当处理在代码显式控制下执行这些资源管理操作所引发的任何和所有异常。
较低(如果不是可实现的最低)资源利用率和较高(如果不是可实现的最高)性能是正确完成这项工作的奖励。LMAX 的人是一个很好的例子,他们的表现远远超出了标准的 java“承诺”,因此人们可以向最好的人学习。
声明的调用签名与使用的调用签名似乎不匹配:
虽然我在这一点上可能是错误的,因为我的大部分设计工作不是在java多态调用接口中,但签名中似乎存在不匹配,发布为:
private List<SocketHolder> connect( Datacenters dc, // 1-st
List<String> addresses, // 2-nd
int socketType // 3-rd
) {
... /* implementation */
}
Run Code Online (Sandbox Code Playgroud)
以及
实际的方法调用,
在connectToZMQSockets()方法内部调用:
List<SocketHolder> addedColoSockets = connect( entry.getValue(), // 1-st
ZMQ.PUSH // 2-nd
);
Run Code Online (Sandbox Code Playgroud)