joh*_*ohn 9 java multithreading design-patterns zeromq data-structures
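I am working on a project in which I need to consume a large number of records and then send them to another system that uses ZeroMQ.
Here is the flow:
I am not sure what the best way to minimize this is in my scenario below.
Below is my Processor class, in which an add() method is called by multiple threads to populate the dataHolderByPartitionReference CHM (ConcurrentHashMap) in a thread-safe way. In the constructor of the Processor class, I start a background thread that runs every 30 seconds and pushes the records from that same CHM to a set of ZeroMQ servers by calling the SendToZeroMQ class, as shown below: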
Processor
public class Processor {
    private final ScheduledExecutorService executorService = Executors
            .newSingleThreadScheduledExecutor();
    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
            new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

    private static class Holder {
        private static final Processor INSTANCE = new Processor();
    }

    public static Processor getInstance() {
        return Holder.INSTANCE;
    }

    private Processor() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                validateAndSendAllPartitions(dataHolderByPartitionReference
                        .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
            }
        }, 0, 30, TimeUnit.SECONDS);
    }

    private void validateAndSendAllPartitions(
            ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
    }

    private void validateAndSend(final int partition,
            final ConcurrentLinkedQueue<DataHolder> dataHolders) {
        Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        int totalSize = 0;
        while (!dataHolders.isEmpty()) {
            .........
            .........
            SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
        }
        // calling again with remaining values
        SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
    }

    // called by multiple threads to populate dataHolderByPartitionReference CHM
    public void add(final int partition, final DataHolder holder) {
        // store records in dataHolderByPartitionReference in a thread safe way
    }
}
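For reference, the add() method described above could be filled in roughly as follows. This is only a minimal sketch under assumptions, not the actual implementation: it assumes Java 8 or later (for computeIfAbsent), and PartitionBuffer, drain() and the String record type are hypothetical stand-ins for Processor, the getAndSet call in the scheduled task, and DataHolder.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the buffering part of Processor; records are plain Strings here.
public class PartitionBuffer {
    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<String>>> ref =
            new AtomicReference<>(new ConcurrentHashMap<>());

    // Safe to call from many threads: computeIfAbsent creates the queue
    // for a partition at most once, and the queue itself is concurrent.
    public void add(int partition, String record) {
        ref.get()
           .computeIfAbsent(partition, k -> new ConcurrentLinkedQueue<>())
           .add(record);
    }

    // Swaps in a fresh map and returns the old one to the caller
    // (the role played by the 30-second sender task).
    public Map<Integer, ConcurrentLinkedQueue<String>> drain() {
        return ref.getAndSet(new ConcurrentHashMap<>());
    }
}
```

One thing to watch with this swap-the-map approach: a thread that fetched the old map just before getAndSet can still append to it afterwards, so a record added after the sender has finished with that queue could be missed.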
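Below is my SendToZeroMQ class, which sends records to a set of ZeroMQ servers and retries accordingly, depending on acknowledgment of delivery.
The ResponsePoller thread keeps running to see which of the records we sent earlier have been acknowledged. As soon as a record is acknowledged, the ResponsePoller thread removes it from the retryBucket so that it does not get retried.
SendToZeroMQ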
public class SendToZeroMQ {
    // do I need both of these ScheduledExecutorService instances, or is one sufficient to start both of my threads?
    private final ScheduledExecutorService executorServicePoller = Executors
            .newSingleThreadScheduledExecutor();
    private final ScheduledExecutorService executorService = Executors
            .newSingleThreadScheduledExecutor();
    private final Cache<Long, byte[]> retryBucket = CacheBuilder.newBuilder().maximumSize(10000000)
            .removalListener(RemovalListeners.asynchronous(new CustomListener(), executorService))
            .build();

    private static class Holder {
        private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
    }

    public static SendToZeroMQ getInstance() {
        return Holder.INSTANCE;
    }

    private SendToZeroMQ() {
        executorServicePoller.submit(new ResponsePoller());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (Entry<Long, byte[]> entry : retryBucket.asMap().entrySet()) {
                    executeAsync(entry.getKey(), entry.getValue());
                }
            }
        }, 0, 1, TimeUnit.MINUTES);
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray, final Socket socket) {
        ZMsg msg = new ZMsg();
        msg.add(encodedByteArray);
        boolean sent = msg.send(socket);
        msg.destroy();
        // add to retry bucket
        retryBucket.put(address, encodedByteArray);
        return sent;
    }

    public boolean executeAsync(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        Map<Long, byte[]> addressToencodedByteArray = encode(partition, clientKeyBytesAndProcessBytesHolder);
        long address = addressToencodedByteArray.entrySet().iterator().next().getKey();
        byte[] encodedByteArray = addressToencodedByteArray.entrySet().iterator().next().getValue();
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    private Map<Long, byte[]> encode(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        // this address will be unique always
        long address = TestUtils.getAddress();
        Frame frame = new Frame(............);
        byte[] packedByteArray = frame.serialize();
        // this map will always have one entry in it.
        return ImmutableMap.of(address, packedByteArray);
    }

    public void removeFromRetryBucket(final long address) {
        retryBucket.invalidate(address);
    }
}
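Below is my ResponsePoller class, which waits for acknowledgments of all the records that have already been sent by the other background thread. If an acknowledgment is received, the record is removed from the retry bucket so that it is not retried.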
public class ResponsePoller implements Runnable {
    private static final Random random = new Random();
    private static final int listenerPort = 8076;

    @Override
    public void run() {
        ZContext ctx = new ZContext();
        Socket client = ctx.createSocket(ZMQ.PULL);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);
        PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};
        while (!Thread.currentThread().isInterrupted()) {
            // Tick once per second, pulling in arriving messages
            for (int centitick = 0; centitick < 100; centitick++) {
                ZMQ.poll(items, 10);
                if (items[0].isReadable()) {
                    ZMsg msg = ZMsg.recvMsg(client);
                    Iterator<ZFrame> it = msg.iterator();
                    while (it.hasNext()) {
                        ZFrame frame = it.next();
                        try {
                            long address = TestUtils.getAddress(frame.getData());
                            // remove from retry bucket since we got the acknowledgment for this record
                            SendToZeroMQ.getInstance().removeFromRetryBucket(address);
                        } catch (Exception ex) {
                            // log error
                        } finally {
                            frame.destroy();
                        }
                    }
                    msg.destroy();
                }
            }
        }
        ctx.destroy();
    }
}
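Stripped of the ZeroMQ details, the acknowledgment flow described above comes down to three operations on the retry bucket: remember a record when it is sent, forget it when it is acknowledged, and list whatever is still pending when the retry task runs. The sketch below illustrates only that bookkeeping. RetryBucket and its method names are hypothetical, and it uses a plain ConcurrentHashMap instead of the Guava cache, so it has no size limit or removal listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified retry bucket keyed by the unique record address.
public class RetryBucket {
    private final Map<Long, byte[]> pending = new ConcurrentHashMap<>();

    // Called by the sender right after a record goes out.
    public void recordSent(long address, byte[] payload) {
        pending.put(address, payload);
    }

    // Called by the poller when an acknowledgment arrives.
    // Returns true only if the address was still pending.
    public boolean acknowledge(long address) {
        return pending.remove(address) != null;
    }

    // Snapshot of addresses that have not been acknowledged yet.
    public List<Long> addressesToRetry() {
        return new ArrayList<>(pending.keySet());
    }
}
```

Because the sender, the poller and the retry task all touch the bucket from different threads, a concurrent map (or the Guava cache used above) is what keeps these three operations safe without extra locking.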
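I am trying to see, from a design perspective, what the best way to design this is so that all my logic works seamlessly.
I am pretty sure there is a better way to design this than what I have. What would that better way be?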
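In my opinion, as long as you are using TCP for the underlying communication, you do not have to worry about "application-layer" acknowledgment of data receipt.
In this case, since ZeroMQ is itself built on top of TCP, with further optimizations, you do not have to worry about whether the data was transmitted successfully as long as there is no exception at the transport layer (which would obviously bounce back to you so that you can handle it).
The way I see your problem: you are running Kafka consumer threads that receive messages and bounce them on to another message queue (in this case ZMQ, which uses TCP and either guarantees successful message delivery or throws an exception at a lower communication layer).
The simplest solution I can think of is to use a thread pool inside each consumer and try to send the message using ZMQ. In case of any network error, you can easily pool that message for later use, or log it, as long as the application daemon is running.
In the proposed solution, I assume that message ordering is not part of the problem space, and that you are not looking to complicate things.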
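To make the thread-pool suggestion a bit more concrete, here is a rough sketch of what it might look like. Everything in it is an assumption for illustration: PooledSender is a hypothetical class, Transport is a hypothetical interface standing in for the real ZMQ send call, and a failed send is simply parked in an in-memory queue for later handling.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PooledSender {
    // Hypothetical abstraction over the real send call; returns false on failure.
    public interface Transport {
        boolean send(byte[] message);
    }

    private final ExecutorService pool;
    private final Transport transport;
    // Messages that could not be sent, kept for a later retry or for logging.
    private final Queue<byte[]> failed = new ConcurrentLinkedQueue<>();

    public PooledSender(int threads, Transport transport) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.transport = transport;
    }

    // Hands the message to the pool; a false result or an exception parks it.
    public void submit(final byte[] message) {
        pool.execute(() -> {
            boolean sent;
            try {
                sent = transport.send(message);
            } catch (RuntimeException e) {
                sent = false;
            }
            if (!sent) {
                failed.add(message);
            }
        });
    }

    public Queue<byte[]> failedMessages() {
        return failed;
    }

    // Stops accepting work and waits for queued sends to finish.
    public boolean shutdownAndWait(long timeout, TimeUnit unit) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that ZeroMQ sockets are not meant to be shared between threads, so a real Transport would probably need one socket per worker thread rather than a single shared one.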