不要同时在两个线程之间共享相同的套接字

joh*_*ohn 13 java concurrency multithreading synchronization thread-safety

我有大约60个套接字和20个线程,我想确保每个线程每次都在不同的套接字上工作,所以我不想在两个线程之间共享相同的套接字.

在我的SocketManager课堂上,我有一个后台线程,每60秒运行一次并调用updateLiveSockets()方法.在该updateLiveSockets()方法中,我迭代我拥有的所有套接字,然后通过调用类的send方法SendToQueue和响应的基础逐个ping它们,我将它们标记为活或死.在该updateLiveSockets()方法中,我总是需要迭代所有套接字并ping它们以检查它们是活的还是死的.

现在所有读者线程将同时调用类的getNextSocket()方法SocketManager以获得下一个可用的套接字以在该套接字上发送业务消息.所以我在套接字上发送了两种类型的消息:

  • 一个是ping套接字上的消息.这只是从类中的计时器线程调用updateLiveSockets()方法发送的SocketManager.
  • 其他是business套接字上的消息.这是在SendToQueue课堂上完成的.

因此,如果pinger线程正在ping一个套接字以检查它们是否存在,那么没有其他业务线程应该使用该套接字.类似地,如果业务线程使用套接字在其上发送数据,那么pinger线程不应该ping该套接字.这适用于所有套接字.但我需要确保在updateLiveSockets方法中,每当我的后台线程启动时,我们都会ping所有可用的套接字,以便我们可以确定哪个套接字是活的还是死的.

以下是我的SocketManager课程:

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();
  private final ZContext ctx = new ZContext();

  // ...

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 60, 60, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(List<String> paddes, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    // ....
    return socketList;
  }

  // this method will be called by multiple threads concurrently to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!listOfEndPoints.isEmpty()) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

  // runs every 60 seconds to ping all the available socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

这是我的SendToQueue班级:

  // this method will be called by multiple reader threads (around 20) concurrently to send the data
  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
      // log error
      return false;
    }       
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }
Run Code Online (Sandbox Code Playgroud)

问题陈述

现在你可以看到我在两个线程之间共享相同的套接字.getNextSocket()SocketManager课堂上似乎可以回归0MQ socketThread A.同时,timer thread可以访问0MQ socket它来ping它.在这种情况下Thread A,timer thread正在变异0MQ socket,这可能会导致问题.所以我试图找到一种方法,以便我可以防止不同的线程同时向同一个套接字发送数据并破坏我的数据.

我能想到的一个解决方案是synchronization在发送数据时使用套接字,但如果许多线程使用相同的套接字,则资源利用率不高.此外,如果msg.send(socket);被阻止(技术上不应该),则阻止等待此套接字的所有线程.所以我想可能有更好的方法来确保每个线程同时使用不同的单个实时套接字而不是特定套接字上的同步.

Mou*_*aer 1

操作系统软件工程中有一个概念,称为临界区。当两个或多个进程共享数据并且并发执行时,就会出现临界区,在这种情况下,如果有另一个进程访问这些数据,则任何进程都不应修改甚至读取这些共享数据。因此,当一个进程进入临界区时,它应该通知所有其他并发执行的进程它当前正在修改临界区,因此所有其他进程都应该被阻止等待进入该临界区。你会问谁组织什么进程进入,这是另一个称为进程调度的问题,它控制什么进程应该进入这个关键部分,操作系统会为你做这件事。

所以对你来说最好的解决方案是使用一个信号量,其中信号量的值是套接字的数量,在你的情况下,我认为你有一个套接字,所以你将使用一个信号量-二进制信号量-用信号量值= 1初始化,那么你的代码应该分为四个主要部分:临界区入口临界区临界区退出剩余部分

  • 临界区入口:进程进入临界区并阻塞所有其他进程的地方。信号量将允许一个进程线程进入临界区-使用套接字-并且信号量的值将递减-等于零-。
  • 临界区:进程应该做的临界区代码。
  • 临界区退出:进程释放临界区以供其他进程进入。信号量值将递增——等于1——允许另一个进程进入
  • 剩余部分:除前 3 部分之外的所有代码的其余部分。

现在你只需打开任何有关信号量的 Java 教程就可以了解如何在 Java 中应用信号量,这非常简单。