kir*_*uku 221 scala akka-stream
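The Akka Streams library already comes with a wealth of documentation. However, the main problem for me is that it provides too much material - I feel quite overwhelmed by the number of concepts that I have to learn. Many of the examples shown there feel very heavyweight, can't easily be translated to real-world use cases, and are therefore quite esoteric. I think it gives far too many details without explaining how to put all the building blocks together and how exactly they help to solve specific problems.
There are sources, sinks, flows, graph stages, partial graphs, materialization, a graph DSL and a lot more, and I just don't know where to start. The quick start guide is meant to be a starting place, but I don't understand it. It just throws in the concepts mentioned above without explaining them. Furthermore, the code examples can't be executed - there are missing parts, which makes it more or less impossible for me to follow the text.
Can anyone explain the concepts of sources, sinks, flows, graph stages, partial graphs, materialization and maybe some other things that I missed, in simple words and with easy examples that don't explain every single detail (which is probably not needed at the beginning anyway)?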
kir*_*uku 501
This answer is based on akka-stream version 2.4.2. The API can be slightly different in other versions. The dependency can be consumed by sbt:
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"
Alright, let's get started. The API of Akka Streams consists of three main types. In contrast to Reactive Streams, these types are a lot more powerful and therefore more complex. It is assumed that for all the code examples the following definitions already exist:
import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
The import statements are needed for the type declarations. system represents the actor system of Akka and materializer represents the evaluation context of the stream. In our case we use an ActorMaterializer, which means that the streams are evaluated on top of actors. Both values are marked as implicit, which gives the Scala compiler the possibility to inject these two dependencies automatically whenever they are needed. We also import system.dispatcher, which is an execution context for Futures.
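Akka Streams have these key properties:
Streams are evaluated by an engine that is abstracted as the Materializer.
Programs are formulated as reusable building blocks, which are represented by the three main types Source, Sink and Flow. The building blocks form a graph whose evaluation is based on the Materializer and needs to be triggered explicitly.
A more detailed introduction to the three main types follows.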
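A Source is a data creator; it serves as an input source to the stream. Each Source has a single output channel and no input channel. All the data flows through the output channel to whatever is connected to the Source.
Image taken from boldradius.com.
A Source can be created in multiple ways: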
scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...
scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val s = Source.fromFuture(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future
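In the above cases we fed the Source with finite data, which means the streams terminate eventually. One should not forget that Reactive Streams are lazy and asynchronous by default. This means that the evaluation of a stream has to be requested explicitly. In Akka Streams this can be done through the run* methods. runForeach is no different from the well-known foreach function - the run prefix makes explicit that we ask for an evaluation of the stream. Since finite data is boring, we continue with an infinite source: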
scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5
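With the take method we can create an artificial stop point that prevents us from evaluating indefinitely. Since actor support is built in, we can also easily feed the stream with messages that are sent to an actor: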
def run(actor: ActorRef) = {
Future { Thread.sleep(300); actor ! 1 }
Future { Thread.sleep(200); actor ! 2 }
Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
.actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
.mapMaterializedValue(run)
scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1
We can see that the Futures are executed asynchronously on different threads, which explains the result. In the above example a buffer for the incoming elements is not necessary, and therefore with OverflowStrategy.fail we can configure that the stream should fail on a buffer overflow. Especially through this actor interface, we can feed the stream from any data source. It doesn't matter whether the data is created by the same thread, by a different one, by another process, or whether it comes from a remote system over the Internet.
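To make the laziness of a Source a bit more tangible, here is a minimal, self-contained sketch that is not part of the original answer. The object name LazySourceSketch and the counter evaluations are made up for illustration, and it assumes akka-stream 2.4.x is on the classpath. It shows that defining a Source evaluates nothing, and that every run* call materializes the same blueprint into a new, independent stream.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object LazySourceSketch {

  // Returns (evaluations before running, sum, product, evaluations after running).
  def run(): (Int, Int, Int, Int) = {
    implicit val system = ActorSystem("LazySourceSketch")
    implicit val materializer = ActorMaterializer()
    try {
      val evaluations = new AtomicInteger(0)

      // Only a blueprint: defining the Source does not evaluate anything yet.
      val numbers = Source(1 to 5).map { n => evaluations.incrementAndGet(); n }
      val before = evaluations.get

      // Each run* call materializes the blueprint into a new, independent stream.
      val sum = Await.result(numbers.runFold(0)(_ + _), 3.seconds)
      val product = Await.result(numbers.runFold(1)(_ * _), 3.seconds)

      (before, sum, product, evaluations.get)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // (0,15,120,10)
}
```

The counter is 0 right after the definition and 10 after both runs, because the five elements pass through map once per materialization. Blocking with Await is only done here to keep the sketch short.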
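A Sink is basically the opposite of a Source. It is the endpoint of a stream and therefore consumes data. A Sink has a single input channel and no output channel. Sinks are especially needed when we want to specify the behavior of the data collector in a reusable way and without evaluating the stream. The run* methods we have seen so far do not offer these properties, which is why it is preferable to use a Sink instead.
Image taken from boldradius.com.
A short example of a Sink in action: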
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3
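Connecting a Source to a Sink can be done with the to method. It returns a so-called RunnableGraph, which is, as we will see later, a special form of a Flow - a stream that can be executed by just calling its run() method.
Image taken from boldradius.com.
It is of course possible to forward all the values that arrive at a sink to an actor: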
val actor = system.actorOf(Props(new Actor {
override def receive = {
case msg => println(s"actor received: $msg")
}
}))
scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...
scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed
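Data sources and sinks are great if you need a connection between Akka streams and an existing system, but one can't really do anything with them. Flows are the last missing piece in the Akka Streams base abstraction. They act as connectors between different streams and can be used to transform their elements.
Image taken from boldradius.com.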
If a Flow is connected to a Source, a new Source is the result. Likewise, a Flow connected to a Sink creates a new Sink. And a Flow connected with both a Source and a Sink results in a RunnableGraph. Therefore, flows sit between the input and the output channel, but by themselves they do not correspond to one of the other flavors as long as they are not connected to either a Source or a Sink.
Image taken from boldradius.com.
In order to get a better understanding of Flows, we will have a look at some examples:
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6
With the via method we can connect a Source with a Flow. We need to specify the input type because the compiler can't infer it for us. As we can already see in this simple example, the flows invert and doubler are completely independent of any data producers and consumers. They only transform the data and forward it to the output channel. This means that we can reuse a flow among multiple streams:
scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3
scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1
s1 and s2 represent completely new streams - they do not share any data through their building blocks.
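The composition rules described above (Flow plus Flow gives a Flow, Source plus Flow gives a Source, Flow plus Sink gives a Sink) can be written down with explicit type annotations. The following is only a sketch under the assumption of akka-stream 2.4.x; the object name FlowCompositionSketch and the value names are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object FlowCompositionSketch {
  val invert: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * -1)
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  // Flow + Flow = Flow
  val invertAndDouble: Flow[Int, Int, NotUsed] = invert.via(doubler)

  // Source + Flow = Source
  val transformed: Source[Int, NotUsed] = Source(1 to 3).via(invertAndDouble)

  // Flow + Sink = Sink (defined only to show the type, never run here)
  val printingSink: Sink[Int, NotUsed] = invertAndDouble.to(Sink.foreach[Int](println))

  // Runs the transformed source and collects its elements in order.
  def run(): List[Int] = {
    implicit val system = ActorSystem("FlowCompositionSketch")
    implicit val materializer = ActorMaterializer()
    try {
      val collected = transformed.runFold(Vector.empty[Int])(_ :+ _)
      Await.result(collected, 3.seconds).toList
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // List(-2, -4, -6)
}
```

None of the values defined at the top need a materializer; only the run* call inside run() does, which is what makes these building blocks freely reusable.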
Before we move on, we should first revisit some of the key aspects of Reactive Streams. An unbounded number of elements can arrive at any point in time and can put a stream into different states. Apart from a running stream, which is the usual state, a stream may get stopped either through an error or through a signal denoting that no further data will arrive. A stream can be modeled graphically by marking events on a timeline, as is the case here:
Image taken from The introduction to Reactive Programming you've been missing.
We have already seen runnable flows in the examples of the previous section. We get a RunnableGraph whenever a stream can actually be materialized, which means that a Sink is connected to a Source. So far we always materialized to the value NotUsed, which can be seen in the types:
val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)
For Source and Sink the second type parameter, and for Flow the third type parameter, denotes the materialized value. The full meaning of materialization will not be explained in this answer; further details can be found in the official documentation. For now the only thing we need to know is that the materialized value is what we get when we run a stream. Since we were only interested in side effects so far, we got NotUsed as the materialized value. The exception to this was the materialization of a sink, which resulted in a Future. It gave us back a Future because this value can denote when the stream that is connected to the sink has ended. So far, the code examples were nice for explaining the concepts, but they were also boring because we only dealt with finite streams or with very simple infinite ones. To make it more interesting, a fully asynchronous and unbounded stream is explained in the following.
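Which materialized value a run actually hands back depends on how Source and Sink are combined. The sketch below is not from the original answer and assumes akka-stream 2.4.x; the object name MaterializedValueSketch is made up. It contrasts to, which keeps the value of the left-hand side (the Source), with toMat(...)(Keep.right) and runWith, which both hand back the value of the Sink.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object MaterializedValueSketch {

  def run(): (NotUsed, Int, Int) = {
    implicit val system = ActorSystem("MaterializedValueSketch")
    implicit val materializer = ActorMaterializer()
    try {
      val source: Source[Int, NotUsed] = Source(1 to 3)
      // A sink that materializes to the sum of all elements it receives.
      val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

      // `to` keeps the materialized value of the source: NotUsed.
      val left: NotUsed = source.to(sum).run()
      // `toMat` with Keep.right keeps the materialized value of the sink.
      val right: Future[Int] = source.toMat(sum)(Keep.right).run()
      // `runWith` is a shortcut that also returns the sink's value.
      val shortcut: Future[Int] = source.runWith(sum)

      (left, Await.result(right, 3.seconds), Await.result(shortcut, 3.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // (NotUsed,6,6)
}
```

This is also why source.to(sink).run() returned NotUsed in the earlier examples even though the sink itself materializes to a Future.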
As an example, we want to have a stream that captures click events. To make it more challenging, let's say we also want to group click events that happen in a short time after each other. This way we could easily discover double, triple or tenfold clicks. Furthermore, we want to filter out all single clicks. Take a deep breath and imagine how you would solve that problem in an imperative manner. I bet no one would be able to implement a solution that works correctly on the first try. In a reactive fashion this problem is trivial to solve. In fact, the solution is so simple and straightforward to implement that we can even express it in a diagram that directly describes the behavior of the code:
Image taken from The introduction to Reactive Programming you've been missing.
The gray boxes are functions that describe how one stream is transformed into another. With the throttle function we accumulate clicks within 250 milliseconds; the map and filter functions should be self-explanatory. The colored orbs represent events and the arrows depict how they flow through our functions. In the later processing steps, fewer and fewer elements flow through our stream, since we group them together and filter them out. The code for this image would look something like this:
val multiClickStream = clickStream
.throttle(250.millis)
.map(clickEvents => clickEvents.length)
.filter(numberOfClicks => numberOfClicks >= 2)
The whole logic can be represented in only four lines of code! In Scala, we could write it even shorter:
val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)
The definition of clickStream is a little bit more complex, but only because the example program runs on the JVM, where capturing click events is not easily possible. Another complication is that Akka by default doesn't provide the throttle function used here. Instead we had to write it ourselves. Since this function is (like the map or filter functions) reusable across different use cases, I don't count these lines towards the number of lines we needed to implement the logic. In imperative languages, however, it is normal that logic can't be reused that easily and that the different logical steps happen all in one place instead of being applied sequentially, which means that we would probably have cluttered our code with the throttling logic. The full code example is available as a gist and shall not be discussed here any further.
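For readers who want to experiment without the gist, the following is a rough stand-in and not the author's throttle implementation. It swaps in the built-in groupedWithin operator, which batches elements in fixed time windows instead of grouping by the gaps between clicks, so its behavior only approximates the diagram. The names MultiClickSketch, multiClicks and the simulated tick source are made up for illustration, and akka-stream 2.4.x is assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka and not the code from the gist.
object MultiClickSketch {

  // Batches clicks that arrive within a 250 ms window, counts each batch
  // and drops batches that contain only a single click.
  def multiClicks[M](clicks: Source[String, M]): Source[Int, M] =
    clicks
      .groupedWithin(100, 250.millis) // at most 100 clicks per batch
      .map(_.length)
      .filter(_ >= 2)

  def run(): List[Int] = {
    implicit val system = ActorSystem("MultiClickSketch")
    implicit val materializer = ActorMaterializer()
    try {
      // Simulated clicks: one every 100 ms, ten in total.
      val clicks = Source.tick(0.millis, 100.millis, "click").take(10)
      val counts = multiClicks(clicks).runFold(Vector.empty[Int])(_ :+ _)
      Await.result(counts, 5.seconds).toList
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // batch sizes depend on timing
}
```

Because the batches depend on timer scheduling, the exact numbers printed can differ between runs; only the shape of the pipeline matches the one-liner shown earlier.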
What should be discussed instead is another example. While the click stream is a nice way to let Akka Streams handle a real-world problem, it lacks the power to show parallel execution in action. The next example represents a small web server that can handle multiple requests in parallel. The web server shall be able to accept incoming connections and receive byte sequences from them that represent printable ASCII characters. These byte sequences or strings should be split at all newline characters into smaller parts. After that, the server shall respond to the client with each of the split lines. Alternatively, it could do something else with the lines and give a special answer token, but we want to keep it simple in this example and therefore don't introduce any fancy features. Remember, the server needs to be able to handle multiple requests at the same time, which basically means that no request is allowed to block any other request from further execution. Meeting all of these requirements can be hard in an imperative way - with Akka Streams, however, we shouldn't need more than a few lines for any of them. First, let's have an overview of the server itself:
Basically, there are only three main building blocks. The first one needs to accept incoming connections. The second one needs to handle incoming requests and the third one needs to send a response. Implementing all of these three building blocks is only a little bit more complicated than implementing the click stream:
import scala.util.{Failure, Success}

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher
  // Sink that handles every accepted connection with the serverLogic flow.
  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Done]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }
  // Source of incoming connections; it materializes to the server binding.
  val incomingConnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)
  // `to` keeps the materialized value of the source, i.e. the binding.
  val binding: Future[Tcp.ServerBinding] =
    incomingConnections.to(connectionHandler).run()
  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}
The function mkServer takes (besides the address and the port of the server) an actor system and a materializer as implicit parameters. The control flow of the server is represented by binding, which takes a source of incoming connections and forwards them to a sink of incoming connections. Inside connectionHandler, which is our sink, we handle every connection with the flow serverLogic, which will be described later. binding is a Future that completes when the server has been started or the start has failed, which could be the case when the port is already taken by another process. The code, however, doesn't completely reflect the graphic, as we can't see a building block that handles responses. The reason for this is that the connection already provides this logic by itself. It is a bidirectional flow and not just a unidirectional one like the flows we have seen in the previous examples. As was the case for materialization, such complex flows shall not be explained here. The official documentation has plenty of material covering more complex flow graphs. For now it is enough to know that Tcp.IncomingConnection represents a connection that knows how to receive requests and how to send responses. The part that is still missing is the serverLogic building block. It can look like this:
Once again, we are able to split the logic in several simple building blocks that all together form the flow of our program. First we want to split our sequence of bytes in lines, which we have to do whenever we find a newline character. After that, the bytes of each line need to be converted to a string because working with raw bytes is cumbersome. Overall we could receive a binary stream of a complicated protocol, which would make working with the incoming raw data extremely challenging. Once we have a readable string, we can create an answer. For simplicity reasons the answer can be anything in our case. In the end, we have to convert back our answer to a sequence of bytes that can be sent over the wire. The code for the entire logic may look like this:
val serverLogic: Flow[ByteString, ByteString, NotUsed] = {
val delimiter = Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true)
val receiver = Flow[ByteString].map { bytes =>
val message = bytes.utf8String
println(s"Server received: $message")
message
}
val responder = Flow[String].map { message =>
val answer = s"Server hereby responds to message: $message\n"
ByteString(answer)
}
Flow[ByteString]
.via(delimiter)
.via(receiver)
.via(responder)
}
We already know that serverLogic is a flow that takes a ByteString and has to produce a ByteString. With delimiter we can split a ByteString into smaller parts - in our case this needs to happen whenever a newline character occurs. receiver is the flow that takes all of the split byte sequences and converts them to strings. This is of course a dangerous conversion, since only printable ASCII characters should be converted to a string, but for our needs it is good enough. responder is the last component and is responsible for creating an answer and converting the answer back to a sequence of bytes. As opposed to the graphic, we didn't split this last component in two, since the logic is trivial. At the end, we connect all of the flows through the via function. At this point one may ask whether we took care of the multi-user property that was mentioned at the beginning. Indeed we did, even though it may not be immediately obvious. Looking at this graphic should make it clearer:
The serverLogic component is nothing but a flow that contains smaller flows. This component takes an input, which is a request, and produces an output, which is the response. Since flows can be materialized multiple times and all of the resulting streams work independently of each other, this nesting gives us our multi-user property. Every connection is handled by its own materialization of serverLogic, and therefore a short-running request can overtake a previously started long-running request. In case you wondered, the definition of serverLogic that was shown previously can of course be written a lot shorter by inlining most of its inner definitions:
val serverLogic = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(msg => s"Server hereby responds to message: $msg\n")
.map(ByteString(_))
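Because serverLogic is just a Flow, it can be tried out without opening a socket by feeding it from an in-memory Source. The following sketch is not from the original answer; it assumes akka-stream 2.4.x, and the names ServerLogicSketch and respond are made up. The input is deliberately split in the middle of a line to show that the framing stage reassembles lines across chunk boundaries.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Framing, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object ServerLogicSketch {

  // Same pipeline as the inlined serverLogic shown above.
  val serverLogic: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
    .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
    .map(_.utf8String)
    .map(msg => s"Server hereby responds to message: $msg\n")
    .map(ByteString(_))

  // Feeds the given chunks through serverLogic and returns the concatenated response.
  def respond(chunks: List[String]): String = {
    implicit val system = ActorSystem("ServerLogicSketch")
    implicit val materializer = ActorMaterializer()
    try {
      val response = Source(chunks.map(ByteString(_)))
        .via(serverLogic)
        .runFold(ByteString.empty)(_ ++ _)
      Await.result(response, 3.seconds).utf8String
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    // The first line is split across two chunks on purpose.
    print(respond(List("Hello Wo", "rld\nHow are you?\n")))
}
```

With that input, the sketch should print one response line for "Hello World" and one for "How are you?", the same lines a network client would receive.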
A test of the web server may look like this:
$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?
In order for the above code example to function correctly, we first need to start the server, which is depicted by the startServer script:
$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?
The full code example of this simple TCP server can be found here. We are not only able to write a server with Akka Streams but also the client. It may look like this:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(println)
.map(_ => StdIn.readLine("> ")) // requires: import scala.io.StdIn
.map(_+"\n")
.map(ByteString(_))
connection.join(flow).run()
The full code of the TCP client can be found here. The code looks quite similar, but in contrast to the server we don't have to manage the incoming connections anymore.
In the previous sections we have seen how we can construct simple programs out of flows. However, in reality it is often not enough to just rely on already built-in functions to construct more complex streams. If we want to be able to use Akka Streams for arbitrary programs we need to know how to build our own custom control structures and combinable flows that allow us to tackle the complexity of our applications. The good news is that Akka Streams was designed to scale with the needs of the users and in order to give you a short introduction into the more complex parts of Akka Streams, we add some more features to our client/server example.
One thing we can't do yet is close a connection. At this point it starts to get a little bit more complicated, because the stream API we have seen so far doesn't allow us to stop a stream at an arbitrary point. However, there is the GraphStage abstraction, which can be used to create arbitrary graph processing stages with any number of input or output ports. Let's first have a look at the server side, where we introduce a new component, called closeConnection: