Jer*_*emy 9 continuations scala netty
我正在使用Netty库(GitHub的第4版).它在Scala中运行良好,但我希望我的库能够使用延续传递样式进行异步等待.
传统上使用Netty你会做这样的事情(一个示例异步连接操作):
//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete (f:ChannelFuture) = {
//here goes the code that happens when the connection is made
}
})
Run Code Online (Sandbox Code Playgroud)
如果你正在实现一个库(我是),那么你基本上有三个简单的选项,允许库的用户在建立连接后做东西:
我想做的是第四种选择; 我没有在上面的计数中包含它,因为它并不简单.
我想使用scala分隔的continuation来使库有点像一个阻塞库,但它将在幕后无阻塞:
class MyLibraryClient {
def connect(remoteAddr:SocketAddress) = {
shift { retrn: (Unit => Unit) => {
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete(f:ChannelFuture) = {
retrn();
}
});
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
想象一下,其他读/写操作以相同的方式实现.这样做的目的是用户的代码看起来更像这样:
reset {
val conn = new MyLibraryClient();
conn.connect(new InetSocketAddress("127.0.0.1", 1337));
println("This will happen after the connection is finished");
}
Run Code Online (Sandbox Code Playgroud)
换句话说,该程序看起来像一个简单的阻塞式程序,但在幕后不会有任何阻塞或线程.
我遇到的麻烦是我不完全理解分隔连续的输入是如何工作的.当我尝试以上述方式实现它时,编译器抱怨我的operationComplete实现实际返回Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit]而不是Unit.我得知scala的CPS中存在一种"陷阱",你必须使用一个shift方法的返回类型进行注释@suspendable,它会在调用堆栈中向上传递reset,但是似乎没有任何方法可以将其与预先存在的Java库,没有分隔连续的概念.
我觉得真的必须有办法解决这个问题 - 如果Swarm可以序列化连续并将它们阻塞在网络上以便在别处进行计算,那么必须可以简单地从预先存在的Java类调用延续.但我无法弄清楚它是如何完成的.我是否必须在Scala中重写netty的所有部分才能实现这一目标?
当我开始学习时,我发现对Scala 延续性的解释非常有帮助。shift[A, B, C]特别注意他解释和 的部分reset[B, C]。添加一个虚拟对象null作为 的最后一个语句operationComplete应该会有所帮助。
顺便说一句,如果它可能有嵌套,您需要在retrn()另一个内部调用。resetshift
编辑:这是一个工作示例
import scala.util.continuations._
import java.util.concurrent.Executors
object Test {
val execService = Executors.newFixedThreadPool(2)
def main(args: Array[String]): Unit = {
reset {
val conn = new MyLibraryClient();
conn.connect("127.0.0.1");
println("This will happen after the connection is finished");
}
println("Outside reset");
}
}
class ChannelFuture {
def addListener(listener: ChannelFutureListener): Unit = {
val future = this
Test.execService.submit(new Runnable {
def run(): Unit = {
listener.operationComplete(future)
}
})
}
}
trait ChannelFutureListener {
def operationComplete(f: ChannelFuture): Unit
}
class MyLibraryClient {
def connect(remoteAddr: String): Unit@cps[Unit] = {
shift {
retrn: (Unit => Unit) => {
val future: ChannelFuture = new ChannelFuture()
future.addListener(new ChannelFutureListener {
def operationComplete(f: ChannelFuture): Unit = {
println("operationComplete starts")
retrn();
null
}
});
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
可能的输出:
Outside reset
operationComplete starts
This will happen after the connection is finished
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2137 次 |
| 最近记录: |