nba*_*man 7 regex iterator scala stream chunking
我目前正在使用一种不太像scala的方法来解析大型Unix邮箱文件。我仍在学习该语言,并想挑战自己以寻求更好的方法,但是,我不相信我对使用an可以做什么Iterator以及如何有效使用它有扎实的了解。
我目前正在使用 org.apache.james.mime4j,并且使用org.apache.james.mime4j.mboxiterator.MboxIterator来java.util.Iterator从文件中获取,因此:
// registers an implementation of a ContentHandler that
// allows me to construct an object representing an email
// using callbacks
val handler: ContentHandler = new MyHandler();
// creates a parser that parses a SINGLE email from a given InputStream
val parser: MimeStreamParser = new MimeStreamParser(configBuilder.build());
// register my handler
parser.setContentHandler(handler);
// Get a java.util.Iterator
val iterator = MboxIterator.fromFile(fileName).build();
// For each email, process it using above Handler
iterator.forEach(p => parser.parse(p.asInputStream(Charsets.UTF_8)))
Run Code Online (Sandbox Code Playgroud)
根据我的理解,Scala Iterator更加健壮,并且可能具有更多的能力来处理类似的事情,尤其是因为我无法始终将整个文件放入内存中。
我需要构建自己的版本MboxIterator。我仔细研究了源代码,MboxIterator并找到了一个很好的RegEx模式来确定各个电子邮件的开头,但是,从现在开始,我一直在空白。
我这样创建了RegEx:
val MESSAGE_START = Pattern.compile(FromLinePatterns.DEFAULT, Pattern.MULTILINE);
Run Code Online (Sandbox Code Playgroud)
我想做的事情(根据我到目前为止的了解):
FileInputStream从MBOX文件构建一个。Iterator.continually(stream.read())通过流中读取.takeWhile()继续阅读,直到流的末尾MESSAGE_START.matcher(someString).find(),或者使用它来查找单独的消息的索引我觉得我应该能够使用map(),find(),filter()并collect()做到这一点,但我得到的事实揭去,他们只给我Ints到与工作。
我将如何完成?
编辑:
在对该主题进行了更多思考之后,我想到了另一种描述我认为需要做的事情的方式:
我需要继续从流中读取,直到获得与RegEx匹配的字符串为止
也许group以前读取的字节?
将其发送到某个地方进行处理
从某种意义上将其删除,这样下次我遇到匹配项时就不会分组
继续阅读信息流,直到找到下一个匹配项。
利润???
编辑2:
我想我越来越近了。使用这样的方法可以让我得到一个迭代器。但是,有两个问题:1.这是否浪费内存?这是否意味着所有内容都已读入内存?2.我还需要找出一种分裂的的match,但仍然包括在返回的迭代器。
def split[T](iter: Iterator[T])(breakOn: T => Boolean):
Iterator[Iterator[T]] =
new Iterator[Iterator[T]] {
def hasNext = iter.hasNext
def next = {
val cur = iter.takeWhile(!breakOn(_))
iter.dropWhile(breakOn)
cur
}
}.withFilter(l => l.nonEmpty)
Run Code Online (Sandbox Code Playgroud)
如果我理解正确的话,您想要惰性地对由正则表达式可识别模式分隔的大文件进行分块。
您可以尝试Iterator为每个请求返回一个,但正确的迭代器管理并不是微不足道的。
我倾向于对客户端隐藏所有文件和迭代器管理。
class MBox(filePath :String) {
private val file = io.Source.fromFile(filePath)
private val itr = file.getLines().buffered
private val header = "From .+ \\d{4}".r //adjust to taste
def next() :Option[String] =
if (itr.hasNext) {
val sb = new StringBuilder()
sb.append(itr.next() + "\n")
while (itr.hasNext && !header.matches(itr.head))
sb.append(itr.next() + "\n")
Some(sb.mkString)
} else {
file.close()
None
}
}
Run Code Online (Sandbox Code Playgroud)
测试:
val mbox = new MBox("so.txt")
mbox.next()
//res0: Option[String] =
//Some(From MAILER-DAEMON Fri Jul 8 12:08:34 2011
//some text AAA
//some text BBB
//)
mbox.next()
//res1: Option[String] =
//Some(From MAILER-DAEMON Mon Jun 8 12:18:34 2012
//small text
//)
mbox.next()
//res2: Option[String] =
//Some(From MAILER-DAEMON Tue Jan 8 11:18:14 2013
//some text CCC
//some text DDD
//)
mbox.next() //res3: Option[String] = None
Run Code Online (Sandbox Code Playgroud)
每个打开的文件只有一个Iterator,并且仅对其调用安全方法。文件文本仅根据请求实现(加载),并且客户端仅获取所请求的内容(如果可用)。如果更适用的话,String您可以将每一行作为集合的一部分返回,Seq[String]而不是一长串中的所有行。
更新:可以修改它以方便迭代。
class MBox(filePath :String) extends Iterator[String] {
private val file = io.Source.fromFile(filePath)
private val itr = file.getLines().buffered
private val header = "From .+ \\d{4}".r //adjust to taste
def next() :String = {
val sb = new StringBuilder()
sb.append(itr.next() + "\n")
while (itr.hasNext && !header.matches(itr.head))
sb.append(itr.next() + "\n")
sb.mkString
}
def hasNext: Boolean =
if (itr.hasNext) true else {file.close(); false}
}
Run Code Online (Sandbox Code Playgroud)
现在你可以.foreach(),,,等等。但你也可以做危险的事情,比如.map()加载整个文件。.flatMap().toList