scalaz-stream如何实现`ask-then-wait-reply` tcp客户端

jil*_*len 5 scala scalaz scalaz-stream

我想实现一个客户端应用程序,首先向服务器发送请求然后等待其回复(类似于http)

我的客户流程可能是

 val topic = async.topic[ByteVector]
 val client = topic.subscribe
Run Code Online (Sandbox Code Playgroud)

这是api

trait Client {
  val incoming = tcp.connect(...)(client)
   val reqBus = topic.pubsh()
   def ask(req: ByteVector): Task[Throwable \/ ByteVector] =  {
      (tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus)
      ???
   }
}
Run Code Online (Sandbox Code Playgroud)

那么,如何实现剩下的部分ask 呢?

Pav*_*cek 6

通常,实现是通过接收器发布消息然后等待某些来源的某种回复(如您的主题)来完成的.

实际上我们的代码中有很多这样的习语:

def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = {
 merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)
}
Run Code Online (Sandbox Code Playgroud)

基本上这第一个钩子回复流等待任何结果O确认我们的请求发送.然后我们发布消息I并咨询最终转换为然后终止的pf任何传入.OO2

  • 啊,谢谢@pavel :)很高兴看到更多像这样的例子在野外使用:)将常见模式的例子添加到scalaz-stream wiki将有助于我们对库的新手:) (2认同)