Ram*_*gil 5 scala akka akka-stream
我正在使用我无法控制的java库中的数据发布者.发布商库使用典型的回调设置; 在库代码中的某个地方(库是java,但我将在scala中描述terseness):
type DataType = ???
trait DataConsumer {
def onData(data : DataType) : Unit
}
Run Code Online (Sandbox Code Playgroud)
库的用户需要编写一个实现该onData方法的类并将其传递给a DataProducer,库代码如下所示:
class DataProducer(consumer : DataConsumer) {...}
Run Code Online (Sandbox Code Playgroud)
它DataProducer有自己的内部线程我无法控制,并伴随数据缓冲区,即onData每当有另一个DataType对象要消耗时调用.
所以,我的问题是:如何编写一个将原始库模式转换/转换为akka流源对象的图层?
先感谢您.
回调->源
下面详细说明Endre Varga的答案,该代码将创建DataConsumer回调函数,该函数会将消息发送到akka流中Source。
警告:创建功能性ActorPublish的功能远远超出我在下面指出的范围。特别是,需要进行缓冲以处理DataProducer呼叫onData比Sink信令需求快的情况(请参见本示例)。下面的代码只是设置“接线”。
import akka.actor.ActorRef
import akka.actor.Actor.noSender
import akka.stream.Actor.ActorPublisher
/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
def receive : Receive = {
case message : DataType => deliverBuf() //defined in example link
}
}
class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
override def onData(data : DataType) = sourceActor.tell(data, noSender)
}
//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]
//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)
//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))
//setup the incoming data feed from 3rd party library
val dataProducer = DataProducer(actorConsumer)
Run Code Online (Sandbox Code Playgroud)
回调->全流
最初的问题专门要求对Source的回调,但是如果整个流(不仅是Source)都可用,则处理回调更容易处理。这是因为可以ActorRef使用Source#actorRef函数将流具体化为。举个例子:
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val bufferSize = 100
val streamRef =
Source
.actorRef[DataType](bufferSize, overflowStrategy)
.via(someFlow)
.to(someSink)
.run()
val streamConsumer = new DataConsumer {
override def onData(data : DataType) : Unit = streamRef ! data
}
val dataProducer = DataProducer(streamConsumer)
Run Code Online (Sandbox Code Playgroud)
有多种方法可以解决这个问题。一种是使用 ActorPublisher:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors,您可以在其中更改回调,以便它向演员发送一条消息。根据回调的工作方式,您也可以使用 mapAsync(将回调转换为 Future)。仅当一个请求恰好产生一个回调调用时,这才有效。
| 归档时间: |
|
| 查看次数: |
1213 次 |
| 最近记录: |