如何在netty中异步访问memcached

Viv*_*dey 6 memcached spymemcached netty

我在netty中编写服务器,我需要调用memcached.我正在使用spymemcached,可以轻松地进行同步memcached调用.我希望这个memcached调用是异步的.那可能吗?netty提供的示例似乎没有帮助.

我尝试使用回调:ExecutorService在我的处理程序中创建了一个池,并向该池提交了一个回调工作程序.像这样:

公共类MyHandler扩展ChannelInboundMessageHandlerAdapter <MyPOJO>实现CallbackInterface {

   ...
   private static ExecutorService pool = Executors.newFixedThreadPool(20);


   @Override
   public void messageReceived(ChannelHandlerContext ctx, MyPOJO pojo) {
       ...
       CallingbackWorker worker = new CallingbackWorker(key, this);
       pool.submit(worker);
       ...
   }
   public void myCallback() {
       //get response
       this.ctx.nextOutboundMessageBuf().add(response);
   }
Run Code Online (Sandbox Code Playgroud)

}

CallingbackWorker 好像:

public class CallingbackWorker实现Callable {

  public CallingbackWorker(String key, CallbackInterface c) {
       this.c = c;
       this.key = key;
  }
  public Object call() {
    //get value from key
    c.myCallback(value);
  }
Run Code Online (Sandbox Code Playgroud)

然而,当我这样做,this.ctx.nextOutboundMessageBuf()myCallback卡住.

总的来说,我的问题是:如何在Netty中进行异步memcached调用?

Bil*_*tch 3

这里有两个问题:一个是您尝试编码的方式的小问题,另一个是许多提供异步服务调用的库的大问题,但没有好的方法可以在像这样的异步框架中充分利用它们内蒂。这迫使用户进行像这样的次优黑客攻击,或者是一种不太糟糕但仍然不是理想的方法,我稍后会介绍。

首先是编码问题。问题是您尝试从与处理程序关联的线程以外的线程调用 ChannelHandlerContext 方法,这是不允许的。这很容易修复,如下所示。您可以通过其他几种方式对其进行编码,但这可能是最简单的:

private static ExecutorService pool = Executors.newFixedThreadPool(20);

public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
    //...

    final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder());

    // first wait for the response on a pool thread
    pool.execute(new Runnable() {
        public void run() {
            String value;
            Exception err;
            try {
                value = future.get(3, TimeUnit.SECONDS); // or whatever timeout you want
                err = null;
            } catch (Exception e) {
                err = e;
                value = null;
            }
            // put results into final variables; compiler won't let us do it directly above
            final fValue = value;
            final fErr = err;

            // now process the result on the ChannelHandler's thread
            ctx.executor().execute(new Runnable() {
                public void run() {
                    handleResult(fValue, fErr);
                }
            });
        }
    });
// note that we drop through to here right after calling pool.execute() and
// return, freeing up the handler thread while we wait on the pool thread.
}

private void handleResult(String value, Exception err) {
     // handle it
}
Run Code Online (Sandbox Code Playgroud)

这将会起作用,并且可能足以满足您的应用程序。但是您有一个固定大小的线程池,因此如果您要处理超过 20 个并发连接,这将成为瓶颈。您可以增加池大小,或使用无限制的池大小,但此时,您可能还不如在 Tomcat 下运行,因为内存消耗和上下文切换开销开始成为问题,并且您会失去可扩展性,而这正是 Tomcat 的吸引力首先是内蒂!

事实是,Spymemcached 是基于 NIO 的、事件驱动的,并且仅使用一个线程来完成其所有工作,但却无法提供充分利用其事件驱动特性的方法。我希望他们很快就能解决这个问题,就像 Netty 4 和 Cassandra 最近通过在 Future 对象上提供回调(侦听器)方法一样。

同时,和你一样,我研究了替代方案,并且对我的发现不太满意,我(昨天)编写了一个 Future 跟踪器类,它可以以可配置的速率轮询多达数千个 Future,并给你打电话当它们完成时返回到您选择的线程(执行器)。它仅使用一个线程来执行此操作。如果你想尝试一下,我已将其放在 GitHub 上,但请注意,正如他们所说,它仍然是湿的。我在过去的一天里对它进行了很多测试,即使有 10000 个并发模拟 Future 对象,每毫秒轮询一次,它的 CPU 利用率也可以忽略不计,尽管它开始上升到超过 10000。使用它,上面的例子看起来像这样:

// in some globally-accessible class:

public static final ForeignFutureTracker FFT = new ForeignFutureTracker(1, TimeUnit.MILLISECONDS);

// in a handler class:

public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
// ...

  final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder());

  // add a listener for the Future, with a timeout in 2 seconds, and pass
  // the Executor for the current context so the callback will run
  // on the same thread.
  Global.FFT.addListener(future, 2, TimeUnit.SECONDS, ctx.executor(), 
    new ForeignFutureListener<String,GetFuture<String>>() {

      public void operationSuccess(String value) {
        // do something ...
        ctx.fireChannelRead(someval);
      }
      public void operationTimeout(GetFuture<String> f) {
        // do something ...
      }
      public void operationFailure(Exception e) {
        // do something ...
      }
    });
}
Run Code Online (Sandbox Code Playgroud)

您不希望任何时候有超过一两个 FFT 实例处于活动状态,否则它们可能会耗尽 CPU。但单个实例可以处理数千个未完成的期货;拥有第二个的唯一原因是为了以较慢的轮询速率(例如 10-20 毫秒)处理较高延迟的调用,例如 S3。

轮询方法的一个缺点是它会增加少量的延迟。例如,每毫秒轮询一次,平均会增加 500 微秒的响应时间。对于大多数应用程序来说,这不会成为问题,而且我认为线程池方法所节省的内存和 CPU 足以抵消这一点。

我预计在一年左右的时间内,这将不再是问题,因为更多的异步客户端提供回调机制,让您充分利用 NIO 和事件驱动模型。