How to get started with Akka Streams?

kir*_*uku 221 scala akka-stream

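The Akka Streams library already comes with a wealth of documentation. However, the main problem for me is that it provides too much material - I feel quite overwhelmed by the number of concepts that I have to learn. Many of the examples shown there feel very heavyweight, can't easily be translated to real-world use cases and are therefore quite esoteric. I think it gives far too many details without explaining how to put all the building blocks together and how exactly it helps to solve specific problems.

There are sources, sinks, flows, graph stages, partial graphs, materialization, a graph DSL and a lot more, and I just don't know where to start. The quick start guide is meant to be a starting place, but I don't understand it. It just throws in the concepts mentioned above without explaining them. Furthermore, the code examples can't be executed - there are missing parts, which makes it more or less impossible for me to follow the text.

Can anyone explain the concepts sources, sinks, flows, graph stages, partial graphs, materialization and maybe some other things that I missed, in simple words and with easy examples that don't explain every single detail (which is probably not needed at the beginning anyway)?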

kir*_*uku 501

This answer is based on akka-stream version 2.4.2. The API can be slightly different in other versions. The dependency can be consumed by sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

Alright, let's get started. The API of Akka Streams consists of three main types. In contrast to Reactive Streams, these types are a lot more powerful and therefore more complex. It is assumed that for all the code examples the following definitions already exist:

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

The import statements are needed for the type declarations. system represents the actor system of Akka and materializer represents the evaluation context of the stream. In our case we use an ActorMaterializer, which means that the streams are evaluated on top of actors. Both values are marked as implicit, which allows the Scala compiler to inject these two dependencies automatically whenever they are needed. We also import system.dispatcher, which is an execution context for Futures.

A New API

Akka Streams have these key properties:

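  • They implement the Reactive Streams specification, whose three main goals (backpressure, asynchronous and non-blocking boundaries, and interoperability between different implementations) fully apply to Akka Streams too.
  • They provide an abstraction for the evaluation engine of the streams, which is called Materializer.
  • Programs are formulated as reusable building blocks, which are represented as the three main types Source, Sink and Flow. The building blocks form a graph whose evaluation is based on the Materializer and needs to be triggered explicitly.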
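The last property, that a graph is only a description until its evaluation is triggered, can be illustrated with a small sketch. It is written as a standalone script and assumes akka-stream is on the classpath; the counter `seen` and the name `blueprint` are made up for this illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

implicit val system: ActorSystem = ActorSystem("LazyBlueprint")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Hypothetical counter that records how many elements were actually processed.
val seen = new AtomicInteger(0)

// Only a description of the stream; nothing is evaluated at this point.
val blueprint = Source(1 to 3)
  .map { n => seen.incrementAndGet(); n }
  .toMat(Sink.ignore)(Keep.right)

val seenBeforeRun = seen.get // still 0, the graph has not been run

// run() hands the description to the materializer, which evaluates it.
Await.result(blueprint.run(), 3.seconds)
val seenAfterRun = seen.get // 3, one increment per element

system.terminate()
```

Await.result is used here only to make the script deterministic; in application code one would usually react to the returned Future instead of blocking.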

In the following, a deeper introduction to the three main types is given.

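Source

A Source is a data creator; it serves as the input source of a stream. Each Source has a single output channel and no input channel. All the data flows through the output channel to whatever is connected to the Source.

Source

Image taken from boldradius.com.

A Source can be created in multiple ways: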

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source.fromFuture(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

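In the cases above we fed the Source with finite data, which means that the streams terminate eventually. One should not forget that Reactive Streams are lazy and asynchronous by default. This means that the evaluation of a stream has to be requested explicitly. In Akka Streams this is done through the run* methods. runForeach is no different from the well-known foreach function - the run prefix just makes explicit that we ask for an evaluation of the stream. Since finite data is boring, we continue with infinite data: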

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

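With the take method we can create an artificial stop point that prevents us from evaluating indefinitely. Since actor support is built in, we can also easily feed the stream with messages that are sent to an actor: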

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

We can see that the Futures are executed asynchronously on different threads, which explains the result. In the above example a buffer for the incoming elements is not necessary, so with OverflowStrategy.fail we configure the stream to fail on a buffer overflow. Through this actor interface in particular, we can feed the stream from any data source. It doesn't matter if the data is created by the same thread, by a different one, by another process or if it comes from a remote system over the Internet.

Sink

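A Sink is basically the opposite of a Source. It is the endpoint of a stream and therefore consumes data. A Sink has a single input channel and no output channel. Sinks are especially needed when we want to specify the behavior of the data collector in a reusable way and without evaluating the stream. The run* methods we already know do not give us these properties, therefore it is preferable to use a Sink instead.

Sink

Image taken from boldradius.com.

A short example of a Sink in action: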

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

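Connecting a Source to a Sink can be done with the to method. It returns a so-called RunnableFlow (typed as RunnableGraph in the REPL output above), which is, as we will see later, a special form of a Flow - a stream that can be executed by just calling its run() method.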

Runnable Flow

Image taken from boldradius.com.

It is of course possible to forward all the values that arrive at a sink to an actor:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

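Data sources and sinks are great if you need a connection between Akka Streams and an existing system, but one cannot really do anything with them on their own. Flows are the last missing piece of the Akka Streams base abstraction. They act as connectors between different streams and can be used to transform their elements.

Flow

Image taken from boldradius.com.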

If a Flow is connected to a Source a new Source is the result. Likewise, a Flow connected to a Sink creates a new Sink. And a Flow connected with both a Source and a Sink results in a RunnableFlow. Therefore, they sit between the input and the output channel but by themselves do not correspond to one of the flavors as long as they are not connected to either a Source or a Sink.

Full Flow

Image taken from boldradius.com.
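The composition rules described above can be checked directly against the types. The following is a minimal sketch written as a standalone script, assuming akka-stream is on the classpath; the names `doubledSource` and `doublingSum` are invented for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

implicit val system: ActorSystem = ActorSystem("Composition")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

// Source + Flow is again a Source.
val doubledSource: Source[Int, NotUsed] = Source(1 to 3).via(doubler)

// Flow + Sink is again a Sink. Keep.right retains the Future produced by the fold.
val doublingSum: Sink[Int, Future[Int]] =
  doubler.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

// Both compositions double 1, 2, 3 and sum the results.
val viaSource: Int = Await.result(doubledSource.runFold(0)(_ + _), 3.seconds)
val viaSink: Int = Await.result(Source(1 to 3).runWith(doublingSum), 3.seconds)

system.terminate()
```

Both values come out as 12, since it makes no difference whether the flow is attached to the producing or the consuming side.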

In order to get a better understanding of Flows, we will have a look at some examples:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

With the via method we can connect a Source with a Flow. We need to specify the input type because the compiler can't infer it for us. As we can already see in this simple example, the flows invert and doubler are completely independent of any data producers and consumers. They only transform the data and forward it to the output channel. This means that we can reuse a flow among multiple streams:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1 and s2 represent completely new streams - they do not share any data through their building blocks.

Unbounded Data Streams

Before we move on we should first revisit some of the key aspects of Reactive Streams. An unbounded number of elements can arrive at any point and can put a stream in different states. Besides being runnable, which is the usual state, a stream may be stopped either through an error or through a signal that denotes that no further data will arrive. A stream can be modeled graphically by marking events on a timeline, as is the case here:

Shows that a stream is an ongoing sequence of events ordered in time

Image taken from The introduction to Reactive Programming you've been missing.

We have already seen runnable flows in the examples of the previous section. We get a RunnableGraph whenever a stream can actually be materialized, which means that a Sink is connected to a Source. So far we have mostly materialized to the value NotUsed, which can be seen in the types:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

For Source and Sink the second type parameter and for Flow the third type parameter denote the materialized value. The full meaning of materialization is not explained in this answer; further details can be found in the official documentation. For now the only thing we need to know is that the materialized value is what we get when we run a stream. Since we were only interested in side effects so far, we got NotUsed as the materialized value. The exception to this was the materialization of a sink, which resulted in a Future. It gave us back a Future because this value can denote when the stream that is connected to the sink has ended. The previous code examples were nice for explaining the concepts, but they were also boring because we only dealt with finite streams or with very simple infinite ones. To make it more interesting, a fully asynchronous and unbounded stream is explained in the following.
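To make the notion of a materialized value a bit more concrete before moving on, here is a minimal sketch, written as a standalone script and assuming akka-stream is on the classpath. It contrasts to, which keeps the materialized value of the source, with toMat and Keep.right, which keep the value of the sink. The names `sum`, `left` and `right` are illustrative only.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

implicit val system: ActorSystem = ActorSystem("Materialization")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val source: Source[Int, NotUsed] = Source(1 to 3)
val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

// `to` keeps the materialized value of the left side (the source): NotUsed.
val left: NotUsed = source.to(sum).run()

// `toMat` with Keep.right keeps the sink's value instead: a Future of the sum.
val right: Future[Int] = source.toMat(sum)(Keep.right).run()
val total: Int = Await.result(right, 3.seconds) // 1 + 2 + 3

system.terminate()
```

Running the same source twice is fine here because every call to run() materializes a fresh, independent stream.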

ClickStream Example

As an example, we want to have a stream that captures click events. To make it more challenging, let's say we also want to group click events that happen in a short time after each other. This way we could easily discover double, triple or tenfold clicks. Furthermore, we want to filter out all single clicks. Take a deep breath and imagine how you would solve that problem in an imperative manner. I bet no one would be able to implement a solution that works correctly on the first try. In a reactive fashion this problem is trivial to solve. In fact, the solution is so simple and straightforward to implement that we can even express it in a diagram that directly describes the behavior of the code:

Logic of the click stream example

Image taken from The introduction to Reactive Programming you've been missing.

The gray boxes are functions that describe how one stream is transformed into another. With the throttle function we accumulate clicks within 250 milliseconds; the map and filter functions should be self-explanatory. The colored orbs represent events and the arrows depict how they flow through our functions. Later in the processing steps, fewer and fewer elements flow through our stream, since we group them together and filter them out. The code for this image would look something like this:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

The whole logic can be represented in only four lines of code! In Scala, we could write it even shorter:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

The definition of clickStream is a little bit more complex, but only because the example program runs on the JVM, where capturing click events is not easily possible. Another complication is that Akka by default doesn't provide the throttle function used here. Instead we had to write it ourselves. Since this function is reusable across different use cases (as is the case for the map and filter functions), I don't count these lines towards the number of lines we needed to implement the logic. In imperative languages, however, it is normal that logic can't be reused that easily and that the different logical steps all happen in one place instead of being applied sequentially, which means that we probably would have cluttered our code with the throttling logic. The full code example is available as a gist and shall not be discussed here any further.
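For readers who want to experiment without the gist, the grouping step can be approximated with the built-in groupedWithin operator. This is not the author's throttle function: groupedWithin cuts fixed time windows, whereas the diagram groups clicks that are separated by short pauses. The sketch below is a standalone script that assumes akka-stream is on the classpath, and it fakes the click events with an in-memory list.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("ClickStream")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Hypothetical stand-in for real click events: five clicks arriving back to back.
val clickStream: Source[String, NotUsed] = Source(List.fill(5)("click"))

val multiClickStream: Source[Int, NotUsed] = clickStream
  .groupedWithin(1000, 250.millis) // collect at most 1000 clicks per 250 ms window
  .map(_.length)                   // number of clicks in each group
  .filter(_ >= 2)                  // drop single clicks

// Collect the emitted group sizes so they can be inspected.
val counts: Vector[Int] =
  Await.result(multiClickStream.runFold(Vector.empty[Int])(_ :+ _), 3.seconds)

system.terminate()
```

Because all five fake clicks arrive well within one window, they should end up in a single group; with real, spread-out events the group boundaries depend on timing.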

SimpleWebServer Example

What should be discussed instead is another example. While the click stream is a nice way to let Akka Streams handle a real-world problem, it lacks the power to show parallel execution in action. The next example represents a small web server that can handle multiple requests in parallel. The web server shall be able to accept incoming connections and receive byte sequences from them that represent printable ASCII characters. These byte sequences or strings should be split at all newline characters into smaller parts. After that, the server shall respond to the client with each of the split lines. Alternatively, it could do something else with the lines and give a special answer token, but we want to keep it simple in this example and therefore don't introduce any fancy features. Remember, the server needs to be able to handle multiple requests at the same time, which basically means that no request is allowed to block any other request from further execution. Solving all of these requirements can be hard in an imperative way - with Akka Streams, however, we shouldn't need more than a few lines to solve any of them. First, let's get an overview of the server itself:

Server

Basically, there are only three main building blocks. The first one needs to accept incoming connections. The second one needs to handle incoming requests and the third one needs to send a response. Implementing all of these three building blocks is only a little bit more complicated than implementing the click stream:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import scala.util.{Failure, Success}
  import system.dispatcher

  // Sink that handles every incoming connection with the serverLogic flow.
  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Done]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  // Source of connections; its materialized value is the future server binding.
  val incomingConnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  // `to` keeps the materialized value of the source, i.e. the binding.
  val binding: Future[Tcp.ServerBinding] =
    incomingConnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

The function mkServer takes, besides the address and the port of the server, an actor system and a materializer as implicit parameters. The control flow of the server is represented by binding, which takes a source of incoming connections and forwards them to a sink of incoming connections. Inside connectionHandler, which is our sink, we handle every connection with the flow serverLogic, which will be described later. binding is a Future that completes when the server has been started or the start has failed, which could be the case when the port is already taken by another process. The code, however, doesn't completely reflect the graphic, as we can't see a building block that handles responses. The reason for this is that the connection already provides this logic by itself. It is a bidirectional flow and not just a unidirectional one like the flows we have seen in the previous examples. As was the case for materialization, such complex flows shall not be explained here. The official documentation has plenty of material covering more complex flow graphs. For now it is enough to know that Tcp.IncomingConnection represents a connection that knows how to receive requests and how to send responses. The part that is still missing is the serverLogic building block. It can look like this:

Server logic

Once again, we are able to split the logic in several simple building blocks that all together form the flow of our program. First we want to split our sequence of bytes in lines, which we have to do whenever we find a newline character. After that, the bytes of each line need to be converted to a string because working with raw bytes is cumbersome. Overall we could receive a binary stream of a complicated protocol, which would make working with the incoming raw data extremely challenging. Once we have a readable string, we can create an answer. For simplicity reasons the answer can be anything in our case. In the end, we have to convert back our answer to a sequence of bytes that can be sent over the wire. The code for the entire logic may look like this:

val serverLogic: Flow[ByteString, ByteString, NotUsed] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

We already know that serverLogic is a flow that takes a ByteString and has to produce a ByteString. With delimiter we can split a ByteString in smaller parts - in our case it needs to happen whenever a newline character occurs. receiver is the flow that takes all of the split byte sequences and converts them to a string. This is of course a dangerous conversion, since only printable ASCII characters should be converted to a string but for our needs it is good enough. responder is the last component and is responsible for creating an answer and converting the answer back to a sequence of bytes. As opposed to the graphic we didn't split this last component in two, since the logic is trivial. At the end, we connect all of the flows through the via function. At this point one may ask whether we took care of the multi-user property that was mentioned at the beginning. And indeed we did even though it may not be obvious immediately. By looking at this graphic it should get more clear:

Server and server logic combined

The serverLogic component is nothing but a flow that contains smaller flows. This component takes an input, which is a request, and produces an output, which is the response. Since flows can be constructed multiple times and they all work independently of each other, we achieve our multi-user property through this nesting. Every request is handled within its own flow, and therefore a short-running request can overtake a previously started long-running request. In case you wondered, the definition of serverLogic that was shown previously can of course be written a lot shorter by inlining most of its inner definitions:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))
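Because serverLogic is an ordinary Flow, it can also be exercised without any network connection by feeding it from an in-memory Source. The following is a minimal sketch written as a standalone script, assuming akka-stream is on the classpath; the input chunks are invented and deliberately do not line up with the newline characters.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Framing, Source}
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem("ServerLogicCheck")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val serverLogic: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
  .via(Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

// Made-up input: the first line is split across two chunks.
val chunks = List(ByteString("Hello Wo"), ByteString("rld\nHow are you?\n"))

// Run the chunks through the flow and collect the decoded responses.
val responses: Vector[String] = Await.result(
  Source(chunks)
    .via(serverLogic)
    .map(_.utf8String)
    .runFold(Vector.empty[String])(_ :+ _),
  3.seconds)

system.terminate()
```

The framing stage reassembles the split line, so responses contains one answer per input line regardless of how the bytes were chunked.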

A test of the web server may look like this:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

In order for the above code example to function correctly, we first need to start the server, which is depicted by the startServer script:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

The full code example of this simple TCP server can be found here. We are not only able to write a server with Akka Streams but also the client. It may look like this:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ => StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

The full code of the TCP client can be found here. The code looks quite similar, but in contrast to the server we don't have to manage the incoming connections anymore.

Complex Graphs

In the previous sections we have seen how we can construct simple programs out of flows. However, in reality it is often not enough to just rely on already built-in functions to construct more complex streams. If we want to be able to use Akka Streams for arbitrary programs we need to know how to build our own custom control structures and combinable flows that allow us to tackle the complexity of our applications. The good news is that Akka Streams was designed to scale with the needs of the users and in order to give you a short introduction into the more complex parts of Akka Streams, we add some more features to our client/server example.

One thing we can't do yet is closing a connection. At this point it starts to get a little bit more complicated because the stream API we have seen so far doesn't allow us to stop a stream at an arbitrary point. However, there is the GraphStage abstraction, which can be used to create arbitrary graph processing stages with any number of input or output ports. Let's first have a look at the server side, where we introduce a new component, called closeConnection:
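What follows is not the author's original listing but a minimal sketch of how such a stage might look, written as a standalone script and assuming akka-stream is on the classpath. The quit command "q" and the farewell "BYE" are assumptions made for this illustration: the stage forwards every message unchanged and completes the stream once the quit command arrives.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

implicit val system: ActorSystem = ActorSystem("CloseConnection")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Hypothetical stage with one input and one output port.
val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in: Inlet[String] = Inlet[String]("closeConnection.in")
  val out: Outlet[String] = Outlet[String]("closeConnection.out")
  override val shape: FlowShape[String, String] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = grab(in) match {
          case "q" =>
            // Assumed quit command: say goodbye, then complete the whole stage.
            push(out, "BYE")
            completeStage()
          case msg =>
            push(out, msg)
        }
      })
      setHandler(out, new OutHandler {
        // Demand from downstream is passed on to upstream.
        override def onPull(): Unit = pull(in)
      })
    }
}

// Everything after the quit command is never processed.
val seen: Vector[String] = Await.result(
  Source(List("hello", "q", "ignored"))
    .via(closeConnection)
    .runFold(Vector.empty[String])(_ :+ _),
  3.seconds)

system.terminate()
```

In a server, a stage like this could sit between the string conversion and the responder, so that completing the stage also closes the TCP connection it is attached to.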

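  • This post might be useful for the new Documentation feature of Stack Overflow - once it is opened for Scala. (9 upvotes)
  • @Magisch Always remember: "We don't delete good content." I'm not quite sure, but I guess this answer might actually qualify, despite everything. (7 upvotes)
  • @monksy I don't plan to publish this anywhere else. Feel free to republish it on your blog if you want. The API is by now stable in most parts, which means that you probably don't even need to care about maintenance (most blog articles about Akka Streams are outdated, since they show an API that no longer exists). (4 upvotes)
  • It won't disappear. Why should it? (3 upvotes)
  • This is one of the most useful Stack Overflow answers I have ever seen. I wish this were in the Akka documentation. @sshaef, thank you very much! (3 upvotes)
  • @sschaef It might disappear because the question is off-topic and has been closed. (2 upvotes)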