如何在Scala中处理大型列表?

zfz*_*zfz 1 python jvm scala file out-of-memory

我曾经在Python中处理统计信息.例如,一个大文件包含数千万个ID:

$ cat report_ids | head
3788065
7950319
140494477
182851142
120757318
160033281
78087029
42591118
104363873
212143796
...
Run Code Online (Sandbox Code Playgroud)

在IPython中,以下行始终有效.

In [1]: lines = [line.strip() for  line in open('./report_ids').readlines()]

In [2]: from collections import Counter

In [3]: d = Counter(lines)

In [4]: d[lines[0]]
Out[4]: 9
Run Code Online (Sandbox Code Playgroud)

当我在Scala中尝试相同时,会发生内存不足错误.

val lines = scala.io.Source.fromFile("./report_ids").getLines.toList
lines: List[String] = List(3788065, 7950319, 140494477, 182851142, 120757318, 160033281, 78087029, 42591118, 104363873, 212143796, 175644298, 112703123, 213308679, 109649718, 11947300, 214660563, 83402867, 162877289, 83030111, 78231639, 45129180, 11635655, 34778452, 46604760, 142519099, 213261965, 137812002, 167057636, 119258917, 212722777, 177979907, 13754217, 156769524, 40682536, 202195379, 91879046, 22766751, 6656279, 11972540, 76929862, 91616020, 110579570, 143849021, 27239477, 65146692, 142968764, 153891284, 182405787, 153038108, 50714639, 113386401, 96657813, 75908413, 32215626, 175000692, 154337083, 113754207, 165109267, 3788065, 42285876, 171004203, 109802388, 92956305, 46690091, 103638776, 15141632, 110579570, 120984867, 183167775, 86841540, 60465849, 27239477, 91760184, 213464...

scala> val g = lines.groupBy(e => e).mapValues(x => x.length)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
    at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:326)
    at scala.collection.immutable.HashMap.$plus(HashMap.scala:57)
    at scala.collection.immutable.HashMap.$plus(HashMap.scala:36)
    at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
    at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:24)
    at scala.collection.TraversableLike$$anonfun$groupBy$3.apply(TraversableLike.scala:334)
    at scala.collection.TraversableLike$$anonfun$groupBy$3.apply(TraversableLike.scala:333)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:333)
    at scala.collection.AbstractTraversable.groupBy(Traversable.scala:105)
    at .<init>(<console>:8)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
Run Code Online (Sandbox Code Playgroud)

然后我在Scala中尝试了懒惰的方法,它仍然无法正常工作.

scala> lazy val lines = scala.io.Source.fromFile("./report_ids").getLines.toList
lines: List[String] = <lazy>

scala> val g = lines.groupBy(e => e).mapValues(x => x.length)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.io.BufferedReader.readLine(BufferedReader.java:349)
    at java.io.BufferedReader.readLine(BufferedReader.java:382)
    at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
    at scala.collection.AbstractIterator.toList(Iterator.scala:1157)
    at .lines$lzycompute(<console>:7)
    at .lines(<console>:7)
    at .<init>(<console>:8)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
Run Code Online (Sandbox Code Playgroud)

那么我怎么能像在Python中一样在Scala中完成这个小组呢?谢谢.

Dan*_*ral 8

我认为在Python和Scala中做的事情有些不同.让我们看看第一行:

lines = [line.strip() for  line in open('./report_ids').readlines()]
Run Code Online (Sandbox Code Playgroud)

在我看来,你正在使用迭代(在Scala术语中),而不是真正的列表.我可能错了 - 我不经常使用Python来记住 - 但让我们假设你是,并看看如何在Scala中获得相同的东西.你有这个:

val lines = scala.io.Source.fromFile("./report_ids").getLines.toList
Run Code Online (Sandbox Code Playgroud)

现在,Scala在标准库中没有可迭代的文件,尽管我认为Scala I/O附带了一个.在这里,你可以这样做一个迭代器(不一样):

val lines = scala.io.Source.fromFile("./report_ids").getLines
Run Code Online (Sandbox Code Playgroud)

只是不要把它变成一个List.:)现在,由于这是一个迭代器,而不是迭代器,一旦你使用它两次就会失败.所以让我们这样写:

def lines = scala.io.Source.fromFile("./report_ids").getLines
Run Code Online (Sandbox Code Playgroud)

现在你可以多次使用"线"了.遗憾的是,您将泄漏文件描述符 - 对于更严重的I/O处理,请查看更严重的I/O库,例如Scalaz Stream或Scala I/O. 或者使用贷款模式.

接下来,用以下Counter代码替换代码:

val g = lines.groupBy(e => e).mapValues(x => x.length)
Run Code Online (Sandbox Code Playgroud)

这将是内存密集型的.这样的事情应该会好得多:

val g = scala.collection.mutable.HashMap.empty[String, Int] withDefaultValue 0
for (line <- lines) g(line) += 1
Run Code Online (Sandbox Code Playgroud)

既然linesIterator,你将无法做到lines(0).对于第一行,您可以这样做lines.next,或者您可以在lines.toStream(0)不将整个文件读入内存的情况下访问索引.