使用scala continuation与netty/NIO侦听器

Jer*_*emy 9 continuations scala netty

我正在使用Netty库(GitHub的第4版).它在Scala中运行良好,但我希望我的库能够使用延续传递样式进行异步等待.

传统上使用Netty你会做这样的事情(一个示例异步连接操作):

//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
    def operationComplete (f:ChannelFuture) = {
        //here goes the code that happens when the connection is made   
    }
})
Run Code Online (Sandbox Code Playgroud)

如果你正在实现一个库(我是),那么你基本上有三个简单的选项,允许库的用户在建立连接后做东西:

  1. 只需从connect方法返回ChannelFuture并让用户处理它 - 这不会提供netty的大量抽象.
  2. 将ChannelFutureListener作为connect方法的参数,并将其添加为ChannelFuture的侦听器.
  3. 将回调函数对象作为connect方法的参数,并从您创建的ChannelFutureListener中调用它(这将使得回调驱动的样式有点像node.js)

我想做的是第四种选择; 我没有在上面的计数中包含它,因为它并不简单.

我想使用scala分隔的continuation来使库有点像一个阻塞库,但它将在幕后无阻塞:

class MyLibraryClient {
    def connect(remoteAddr:SocketAddress) = {
        shift { retrn: (Unit => Unit) => {
                val future:ChannelFuture = client.connect(remoteAddr);
                future.addListener(new ChannelFutureListener {
                    def operationComplete(f:ChannelFuture) = {
                        retrn();
                    }   
                });
            }
        }   
    }
}
Run Code Online (Sandbox Code Playgroud)

想象一下,其他读/写操作以相同的方式实现.这样做的目的是用户的代码看起来更像这样:

reset {
    val conn = new MyLibraryClient();
    conn.connect(new InetSocketAddress("127.0.0.1", 1337));
    println("This will happen after the connection is finished");
}
Run Code Online (Sandbox Code Playgroud)

换句话说,该程序看起来像一个简单的阻塞式程序,但在幕后不会有任何阻塞或线程.

我遇到的麻烦是我不完全理解分隔连续的输入是如何工作的.当我尝试以上述方式实现它时,编译器抱怨我的operationComplete实现实际返回Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit]而不是Unit.我得知scala的CPS中存在一种"陷阱",你必须使用一个shift方法的返回类型进行注释@suspendable,它会在调用堆栈中向上传递reset,但是似乎没有任何方法可以将其与预先存在的Java库,没有分隔连续的概念.

我觉得真的必须有办法解决这个问题 - 如果Swarm可以序列化连续并将它们阻塞在网络上以便在别处进行计算,那么必须可以简单地从预先存在的Java类调用延续.但我无法弄清楚它是如何完成的.我是否必须在Scala中重写netty的所有部分才能实现这一目标?

sha*_*ams 4

当我开始学习时,我发现对Scala 延续性的解释非常有帮助。shift[A, B, C]特别注意他解释和 的部分reset[B, C]。添加一个虚拟对象null作为 的最后一个语句operationComplete应该会有所帮助。

顺便说一句,如果它可能有嵌套,您需要在retrn()另一个内部调用。resetshift

编辑:这是一个工作示例

import scala.util.continuations._
import java.util.concurrent.Executors

object Test {

  val execService = Executors.newFixedThreadPool(2)

  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient();
      conn.connect("127.0.0.1");
      println("This will happen after the connection is finished");
    }
    println("Outside reset");
  }
}

class ChannelFuture {
  def addListener(listener: ChannelFutureListener): Unit = {
    val future = this
    Test.execService.submit(new Runnable {
      def run(): Unit = {
        listener.operationComplete(future)
      }
    })
  }
}

trait ChannelFutureListener {
  def operationComplete(f: ChannelFuture): Unit
}

class MyLibraryClient {
  def connect(remoteAddr: String): Unit@cps[Unit] = {
    shift {
      retrn: (Unit => Unit) => {
        val future: ChannelFuture = new ChannelFuture()
        future.addListener(new ChannelFutureListener {
          def operationComplete(f: ChannelFuture): Unit = {
            println("operationComplete starts")
            retrn();
            null
          }
        });
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

可能的输出:

Outside reset
operationComplete starts
This will happen after the connection is finished
Run Code Online (Sandbox Code Playgroud)