joh*_*ohn 10 java multithreading thread-safety race-condition guava
我使用下面的类通过使用socket以同步方式或异步方式将数据发送到我们的消息队列,如下所示.
sendAsync - 它不间断地异步发送数据.发送后,(on LINE A)它会添加到retryHolder存储桶,这样如果没有收到确认,那么它将再次从构造函数中启动的后台线程重试.send- 它在内部调用sendAsync方法,然后在特定的超时时间内休眠,如果没有收到确认,则从retryHolder桶中删除,以便我们不再重试.所以上述两种方法之间的唯一区别是 - 对于异步,我需要不惜一切代价重试,但是对于同步,我不需要重试但看起来可能会重试,因为我们共享相同的重试桶缓存并重试线程运行每1秒钟.
ResponsePoller是一个类,它接收发送到我们的消息队列的数据的确认,然后调用removeFromretryHolder下面的方法删除地址,以便我们在收到确认后不重试.
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
private final Cache<Long, byte[]> retryHolder =
CacheBuilder
.newBuilder()
.maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(
RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the `retryHolder` cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// retry again
for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
sendAsync(entry.getKey(), entry.getValue());
}
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(encodedRecords);
// send data on a socket LINE A
boolean sent = msg.send(socket);
msg.destroy();
retryHolder.put(address, encodedRecords);
return sent;
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean sent = sendAsync(address, encodedRecords, socket);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// if key is not present, then acknowledgement was received successfully
sent = !retryHolder.asMap().containsKey(address);
// and key is still present in the cache, then it means acknowledgment was not received after
// waiting for timeout period, so we will remove it from cache.
if (!sent)
removeFromretryHolder(address);
return sent;
}
public void removeFromretryHolder(final long address) {
retryHolder.invalidate(address);
}
}
Run Code Online (Sandbox Code Playgroud)
如果有人调用send方法我们不重试的最佳方式是什么,但我们仍然需要知道是否收到了确认.唯一的问题是我根本不需要重试.
对于所有同步调用,我们是否需要单独的存储桶才能确认,我们不会从该存储桶重试?
该代码存在许多潜在问题:
retryHolder#put。wait+notify代替。我会存储一个具有更多状态的类。retryIfNoAnswer它可以包含重试处理程序可以检查的标志(是/否)。它可以提供使用waitForAnswer/的/markAnswerReceived方法,以便发送不必休眠固定时间。如果获得答案,该方法可以返回 true,如果超时则返回 false。在发送之前将对象放入重试处理程序中,并使用时间戳,以便仅重试早于特定年龄的消息。这修复了第一个竞争条件。waitnotifywaitForAnswer
编辑:更新了下面的示例代码,与您的代码一起编译,未经测试:
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
// Not sure why you are using a cache rather than a standard ConcurrentHashMap?
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class PendingMessage {
private final long _address;
private final byte[] _encodedRecords;
private final Socket _socket;
private final boolean _retryEnabled;
private final Object _monitor = new Object();
private long _sendTimeMillis;
private volatile boolean _acknowledged;
public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
_address = address;
_sendTimeMillis = System.currentTimeMillis();
_encodedRecords = encodedRecords;
_socket = socket;
_retryEnabled = retryEnabled;
}
public synchronized boolean hasExpired() {
return System.currentTimeMillis() - _sendTimeMillis > 500L;
}
public synchronized void markResent() {
_sendTimeMillis = System.currentTimeMillis();
}
public boolean shouldRetry() {
return _retryEnabled && !_acknowledged;
}
public boolean waitForAck() {
try {
synchronized(_monitor) {
_monitor.wait(500L);
}
return _acknowledged;
}
catch (InterruptedException e) {
return false;
}
}
public void ackReceived() {
_acknowledged = true;
synchronized(_monitor) {
_monitor.notifyAll();
}
}
public long getAddress() {
return _address;
}
public byte[] getEncodedRecords() {
return _encodedRecords;
}
public Socket getSocket() {
return _socket;
}
}
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private void handleRetries() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage m : messages) {
if (m.hasExpired()) {
if (m.shouldRetry()) {
m.markResent();
doSendAsync(m, m.getSocket());
}
else {
// Or leave the message and let send remove it
cache.invalidate(m.getAddress());
}
}
}
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetries();
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
cache.put(address, m);
return doSendAsync(m, socket);
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(socket);
}
finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
}
finally {
// Alternatively (checks that address points to m):
// cache.asMap().remove(address, m);
cache.invalidate(address);
}
}
public void handleAckReceived(final long address) {
PendingMessage m = cache.getIfPresent(address);
if (m != null) {
m.ackReceived();
cache.invalidate(address);
}
}
}
Run Code Online (Sandbox Code Playgroud)
并从以下地方致电ResponsePoller:
SendToQueue.getInstance().handleAckReceived(addressFrom);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
788 次 |
| 最近记录: |