jil*_*len 5 scala scalaz scalaz-stream
我想实现一个客户端应用程序,首先向服务器发送请求然后等待其回复(类似于http)
我的客户流程可能是
val topic = async.topic[ByteVector]
val client = topic.subscribe
Run Code Online (Sandbox Code Playgroud)
这是api
trait Client {
val incoming = tcp.connect(...)(client)
val reqBus = topic.pubsh()
def ask(req: ByteVector): Task[Throwable \/ ByteVector] = {
(tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus)
???
}
}
Run Code Online (Sandbox Code Playgroud)
那么,如何实现剩下的部分ask 呢?
通常,实现是通过接收器发布消息然后等待某些来源的某种回复(如您的主题)来完成的.
实际上我们的代码中有很多这样的习语:
def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = {
merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)
}
Run Code Online (Sandbox Code Playgroud)
基本上这第一个钩子回复流等待任何结果O确认我们的请求发送.然后我们发布消息I并咨询最终转换为然后终止的pf任何传入.OO2