Zim*_*oot 18 java multithreading exception-handling message-queue akka
我有一个Java类型的actor,它负责外部资源上可能暂时不可用的过滤器/重试逻辑.演员的领域和常用方法是:
public class MyActorImpl implements MyActor {
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait) {
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
}
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput) {
operation(opInput, DEFAULTWAIT);
}
// called internally
public void operation(OpInput opInput, long currentWait);
}
Run Code Online (Sandbox Code Playgroud)
actor有几个操作都具有或多或少相同的重试/缓冲逻辑; 每个操作都有自己的[op]Wait和[op]Buffer领域.
void operation(OpInput opInput)void operation(OpInput opInput, long currentWait)using DEFAULTWAIT用于第二个参数currentWait参数不相等opWait则输入存储在其中opBuffer,否则输入将被发送到外部资源.opWait设置为DEFAULTWAIT,并opBuffer通过该operation(opInput)方法发回内容.如果外部资源(或更可能是网络)返回错误,那么我使用ms 的延迟更新opWait = updateWait(opWait)并调度operation(opInput, opWait)actor系统调度程序opWait.即我正在使用actor系统调度程序来实现指数退避; 我正在使用该currentWait参数来标识我正在重试的消息,并且正在缓冲其他消息,直到外部资源成功处理主消息.
问题是,如果预定的operation(opInput, currentWait)消息丢失,那么我将永远缓冲消息,因为currentWait == opWait保护将对所有其他消息失败.我可以使用spring-retry之类的东西来实现指数退避,但是我没有看到合并操作的重试循环的方法,这意味着我可以在每个重试循环中使用一个线程(而使用actor系统的调度程序不会更多的系统压力).
我正在寻找一种更容错的方法来在actor和外部资源之间的接口上实现缓冲和指数退避,而不必为任务分配太多资源.
如果我正确理解你,如果唯一的问题是丢失预定的消息,为什么你不只是使用可靠的代理模式 这样的特定消息,然后如果它失败opWait = DEFAULTWAIT;
关于你的代码有一些我得到的,当你说public void operation(OpInput opInput)外部调用时,我不明白你的意思.你的意思是这个方法与网络交互,网络使用有时不可用的资源?
如果我可以,我可以建议一个替代方案.从我的理解你的主要问题是你有一个有时不可用的资源,所以你有一些你用某种等待逻辑实现的que/buffer,以便一旦它再次可用就会处理它,不幸的是涉及一些可能会丢失并导致无限等待的消息.我认为你可以通过超时使用期货实现你想要的.然后重试,如果未来在一段时间内没有完成,最多可以说3次重新尝试.您甚至可以根据服务器负载和完成消息所需的时间来调整此时间.希望有所帮助.