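I am new to Akka and trying to learn the basics. My use case is to continuously read messages from a JMS queue and write each message to a new file. I have this basic setup: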
Source<String, NotUsed> jmsSource =
    JmsSource.textSource(
        JmsSourceSettings
            .create(connectionFactory)
            .withQueue("myQueue")
            .withBufferSize(10));

Sink<ByteString, CompletionStage<IOResult>> fileSink =
    FileIO.toFile(new File("random.txt"));

final Flow<String, ByteString, NotUsed> flow =
    Flow.fromFunction((String n) -> ByteString.fromString(n));

final RunnableGraph<NotUsed> runnable = jmsSource.via(flow).to(fileSink);

runnable.run(materializer);
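However, I want the file name to be dynamic (rather than hard-coded as "random.txt"): it should change depending on the content of each message in the queue. I could, of course, pick the file name in the flow, but how do I set that name on fileSink? What is the best way to set this up?
I created a simple sink based on akka.stream.impl.LazySink. I have only tested it with a single element in the success case, so feel free to comment here or on the GitHub Gist.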
import akka.NotUsed
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage._

class OneToOneOnDemandSink[T, +M](sink: T => Sink[T, M]) extends GraphStage[SinkShape[T]] {

  val in: Inlet[T] = Inlet("OneToOneOnDemandSink.in")
  override val shape = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    override def preStart(): Unit = pull(in)

    val awaitingElementHandler = new InHandler {
      override def onPush(): Unit = {
        val element = grab(in)
        val innerSource = createInnerSource(element)
        val innerSink = sink(element)
        Source.fromGraph(innerSource.source).runWith(innerSink)(subFusingMaterializer)
      }

      override def onUpstreamFinish(): Unit = completeStage()

      override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
    }
    setHandler(in, awaitingElementHandler)

    def createInnerSource(element: T): SubSourceOutlet[T] = {
      val innerSource = new SubSourceOutlet[T]("OneToOneOnDemandSink.innerSource")

      innerSource.setHandler(new OutHandler {
        override def onPull(): Unit = {
          innerSource.push(element)
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          } else {
            pull(in)
            setHandler(in, awaitingElementHandler)
          }
        }

        override def onDownstreamFinish(): Unit = {
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          }
        }
      })

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val illegalStateException = new IllegalStateException("Got a push that we weren't expecting")
          innerSource.fail(illegalStateException)
          failStage(illegalStateException)
        }

        override def onUpstreamFinish(): Unit = {
          // We don't stop until the inner stream stops.
          setKeepGoing(true)
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          innerSource.fail(ex)
          failStage(ex)
        }
      })

      innerSource
    }
  }
}

object OneToOneOnDemandSink {
  def apply[T, M](sink: T => Sink[T, M]): Sink[T, NotUsed] = Sink.fromGraph(new OneToOneOnDemandSink(sink))
}
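This creates a new Sink for every element, which avoids much of the complexity in LazySink. It also means there is no sensible materialized value to return.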
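For the use case in the question, the sink could be wired up roughly as follows. This is only a sketch: it assumes the OneToOneOnDemandSink class above is on the classpath and an Akka version that still provides ActorMaterializer. The names PerMessageFileExample, fileNameFor and fileSinkFor are hypothetical, the naming rule is a placeholder, and an in-memory list stands in for the JMS source.

```scala
import java.nio.file.Paths

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Sink, Source}
import akka.util.ByteString

object PerMessageFileExample {

  // Hypothetical naming rule, for illustration only: derive the file name
  // from the message content. Identical messages map to the same file.
  def fileNameFor(message: String): String =
    s"message-${message.hashCode.toHexString}.txt"

  // Builds a fresh file sink for one message; the target path is chosen
  // from that message before the inner stream is run.
  def fileSinkFor(message: String): Sink[String, NotUsed] =
    Flow[String]
      .map(text => ByteString(text))
      .to(FileIO.toPath(Paths.get(fileNameFor(message))))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("per-message-files")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Stand-in for the JMS source from the question.
    Source(List("first message", "second message"))
      .runWith(OneToOneOnDemandSink[String, NotUsed](message => fileSinkFor(message)))

    // The sink materializes NotUsed, so there is no completion signal here.
    // The ActorSystem keeps running until it is terminated elsewhere.
  }
}
```

Because the function passed to OneToOneOnDemandSink receives the element itself, the file name can be computed from the message content at the point where the inner sink is created.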