使用Scala Iterator使用RegEx匹配将大型流(从字符串)分解成多个块,然后对这些块进行操作?

nba*_*man 7 regex iterator scala stream chunking

我目前正在使用一种不太像scala的方法来解析大型Unix邮箱文件。我仍在学习该语言,并想挑战自己以寻求更好的方法,但是,我不相信我对使用an可以做什么Iterator以及如何有效使用它有扎实的了解。

我目前正在使用 org.apache.james.mime4j,并且使用org.apache.james.mime4j.mboxiterator.MboxIteratorjava.util.Iterator从文件中获取,因此:

 // registers an implementation of a ContentHandler that
 // allows me to construct an object representing an email
 // using callbacks
 val handler: ContentHandler = new MyHandler();

 // creates a parser that parses a SINGLE email from a given InputStream
 val parser: MimeStreamParser = new MimeStreamParser(configBuilder.build());
 // register my handler
 parser.setContentHandler(handler);

 // Get a java.util.Iterator
 val iterator = MboxIterator.fromFile(fileName).build();
 // For each email, process it using above Handler
 iterator.forEach(p => parser.parse(p.asInputStream(Charsets.UTF_8)))
Run Code Online (Sandbox Code Playgroud)

根据我的理解,Scala Iterator更加健壮,并且可能具有更多的能力来处理类似的事情,尤其是因为我无法始终将整个文件放入内存中。

我需要构建自己的版本MboxIterator。我仔细研究了源代码,MboxIterator并找到了一个很好的RegEx模式来确定各个电子邮件的开头,但是,从现在开始,我一直在空白。

我这样创建了RegEx:

 val MESSAGE_START = Pattern.compile(FromLinePatterns.DEFAULT, Pattern.MULTILINE);
Run Code Online (Sandbox Code Playgroud)

我想做的事情(根据我到目前为止的了解):

  • FileInputStream从MBOX文件构建一个。
  • 使用Iterator.continually(stream.read())通过流中读取
  • 使用.takeWhile()继续阅读,直到流的末尾
  • 使用诸如之类的块来分流MESSAGE_START.matcher(someString).find(),或者使用它来查找单独的消息的索引
  • 读取创建的块,或读取创建的索引之间的位

我觉得我应该能够使用map()find()filter()collect()做到这一点,但我得到的事实揭去,他们只给我Ints到与工作。

我将如何完成?

编辑:

在对该主题进行了更多思考之后,我想到了另一种描述我认为需要做的事情的方式:

  1. 我需要继续从流中读取,直到获得与RegEx匹配的字符串为止

  2. 也许group以前读取的字节?

  3. 将其发送到某个地方进行处理

  4. 从某种意义上将其删除,这样下次我遇到匹配项时就不会分组

  5. 继续阅读信息流,直到找到下一个匹配项。

  6. 利润???

编辑2:

我想我越来越近了。使用这样的方法可以让我得到一个迭代器。但是,有两个问题:1.这是否浪费内存?这是否意味着所有内容都已读入内存?2.我还需要找出一种分裂match,但仍然包括在返回的迭代器。

def split[T](iter: Iterator[T])(breakOn: T => Boolean): 
    Iterator[Iterator[T]] =
        new Iterator[Iterator[T]] {
           def hasNext = iter.hasNext

           def next = {
              val cur = iter.takeWhile(!breakOn(_))
              iter.dropWhile(breakOn)
              cur
            }
 }.withFilter(l => l.nonEmpty)  
Run Code Online (Sandbox Code Playgroud)

jwv*_*wvh 2

如果我理解正确的话,您想要惰性地对由正则表达式可识别模式分隔的大文件进行分块。

您可以尝试Iterator为每个请求返回一个,但正确的迭代器管理并不是微不足道的。

我倾向于对客户端隐藏所有文件和迭代器管理。

class MBox(filePath :String) {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :Option[String] =
    if (itr.hasNext) {
      val sb = new StringBuilder()
      sb.append(itr.next() + "\n")
      while (itr.hasNext && !header.matches(itr.head))
        sb.append(itr.next() + "\n")
      Some(sb.mkString)
    } else {
      file.close()
      None
    }
}
Run Code Online (Sandbox Code Playgroud)

测试:

val mbox = new MBox("so.txt")
mbox.next()
//res0: Option[String] =
//Some(From MAILER-DAEMON Fri Jul  8 12:08:34 2011
//some text AAA
//some text BBB
//)

mbox.next()
//res1: Option[String] =
//Some(From MAILER-DAEMON Mon Jun  8 12:18:34 2012
//small text
//)

mbox.next()
//res2: Option[String] =
//Some(From MAILER-DAEMON Tue Jan  8 11:18:14 2013
//some text CCC
//some text DDD
//)

mbox.next()  //res3: Option[String] = None
Run Code Online (Sandbox Code Playgroud)

每个打开的文件只有一个Iterator,并且仅对其调用安全方法。文件文本仅根据请求实现(加载),并且客户端仅获取所请求的内容(如果可用)。如果更适用的话,String您可以将每一行作为集合的一部分返回,Seq[String]而不是一长串中的所有行。


更新:可以修改它以方便迭代。

class MBox(filePath :String) extends Iterator[String] {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :String = {
    val sb = new StringBuilder()
    sb.append(itr.next() + "\n")
    while (itr.hasNext && !header.matches(itr.head))
      sb.append(itr.next() + "\n")
    sb.mkString
  }

  def hasNext: Boolean =
    if (itr.hasNext) true else {file.close(); false}
}
Run Code Online (Sandbox Code Playgroud)

现在你可以.foreach(),,,等等。但你也可以做危险的事情,比如.map()加载整个文件。.flatMap().toList