根据它们在自己的池中是活还是死而释放一个套接字?

joh*_*ohn 16 java multithreading design-patterns thread-safety atomicity

我使用下面的类通过同步或异步使用套接字将数据发送到我们的消息队列,如下所示.这取决于我是否要调用同步或异步方法在套接字上发送数据的要求.大多数情况下,我们将异步发送数据,但有时我可能需要同步发送数据.

  • sendAsync - 它以异步方式发送数据,我们不会阻塞正在发送数据的线程.如果未收到确认,则它将再次从SendToQueue仅在构造函数中启动的后台线程重试.
  • send - 它在套接字上同步发送数据.它在内部调用doSendAsync方法,然后在特定的超时时间内休眠,如果没有收到确认,则从cache桶中删除,这样我们就不会再次重试.

因此,上述两种方法之间的唯一区别是 - 对于异步情况,如果没有收到确认,我需要不惜一切代价重试但是对于同步我根本不需要重试,这就是为什么我在PendingMessage类中存储更多状态的原因.

ResponsePoller是一个类,它接收发送到特定套接字上的消息队列的数据的确认,然后调用handleAckReceived下面的方法删除地址,以便我们在收到确认后不重试.如果收到确认,那么套接字是活的,否则它已经死了.

public class SendToQueue {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder()
          .maximumSize(1000000)
          .concurrencyLevel(100)
          .build();

  private static class PendingMessage {
    private final long _address;
    private final byte[] _encodedRecords;
    private final boolean _retryEnabled;
    private final Object _monitor = new Object();
    private long _sendTimeMillis;
    private volatile boolean _acknowledged;

    public PendingMessage(long address, byte[] encodedRecords, boolean retryEnabled) {
      _address = address;
      _sendTimeMillis = System.currentTimeMillis();
      _encodedRecords = encodedRecords;
      _retryEnabled = retryEnabled;
    }

    public synchronized boolean hasExpired() {
      return System.currentTimeMillis() - _sendTimeMillis > 500L;
    }

    public synchronized void markResent() {
      _sendTimeMillis = System.currentTimeMillis();
    }

    public boolean shouldRetry() {
      return _retryEnabled && !_acknowledged;
    }

    public boolean waitForAck() {
      try {
        synchronized (_monitor) {
          _monitor.wait(500L);
        }
        return _acknowledged;
      } catch (InterruptedException ie) {
        return false;
      }
    }

    public void ackReceived() {
      _acknowledged = true;
      synchronized (_monitor) {
        _monitor.notifyAll();
      }
    }

    public long getAddress() {
      return _address;
    }

    public byte[] getEncodedRecords() {
      return _encodedRecords;
    }
  }

  private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
  }

  public static SendToQueue getInstance() {
    return Holder.INSTANCE;
  }

  private void handleRetries() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage m : messages) {
      if (m.hasExpired()) {
        if (m.shouldRetry()) {
          m.markResent();
          doSendAsync(m, Optional.<Socket>absent());
        } else {
          cache.invalidate(m.getAddress());
        }
      }
    }
  }

  private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgment
                                                  // and then delete entry from the cache
                                                  // accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetries();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m, Optional.<Socket>absent());
  }

  private boolean doSendAsync(final PendingMessage pendingMessage, final Optional<Socket> socket) {
    Optional<Socket> actualSocket = socket;
    if (!actualSocket.isPresent()) {
      SocketHolder liveSocket = SocketManager.getInstance().getSocket();
      actualSocket = Optional.of(liveSocket.getSocket());
    }

    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      return msg.send(actualSocket.get());
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords) {
    return send(address, encodedRecords, Optional.<Socket>absent());
  }

  public boolean send(final long address, final byte[] encodedRecords,
      final Optional<Socket> socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }   

  // called by acknowledgment thread which is in "ResponsePoller" class
  public void handleAckReceived(final long address) {
    PendingMessage m = cache.getIfPresent(address);
    if (m != null) {
      m.ackReceived();
      cache.invalidate(address);
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

当我在套接字上发送数据时,如果我收到相同数据的确认,则表示Socket处于活动状态,但如果数据未返回确认,则表示套接字已死(但我将继续重试发送数据).

因此,使用我的上述设计(或者如果有更好的方法),我怎么能弄清楚是否有任何套接字死或者是因为未收到确认或者是从该套接字接收到的,并且基于此我需要释放套接字通过调用以下方法返回其池(无论是活还是死),具体取决于是否接收到同步或异步情况的确认.

我还需要配置计数,如果在x的特定套接字上没有收到确认(其中x是数字> 0,默认值应该是2)次,则只标记套接字死机.做这件事的最好和最有效的方法是什么?

SocketManager.getInstance().releaseSocket(socket, SocketState.LIVE);
SocketManager.getInstance().releaseSocket(socket, SocketState.DEAD);
Run Code Online (Sandbox Code Playgroud)

Mic*_*ski 1

假设您在家,笔记本电脑和路由器已插入电缆,路由器后面有电缆调制解调器。如果您关闭路由器 - 您的笔记本电脑会知道 - 没有电压。如果你关闭调制解调器……那就很棘手了。你根本不可能知道这一点。一个潜在的问题是没有到主机的路由。但即使您已连接,也可能是其他问题。一些协议(例如 ssh)内置了 ping,因此它们具有保持连接的状态。如果您的应用程序在每个时间间隔都不执行任何操作,那么客户端和服务器之间就会出现乒乓球现象,因此您知道它是否还活着。

如果您对协议有完全控制权 - keep-alive 是选项之一。

您的客户处于一端,但一般来说,很难让两方确保达成协议。拜占庭将军问题描述了一般网络模型,其中每个节点不知道任何其他节点,并且只能信任所知道的节点。

一般来说我不会自己写分布式系统。我正在使用Hystrix来实现这一点。链接是他们的配置,所以你可以看到它有多大。您可以跟踪服务器是否正在工作。此外,当它再次回来时,您可以准备策略来弄清楚它,而不是淹没它,取消过时的消息,等等 - 图表、统计数据、与其他解决方案的集成。有很大的社区在使用它并解决问题。这是比自己做更好的选择。

不确定您是否只有这一项服务可以使用,或者 Hystrix 是否适合您。在Java中,人们在处理问题时倾向于使用层、框架......希望它有所帮助。