And*_*rey 6 tcp scala akka akka-stream
我采用了一种简单的TCP协议来与Akka Streams交换消息(见下文).但是,似乎没有立即处理传入的消息; 也就是说,在从客户端一个接一个地发送两条消息的情况下,只有在从服务器发送一些消息后才打印第一条消息:
At t=1, on [client] A is entered
At t=2, on [client] B is entered
At t=3, on [server] Z is entered
At t=4, on [server] A is printed
At t=5, on [server] Y is entered
At t=6, on [server] B is printed
Run Code Online (Sandbox Code Playgroud)
我期望/想要看到的内容:
At t=1, on [client] A is entered
At t=2, on [server] A is printed
At t=3, on [client] B is entered
At t=4, on [server] B is printed
At t=5, on [server] Z is entered
At t=6, on [client] Z is printed
At t=7, on [server] Y is entered
At t=8, on [client] Y is printed
Run Code Online (Sandbox Code Playgroud)
我错过了什么?也许我需要以某种方式使两端的水槽急切?或者每个接收器以某种方式被相应的源阻塞(当源等待来自命令行的输入时)?
import java.nio.charset.StandardCharsets.UTF_8
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
object AkkaStreamTcpChatter extends App {
implicit val system = ActorSystem("akka-stream-tcp-chatter", ConfigFactory.defaultReference())
implicit val materializer = ActorMaterializer()
type Message = String
val (host, port) = ("localhost", 46235)
val deserialize:ByteString => Message = _.utf8String
val serialize:Message => ByteString = message => ByteString(message getBytes UTF_8)
val incoming:Flow[ByteString, Message, _] = Flow fromFunction deserialize
val outgoing:Flow[Message, ByteString, _] = Flow fromFunction serialize
val protocol = BidiFlow.fromFlows(incoming, outgoing)
def prompt(s:String):Source[Message, _] = Source fromIterator {
() => Iterator.continually(StdIn readLine s"[$s]> ")
}
val print:Sink[Message, _] = Sink foreach println
args.headOption foreach {
case "server" => server()
case "client" => client()
}
def server():Unit =
Tcp()
.bind(host, port)
.runForeach { _
.flow
.join(protocol)
.runWith(prompt("S"), print)
}
def client():Unit =
Tcp()
.outgoingConnection(host, port)
.join(protocol)
.runWith(prompt("C"), print)
}
Run Code Online (Sandbox Code Playgroud)
我认为问题在于Akka Stream会进行操作员融合.这意味着完整的流处理在单个actor上运行.当它阻止阅读你的消息时,它无法打印出任何东西.
解决方案是在源之后添加异步边界.请参阅下面的示例.
def server(): Unit =
Tcp()
.bind(host, port)
.runForeach {
_
.flow
.join(protocol)
.runWith(prompt("S").async, print) // note .async here
}
def client(): Unit =
Tcp()
.outgoingConnection(host, port)
.join(protocol).async
.runWith(prompt("C").async, print) // note .async here
Run Code Online (Sandbox Code Playgroud)
当您添加异步边界时,融合不会跨越边界发生,并且prompt在另一个actor上运行,因此不会阻止print显示任何内容.
| 归档时间: |
|
| 查看次数: |
2480 次 |
| 最近记录: |