进程池的应用程序级负载平衡器

nog*_*ard 4 c++ java architecture networking

我们在C++中使用传统的单片软件,其作用类似于请求 - 回复TCP服务器.该软件是单线程的,可以同时处理一个请求.目前,我们已经修复了这些流程的池,以并行服务多个客户端.

由于消息量大,客户端会定期遇到请求处理的严重延迟.目前,我们有一个想法是通过在客户端和工作者之间引入一种代理来解决这个问题:

代理

我们需要此代理的以下功能:

  1. 应用程序级负载平衡:通过检查请求上下文和客户端ID在工作者之间传播请求
  2. 控制和监视工作进程的生命周期
  3. 产生额外的工作进程(在不同的PC上)来处理峰值

事实上,我们希望它的行为类似于Java中的ExecutorService,但是使用工作进程而不是线程.目前的想法是基于Jetty或Tomcat服务器在Java中实现此Balancer,内部消息队列和servlet将请求转发给工作进程.

但我想知道:现有的解决方案(最好是Java)会自动化这个过程吗?实现此类代理的最简单方法是什么?

更新:

我对请求上下文做了什么 - 好吧,C++服务器是非常混乱的软件.实际上,每次它接收到不同的上下文时,它会相应地更新内部缓存以匹配该上下文 例如,如果您请求该服务器为您提供一些英语数据,那么它会将内部缓存重新加载到英语.如果下一个请求是法语,那么它会再次重新加载缓存.显然,我希望通过更智能地转发请求来最小化缓存重新加载的次数.

通信协议是自制的(基于TCP/IP),但从中提取上下文部分相对容易.

当前负载均衡是在客户端实现的,因此每个客户端都配置为知道所有服务器节点并以循环方式向它们发送请求.这种方法存在以下几个问题:客户端的连接管理复杂,与多个不了解对方的客户端的不正确工作,无法管理节点生命周期.我们无法通过重构来解决列出的问题.

我们最有可能最终会使用自制的转发解决方案,但我仍然想知道现有产品是否至少用于流程管理?理想情况下,这将是Java应用程序服务器,可以:

  • Spawn子节点(另一个Java进程)
  • 监控他们的生命周期
  • 通过一些协议与他们沟通

也许这个功能已经在一些现有的应用服务器中实现了?这将大大简化问题!

wal*_*ros 5

关于流程管理,您可以通过混合Apache Commons Exec库的功能轻松实现您的目标,该库可以帮助使用Apache Commons Pool库生成新的工作实例,该库将管理正在运行的实例.

实现非常简单,因为commons池将确保您可以使用一个对象,直到它返回池中.如果没有将对象返回到池中,则commons池将为您生成新实例.您可以通过添加监视程序服务(来自apache commons exec)来控制工作程序的生命周期 - 监视程序可以杀死未使用一段时间的实例,或者您也可以使用公共池本身,例如通过调用pool.clearOldest().您还可以通过调用pool.getNumActive()来查看当前处理的请求数(有多少工作者处于活动状态).请参阅GenericKeyedObjectPool的javadoc以查看更多内容.

可以使用在Tomcat服务器上运行的一个简单servlet来完成实现.这个servlet将实例化池,并通过调用pool.borowObject(参数)简单地向池中请求新的worker.在内部参数中,您可以定义工作人员应处理请求的特征(在您的情况下,参数应包括语言).如果没有这样的工人(例如没有工作人员为法语),游泳池将为您产生新工人.此外,如果有一个工作人员但工作人员当前正在处理另一个请求,则池也将为您生成一个新工作人员(因此您将有两个工作人员处理相同的语言).当您调用pool.returnObject(parameters,instance)时,Worker将准备好处理新请求.

整个实现只需要不到200行代码(完整代码见下文).代码包括工作进程从外部被杀或将崩溃的情况(请参阅WorkersFactory.activateObject()).

恕我直言:使用Apache Cammel对你来说不是一个好选择,因为它太大的工具而且它被设计成不同消息格式之间的中介总线.您不需要进行转换,也不需要处理不同格式的消息.寻求简单的解决方案.

package com.myapp;

import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Objects;

