Akka Streams库已经提供了大量文档.然而,对我来说,主要的问题是它提供了太多的材料 - 我觉得我必须学习的概念数量让我感到非常不知所措.很多例子显示出非常重量级,并且不能轻易地翻译成现实世界的用例,因此非常深奥.我认为它提供了太多的细节而没有解释如何一起构建所有构建块以及它如何帮助解决特定问题.
有源,汇,流,图阶段,部分图,物化,图DSL和更多,我只是不知道从哪里开始.该快速入门指南,就是要一个首发位置,但我不明白.它只是抛出上面提到的概念而不解释它们.此外,代码示例无法执行 - 缺少部分使我或多或少无法遵循文本.
任何人都可以解释概念源,汇,流,图阶段,部分图,物化以及其他一些我用简单的单词和简单的例子来解释的东西,这些例子不能解释每一个细节(并且可能不需要在一开始)?
我想Source在它上面创建一个和后来的推送元素,如:
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
Run Code Online (Sandbox Code Playgroud)
建议的方法是什么?
谢谢!
我目前正在与Akka Stream Kafka合作与kafka互动,我很惊讶与Kafka Streams有什么不同.
我知道基于Akka的方法实现了反应性规范并处理了kafka流似乎缺乏的背压和功能.
使用kafka流比akka溪流kafka有什么好处?
scala stream-processing typesafe akka-stream apache-kafka-streams
akka-http表示使用multipart/form-data编码上传的文件Source[ByteString, Any].我需要使用期望的Java库解组它InputStream.
怎么Source[ByteString, Any]可以变成一个InputStream?
接收器和用户的概念看起来与我类似.此外,我没有看到在反应流规范中明确定义接收器的概念.
我正在尝试使用Source.actorRef方法来创建akka.stream.scaladsl.Source对象.形式的东西
import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source
case class Weather(zip : String, temp : Double, raining : Boolean)
val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)
val sunnySource = weatherSource.filter(!_.raining)
...
Run Code Online (Sandbox Code Playgroud)
我的问题是:如何将数据发送到基于ActorRef的Source对象?
我假设向Source发送消息是一种形式
//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)
Run Code Online (Sandbox Code Playgroud)
但是weatherSource没有!操作员或tell方法.
该文件是不是关于如何使用Source.actorRef太过描述,它只是说,你可以...
提前感谢您的审核和回复.
我有示例代码生成一个未绑定的源并使用它:
对象Main {
def main(args : Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val source: Source[String] = Source(() => {
Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
})
source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}
Run Code Online (Sandbox Code Playgroud)
}
我想创建实现的类:
trait MySources {
def addToSource(item: String)
def getSource() : Source[String]
}
Run Code Online (Sandbox Code Playgroud)
我需要使用多个线程,例如:
class MyThread(mySources: MySources) extends Thread {
override def run(): Unit = {
for(i <- 1 to 1000000) { // here will …Run Code Online (Sandbox Code Playgroud) 可以分别使用actor Source.actorPublisher()和Sink.actorSubscriber()方法从actor创建源和接收器.但是有可能创建一个Flow演员吗?
从概念上讲,似乎并没有一个很好的理由,因为它实现了两者ActorPublisher和ActorSubscriber特征,但不幸的是,该Flow对象没有任何方法可以做到这一点.在这篇优秀的博客文章中,它是在早期版本的Akka Streams中完成的,所以问题是它是否也可以在最新版本(2.4.9)中完成.
我需要遍历一个形状像树的API.例如,目录结构或讨论的线程.它可以通过以下流程建模:
type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])
def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString
// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
if (id == 0) (1 to 9).toList
else if (1 <= id && id <= 9) ((id …Run Code Online (Sandbox Code Playgroud) 从 2.6 开始,我在这一行收到弃用警告:
import akka.stream.ActorMaterializer
implicit val actorMaterializer = ActorMaterializer()
Run Code Online (Sandbox Code Playgroud)
警告:
不推荐使用对象 ActorMaterializer 中的方法应用(自 2.6.0 起):使用具有流属性或配置设置的系统范围的物化器来更改默认值
我不明白该消息,我该怎么办?什么是“系统范围的物化器”,它位于某个 akka 包中?