How do I write an aggregate pattern in Scala?

yur*_*ura 3 design-patterns scala aggregate-functions

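Suppose I have an Iterator[A] (of unbounded size) and I want to get an Iterator[B] from it, where runs of consecutive values of type A are aggregated into a single B.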

Example: I have a list of strings:

Iterator(
    "START",
    "DATA1",
    "DATA2",
    "DATA3",
    "START",
    "DATA1",
    "DATA2",
    //.. 10^10 more records
)

I want to join the strings from one START up to the next START, i.e. write a parser:

Iterator(
"START DATA1 DATA2 DATA3",
"START DATA1 DATA2",
    //.. 10^10 / 5 more records
)

I know how to do this imperatively, but I would like to do it with Scala higher-order functions. Any ideas?

P.S. This is the EIP Aggregator pattern: http://camel.apache.org/aggregator2.html

Der*_*att 5

Well, an infinite stream changes things rather dramatically. Assuming I understand the rest of your situation, this should work:

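def aggregate(it: Iterator[String]): Iterator[String] = new Iterator[String] {
  // Drop the leading "START"; each later marker is consumed by takeWhile below.
  if (it.hasNext) it.next()
  def hasNext: Boolean = it.hasNext
  // takeWhile also swallows the terminating "START", so it is re-added by hand.
  // Caveat: reusing `it` after takeWhile is not guaranteed by the Iterator docs.
  def next(): String = "START " + it.takeWhile(_ != "START").mkString(" ")
}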

Then you can do:

val i = aggregate(yourStream.iterator)
i.take(20).foreach(println) // or whatever
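The iterator approach above calls takeWhile on the same underlying iterator repeatedly, which the Scala Iterator documentation does not promise will work. Here is a minimal sketch of a variant that peeks at the next element through a BufferedIterator instead. The name aggregateBuffered is hypothetical, and it assumes that values appearing before the first "START" should be discarded.

```scala
// Sketch only: groups values between "START" markers using BufferedIterator.head
// to peek, so the source iterator is never reused after takeWhile.
def aggregateBuffered(it: Iterator[String]): Iterator[String] = new Iterator[String] {
  private val src = it.buffered

  // Assumption: anything before the first marker is dropped.
  while (src.hasNext && src.head != "START") src.next()

  def hasNext: Boolean = src.hasNext

  def next(): String = {
    val sb = new StringBuilder(src.next()) // the "START" marker itself
    // Append values until the next marker, which stays in src for the following call.
    while (src.hasNext && src.head != "START") sb.append(' ').append(src.next())
    sb.toString
  }
}
```

Unlike the version above, a "START" with no data after it is emitted as a bare "START" rather than "START " or nothing at all, which may or may not be what you want.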


par*_*tic 5

If you want a functional solution, you should use Streams rather than iterators (streams are immutable). Here is one possible approach:

def aggregate(strs: Stream[String] ) = { 
  aggregateRec( strs )
}

def aggregateRec( strs: Stream[String] ): Stream[String] = {
  val tail = strs.drop(1)
  if( tail.nonEmpty ) {
    val (str, rest ) = accumulate( tail )
    Stream.cons( str, aggregateRec( rest ) )
  }
  else Stream.empty
}

def accumulate( strs: Stream[String] ): (String, Stream[String])  = {
  val first = "START " + strs.takeWhile( _ != "START").mkString(" ")
  val rest = strs.dropWhile( _ != "START" )
  ( first, rest )
}

It works as expected:

val strs = Stream( "START", "1", "2", "3", "START", "A", "B" )
val strs2 = aggregate( strs )
strs2 foreach println
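On Scala 2.13 and later, Stream is deprecated in favour of LazyList. The same idea might look roughly like this; it is a sketch rather than a drop-in replacement, the name aggregateLazy is hypothetical, and it assumes that anything before a "START" marker can be skipped.

```scala
// Sketch only, assuming Scala 2.13+ (LazyList instead of the deprecated Stream).
def aggregateLazy(strs: LazyList[String]): LazyList[String] = {
  // Skip ahead to the next "START" marker.
  val fromStart = strs.dropWhile(_ != "START")
  if (fromStart.isEmpty) LazyList.empty
  else {
    val body  = fromStart.tail
    val group = body.takeWhile(_ != "START") // values belonging to this group
    val rest  = body.dropWhile(_ != "START") // begins at the next marker, if any
    // #:: evaluates the recursive call lazily, so unbounded input is fine.
    ("START" +: group).mkString(" ") #:: aggregateLazy(rest)
  }
}
```

An existing iterator can be wrapped with LazyList.from(iterator) before calling it. Keep in mind that a LazyList memoizes the elements it has evaluated, so holding on to its head while walking a very long input retains everything seen so far.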