发送记录并等待其确认接收

joh*_*ohn 10 java multithreading thread-safety race-condition guava

我使用下面的类通过使用socket以同步方式或异步方式将数据发送到我们的消息队列,如下所示.

  • sendAsync - 它不间断地异步发送数据.发送后,(on LINE A)它会添加到retryHolder存储桶,这样如果没有收到确认,那么它将再次从构造函数中启动的后台线程重试.
  • send- 它在内部调用sendAsync方法,然后在特定的超时时间内休眠,如果没有收到确认,则从retryHolder桶中删除,以便我们不再重试.

所以上述两种方法之间的唯一区别是 - 对于异步,我需要不惜一切代价重试,但是对于同步,我不需要重试但看起来可能会重试,因为我们共享相同的重试桶缓存并重试线程运行每1秒钟.

ResponsePoller是一个类,它接收发送到我们的消息队列的数据的确认,然后调用removeFromretryHolder下面的方法删除地址,以便我们在收到确认后不重试.

public class SendToQueue {
  private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
  private final Cache<Long, byte[]> retryHolder =
      CacheBuilder
          .newBuilder()
          .maximumSize(1000000)
          .concurrencyLevel(100)
          .removalListener(
              RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

  private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
  }

  public static SendToQueue getInstance() {
    return Holder.INSTANCE;
  }

  private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the `retryHolder` cache accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        // retry again
        for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
          sendAsync(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedRecords);
    // send data on a socket LINE A
    boolean sent = msg.send(socket);
    msg.destroy();
    retryHolder.put(address, encodedRecords);
    return sent;
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    boolean sent = sendAsync(address, encodedRecords, socket);
    // if the record was sent successfully, then only sleep for timeout period
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // if key is not present, then acknowledgement was received successfully
    sent = !retryHolder.asMap().containsKey(address);
    // and key is still present in the cache, then it means acknowledgment was not received after
    // waiting for timeout period, so we will remove it from cache.
    if (!sent)
      removeFromretryHolder(address);
    return sent;
  }

  public void removeFromretryHolder(final long address) {
    retryHolder.invalidate(address);
  }
}
Run Code Online (Sandbox Code Playgroud)

如果有人调用send方法我们不重试的最佳方式是什么,但我们仍然需要知道是否收到了确认.唯一的问题是我根本不需要重试.

对于所有同步调用,我们是否需要单独的存储桶才能确认,我们不会从该存储桶重试?

ewr*_*ner 2

该代码存在许多潜在问题:

  • 在致电 之前可能会收到答复retryHolder#put
  • 重试消息时也可能存在竞争条件。
  • 如果两条消息发送到同一地址,第二条消息会覆盖第一条消息吗?
  • 发送总是浪费时间睡觉,请使用wait+notify代替。

我会存储一个具有更多状态的类。retryIfNoAnswer它可以包含重试处理程序可以检查的标志(是/否)。它可以提供使用waitForAnswer/的/markAnswerReceived方法,以便发送不必休眠固定时间。如果获得答案,该方法可以返回 true,如果超时则返回 false。在发送之前将对象放入重试处理程序中,并使用时间戳,以便仅重试早于特定年龄的消息。这修复了第一个竞争条件。waitnotifywaitForAnswer

编辑:更新了下面的示例代码,与您的代码一起编译,未经测试:

public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

// Not sure why you are using a cache rather than a standard ConcurrentHashMap?
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
    .concurrencyLevel(100)
    .removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

private static class PendingMessage {
    private final long _address;
    private final byte[] _encodedRecords;
    private final Socket _socket;
    private final boolean _retryEnabled;
    private final Object _monitor = new Object();
    private long _sendTimeMillis;
    private volatile boolean _acknowledged;

    public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
        _address = address;
        _sendTimeMillis = System.currentTimeMillis();
        _encodedRecords = encodedRecords;
        _socket = socket;
        _retryEnabled = retryEnabled;
    }

    public synchronized boolean hasExpired() {
        return System.currentTimeMillis() - _sendTimeMillis > 500L;
    }

    public synchronized void markResent() {
        _sendTimeMillis = System.currentTimeMillis();
    }

    public boolean shouldRetry() {
        return _retryEnabled && !_acknowledged;
    }

    public boolean waitForAck() {
        try {
            synchronized(_monitor) {
                _monitor.wait(500L);
            }
            return _acknowledged;
        }
        catch (InterruptedException e) {
            return false;
        }
    }

    public void ackReceived() {
        _acknowledged = true;
        synchronized(_monitor) {
            _monitor.notifyAll();
        }
    }

    public long getAddress() {
        return _address;
    }

    public byte[] getEncodedRecords() {
        return _encodedRecords;
    }

    public Socket getSocket() {
        return _socket;
    }
}

private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
}

public static SendToQueue getInstance() {
    return Holder.INSTANCE;
}

private void handleRetries() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage m : messages) {
        if (m.hasExpired()) {
            if (m.shouldRetry()) {
                m.markResent();
                doSendAsync(m, m.getSocket());
            }
            else {
                // Or leave the message and let send remove it
                cache.invalidate(m.getAddress());
            }
        }
    }
}

private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the cache accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            handleRetries();
        }
    }, 0, 1, TimeUnit.SECONDS);
}

public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
    cache.put(address, m);
    return doSendAsync(m, socket);
}

private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
        // send data on a socket LINE A
        return msg.send(socket);
    }
    finally {
        msg.destroy();
    }
}

public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
        if (doSendAsync(m, socket)) {
            return m.waitForAck();
        }
        return false;
    }
    finally {
        // Alternatively (checks that address points to m):
        // cache.asMap().remove(address, m);
        cache.invalidate(address);
    }
}

public void handleAckReceived(final long address) {
    PendingMessage m = cache.getIfPresent(address);
    if (m != null) {
        m.ackReceived();
        cache.invalidate(address);
    }
}
}
Run Code Online (Sandbox Code Playgroud)

并从以下地方致电ResponsePoller

SendToQueue.getInstance().handleAckReceived(addressFrom);
Run Code Online (Sandbox Code Playgroud)