public class BalancingServlet extends javax.servlet.http.HttpServlet {

    private final WorkersPool workersPool;

    public BalancingServlet() {
        workersPool = new WorkersPool(new WorkersFactory());
    }


    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {

    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        response.getWriter().println("Balancing");

        String language = request.getParameter("language");
        String someOtherParam = request.getParameter("other");
        WorkerParameters workerParameters = new WorkerParameters(language, someOtherParam);

        String requestSpecificParam1 = request.getParameter("requestParam1");
        String requestSpecificParam2 = request.getParameter("requestParam2");

        try {
            WorkerInstance workerInstance = workersPool.borrowObject(workerParameters);
            workerInstance.handleRequest(requestSpecificParam1, requestSpecificParam2);
            workersPool.returnObject(workerParameters, workerInstance);

        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}

class WorkerParameters {
    private final String workerLangauge;
    private final String someOtherParam;

    WorkerParameters(String workerLangauge, String someOtherParam) {
        this.workerLangauge = workerLangauge;
        this.someOtherParam = someOtherParam;
    }

    public String getWorkerLangauge() {
        return workerLangauge;
    }

    public String getSomeOtherParam() {
        return someOtherParam;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        WorkerParameters that = (WorkerParameters) o;

        return Objects.equals(this.workerLangauge, that.workerLangauge) && Objects.equals(this.someOtherParam, that.someOtherParam);
    }

    @Override
    public int hashCode() {
        return Objects.hash(workerLangauge, someOtherParam);
    }
}

class WorkerInstance {
    private final Thread thread;
    private WorkerParameters workerParameters;

    public WorkerInstance(final WorkerParameters workerParameters) {
        this.workerParameters = workerParameters;

        // launch the process here   
        System.out.println("Spawing worker for language: " + workerParameters.getWorkerLangauge());

        // use commons Exec to spawn your process using command line here

        // something like


        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String line = "C:/Windows/notepad.exe" ;
                    final CommandLine cmdLine = CommandLine.parse(line);

                    final DefaultExecutor executor = new DefaultExecutor();
                    executor.setExitValue(0);
//                    ExecuteWatchdog watchdog = new ExecuteWatchdog(60000); // if you want to kill process running too long
//                    executor.setWatchdog(watchdog);

                    int exitValue = executor.execute(cmdLine);
                    System.out.println("process finished with exit code: " + exitValue);
                } catch (IOException e) {
                    throw new RuntimeException("Problem while executing application for language: " + workerParameters.getWorkerLangauge(), e);
                }


            }
        });

        thread.start();


        System.out.println("Process spawned for language: " + workerParameters.getWorkerLangauge());


    }

    public void handleRequest(String someRequestParam1, String someRequestParam2) {
        System.out.println("Handling request for extra params: " + someRequestParam1 + ", " + someRequestParam2);

        // communicate with your application using parameters here

        // communcate via tcp or whatever protovol you want using extra parameters: someRequestParam1, someRequestParam2


    }

    public boolean isRunning() {
        return thread.isAlive();
    }


}

class WorkersFactory extends BaseKeyedPooledObjectFactory<WorkerParameters, WorkerInstance> {

    @Override
    public WorkerInstance create(WorkerParameters parameters) throws Exception {
        return new WorkerInstance(parameters);
    }

    @Override
    public PooledObject<WorkerInstance> wrap(WorkerInstance worker) {
        return new DefaultPooledObject<WorkerInstance>(worker);
    }

    @Override
    public void activateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
            throws Exception {
        System.out.println("Activating worker for lang: " + worker.getWorkerLangauge());

        if  (! p.getObject().isRunning()) {
            System.out.println("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to respawn it");
            throw new RuntimeException("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to respawn it");
        }
    }

    @Override
    public void passivateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
            throws Exception {
        System.out.println("Passivating worker for lang: " + worker.getWorkerLangauge());
    }

}

class WorkersPool extends GenericKeyedObjectPool<WorkerParameters, WorkerInstance> {

    public WorkersPool(KeyedPooledObjectFactory<WorkerParameters, WorkerInstance> factory) {
        super(factory);
    }
}
Run Code Online (Sandbox Code Playgroud)