How to design a system that sends records and retries sending them if no acknowledgment is received?

joh*_*ohn 9 java multithreading design-patterns zeromq data-structures

I am working on a project where I need to consume a large number of records and then send them to another system that uses ZeroMQ.

Here is the flow:

  • Store all incoming records in a CHM (ConcurrentHashMap) that is populated from multiple threads. Records arrive at a very high rate.
  • From a background thread that runs every 30 seconds, send these records from the CHM to the ZeroMQ servers.
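  • After sending each record to the ZeroMQ servers, also add it to a retry bucket so that it can be retried after a certain time has passed if no acknowledgment for it has been received yet.
  • We also have a poller runnable thread that receives acknowledgments from the ZeroMQ servers saying that these records have been received. As soon as I get an acknowledgment, I remove that record from the retry bucket so that it is not retried.
  • It is fine if some records are sent twice, but it is good to minimize this.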
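One way to keep duplicate sends low in the flow above is to remember when each record was last sent and to retry only the records whose acknowledgment is overdue. The following is a minimal sketch under that assumption, not the code from this question: `RetryBucket`, `markSent`, `acknowledge`, `dueForRetry`, and the timeout value are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks unacknowledged records together with their last send time.
class RetryBucket {
  /** A sent record plus the time of its most recent send attempt. */
  private static final class Pending {
    final byte[] payload;
    final long lastSentMillis;

    Pending(byte[] payload, long lastSentMillis) {
      this.payload = payload;
      this.lastSentMillis = lastSentMillis;
    }
  }

  private final Map<Long, Pending> pending = new ConcurrentHashMap<>();
  private final long ackTimeoutMillis;

  public RetryBucket(long ackTimeoutMillis) {
    this.ackTimeoutMillis = ackTimeoutMillis;
  }

  /** Call just before a record is sent or resent; overwrites the previous send time. */
  public void markSent(long address, byte[] payload, long nowMillis) {
    pending.put(address, new Pending(payload, nowMillis));
  }

  /** Call when an acknowledgment arrives; returns true if the record was still pending. */
  public boolean acknowledge(long address) {
    return pending.remove(address) != null;
  }

  /** Addresses whose most recent send is at least ackTimeoutMillis old. */
  public List<Long> dueForRetry(long nowMillis) {
    List<Long> due = new ArrayList<>();
    for (Map.Entry<Long, Pending> entry : pending.entrySet()) {
      if (nowMillis - entry.getValue().lastSentMillis >= ackTimeoutMillis) {
        due.add(entry.getKey());
      }
    }
    return due;
  }

  /** Payload of a pending record, or null if it has already been acknowledged. */
  public byte[] payloadOf(long address) {
    Pending p = pending.get(address);
    return p == null ? null : p.payload;
  }

  public static void main(String[] args) {
    RetryBucket bucket = new RetryBucket(60_000L); // assumed 60 s acknowledgment timeout
    long now = System.currentTimeMillis();
    bucket.markSent(42L, new byte[] {1, 2, 3}, now);
    System.out.println(bucket.dueForRetry(now + 30_000L)); // checked before the timeout
    bucket.acknowledge(42L);
    System.out.println(bucket.dueForRetry(now + 90_000L)); // checked after the acknowledgment
  }
}
```

With a structure like this, a periodic retry task would resend only what `dueForRetry` returns, so a record sent a few seconds before the sweep is not resent while its acknowledgment may still be in flight.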

I am not sure what the best way is to minimize this in my scenario below.

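Below is my Processor class, in which an add() method is called by multiple threads to populate the dataHolderByPartitionReference CHM in a thread-safe way. In the constructor of the Processor class, I start a background thread that runs every 30 seconds and pushes records from the same CHM to a set of ZeroMQ servers by calling the SendToZeroMQ class, as shown below: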


Processor

public class Processor {
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
      new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

  private static class Holder {
    private static final Processor INSTANCE = new Processor();
  }

  public static Processor getInstance() {
    return Holder.INSTANCE;
  }

  private Processor() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        validateAndSendAllPartitions(dataHolderByPartitionReference
            .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  private void validateAndSendAllPartitions(
      ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
  }

  private void validateAndSend(final int partition,
      final ConcurrentLinkedQueue<DataHolder> dataHolders) {
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;
    while (!dataHolders.isEmpty()) {
        .........
        .........
        SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
    }
    // calling again with remaining values
    SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
  }

  // called by multiple threads to populate dataHolderByPartitionReference CHM
  public void add(final int partition, final DataHolder holder) {
    // store records in dataHolderByPartitionReference in a thread safe way
  }
}

Below is my SendToZeroMQ class, which sends records to a set of ZeroMQ servers and retries them depending on whether an acknowledgment was delivered.

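  • First, it sends a record to the ZeroMQ servers.
  • Then it adds the same record to the retryBucket, to be retried later depending on whether an acknowledgment is received.
  • In the same class, I start a background thread that runs every 1 minute and resends the records that are still in the retry bucket.
  • The same class also starts the ResponsePoller thread, which keeps running to see which of the previously sent records have been acknowledged. As soon as a record is acknowledged, the ResponsePoller thread removes it from the retryBucket so that it is not retried.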

SendToZeroMQ

public class SendToZeroMQ {
  // do I need these two ScheduledExecutorService or one is sufficient to start my both the thread?
  private final ScheduledExecutorService executorServicePoller = Executors
      .newSingleThreadScheduledExecutor();
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final Cache<Long, byte[]> retryBucket = CacheBuilder.newBuilder().maximumSize(10000000)
      .removalListener(RemovalListeners.asynchronous(new CustomListener(), executorService))
      .build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorServicePoller.submit(new ResponsePoller());
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long, byte[]> entry : retryBucket.asMap().entrySet()) {
          executeAsync(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 1, TimeUnit.MINUTES);
  }

  public boolean executeAsync(final long address, final byte[] encodedByteArray) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
  }

  public boolean executeAsync(final long address, final byte[] encodedByteArray, final Socket socket) {
    // add to retry bucket before sending, so that an acknowledgment arriving
    // right after the send always finds the entry and no needless retry happens
    retryBucket.put(address, encodedByteArray);
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    return sent;
  }

  public boolean executeAsync(final int partition,
      final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }         
    Map<Long, byte[]> addressToencodedByteArray = encode(partition, clientKeyBytesAndProcessBytesHolder);
    long address = addressToencodedByteArray.entrySet().iterator().next().getKey();
    byte[] encodedByteArray = addressToencodedByteArray.entrySet().iterator().next().getValue();
    return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
  }

  private Map<Long, byte[]> encode(final int partition,
      final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {

    // this address will be unique always
    long address = TestUtils.getAddress();
    Frame frame = new Frame(............);
    byte[] packedByteArray = frame.serialize();
    // this map will always have one entry in it.
    return ImmutableMap.of(address, packedByteArray);
  }

  public void removeFromRetryBucket(final long address) {
    retryBucket.invalidate(address);
  }
}
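On the question raised in the code comment about needing two executors: a task that loops until interrupted, as ResponsePoller does, occupies its thread for as long as it runs, so a periodic task queued on the same single-threaded executor would never get a turn. The sketch below illustrates that layout with one thread for a stand-in poller and a separate scheduler for the retry sweep. The class name `ExecutorLayout`, the method `runSweeps`, and the short delays are hypothetical and chosen only to keep the demo fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a never-ending poller and a periodic sweep on separate executors.
class ExecutorLayout {
  /** Runs a stand-in poller plus a periodic sweep; returns how many sweeps ran. */
  public static int runSweeps(int sweepsWanted) throws InterruptedException {
    ExecutorService pollerExecutor = Executors.newSingleThreadExecutor();
    ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger sweeps = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(sweepsWanted);
    try {
      // Stand-in for ResponsePoller: loops until interrupted and never returns on its own.
      pollerExecutor.execute(() -> {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(5);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag so the loop exits
          }
        }
      });
      // Stand-in for the retry sweep; fixed delay avoids overlapping or bunched-up runs.
      retryScheduler.scheduleWithFixedDelay(() -> {
        sweeps.incrementAndGet();
        done.countDown();
      }, 0, 20, TimeUnit.MILLISECONDS);
      done.await(5, TimeUnit.SECONDS);
      return sweeps.get();
    } finally {
      retryScheduler.shutdownNow();
      pollerExecutor.shutdownNow(); // interrupts the poller loop
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("sweeps completed: " + runSweeps(3));
  }
}
```

The poller does not need a ScheduledExecutorService at all; a plain single-thread executor or a dedicated thread is enough for it, and both executors should be shut down when the application stops.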

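Below is my ResponsePoller class, which waits for acknowledgments of all the records that have already been sent by the other background thread. If an acknowledgment is received, the record is removed from the retry bucket so that it is not retried.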

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();
  private static final int listenerPort = 8076;

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);

    // Set random identity to make tracing easier
    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);

    PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second, pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items, 10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
              long address = TestUtils.getAddress(frame.getData());
              // remove from retry bucket since we got the acknowledgment for this record
              SendToZeroMQ.getInstance().removeFromRetryBucket(address);
            } catch (Exception ex) {
              // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

Question:

  • From a design perspective, what is the best way to design this so that all my logic works seamlessly?

  • I am pretty sure there is a better way to design this than what I have. What could that better way be?

Sid*_*agi 2

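In my opinion, as long as you use TCP for the underlying communication, you do not have to worry about data-receipt acknowledgments at the "application layer".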

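In this case, since ZeroMQ is itself built on top of TCP with further optimizations, you do not have to worry about whether the data transfer succeeded as long as there is no exception at the transport layer (which would obviously be bounced back to you to handle).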

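The way I see your problem: you are running Kafka consumer threads that receive messages and bounce them on to another message queue (in this case ZMQ, which uses TCP and guarantees successful message delivery, or else throws an exception at the lower communication layer).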

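The simplest solution I can think of is to use a thread pool inside each consumer and try to send the messages with ZMQ. In case of any network error, you can easily pool the message for later consumption or log it, as long as your application daemon is running.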
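The suggestion above could be sketched roughly as follows, assuming a thread pool that hands each message to a transport and parks any message whose send fails so it can be resubmitted later. `PooledSender` and the `Transport` interface are hypothetical stand-ins, not ZeroMQ or Kafka API. Because ZeroMQ sockets are not thread-safe, a real `Transport` would need one socket per worker thread rather than one shared socket.

```java
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: send on a thread pool, park failed messages for a later attempt.
class PooledSender {
  /** Hypothetical transport abstraction; a real one would wrap a per-thread ZeroMQ socket. */
  public interface Transport {
    boolean send(byte[] message) throws Exception;
  }

  private final ExecutorService pool;
  private final Transport transport;
  private final Queue<byte[]> failed = new ConcurrentLinkedQueue<>();

  public PooledSender(int threads, Transport transport) {
    this.pool = Executors.newFixedThreadPool(threads);
    this.transport = transport;
  }

  /** Sends asynchronously; the future yields true on success, false if the message was parked. */
  public Future<Boolean> submit(final byte[] message) {
    return pool.submit(new Callable<Boolean>() {
      @Override
      public Boolean call() {
        try {
          if (transport.send(message)) {
            return true;
          }
        } catch (Exception e) {
          // a real implementation would log the network error here
        }
        failed.add(message); // keep the message for a later attempt
        return false;
      }
    });
  }

  /** Resubmits the messages parked so far; returns how many were resubmitted. */
  public int retryFailed() {
    int count = failed.size(); // snapshot, so messages that fail again wait for the next call
    int resubmitted = 0;
    for (int i = 0; i < count; i++) {
      byte[] message = failed.poll();
      if (message == null) {
        break;
      }
      submit(message);
      resubmitted++;
    }
    return resubmitted;
  }

  public int failedCount() {
    return failed.size();
  }

  public boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
    pool.shutdown();
    return pool.awaitTermination(timeout, unit);
  }

  public static void main(String[] args) throws Exception {
    // Toy transport for the demo: reports success for any non-empty message.
    PooledSender sender = new PooledSender(4, message -> message.length > 0);
    System.out.println(sender.submit(new byte[] {1}).get());
    sender.shutdownAndWait(5, TimeUnit.SECONDS);
  }
}
```

A periodic task could call `retryFailed()` while the daemon is running. Messages parked in memory are lost if the process exits, so logging or persisting them would be needed where that matters.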

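In the proposed solution, I assume that message ordering is not part of the problem space, and that you are not looking to complicate things.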