如何最有效地使用Groovy/GPars处理CSV文件的行?

Sve*_*ges 10 groovy

问题很简单,我很惊讶它在搜索时没有立即弹出.

我有一个需要处理的CSV文件,可能非常大.应将每一行传递给处理器,直到处理完所有行.为了读取CSV文件,我将使用OpenCSV,它实质上提供了readNext()方法,它给了我下一行.如果没有更多行可用,则所有处理器都应终止.

为此,我创建了一个非常简单的groovy脚本,定义了一个同步的readNext()方法(因为下一行的读取并不是非常耗时),然后创建了几个读取下一行并处理它的线程.它工作正常,但......

难道不应该有我可以使用的内置解决方案吗?这不是gpars集合处理,因为它总是假设内存中存在一个现有集合.相反,我无法将其全部读入内存并进行处理,这将导致内存异常.

所以....有一个很好的模板用于使用几个工作线程"逐行"处理CSV文件?

Chr*_*orf 6

同时访问文件可能不是一个好主意,GPars的fork/join-processing仅适用于内存数据(集合).我的意思是按顺序将文件读​​入列表.当列表达到一定大小时,使用GPars同时处理列表中的条目,清除列表,然后继续阅读行.


ata*_*lor 5

这可能是演员的一个好问题.同步读者actor可以将CSV行移交给并行处理器actor.例如:

@Grab(group='org.codehaus.gpars', module='gpars', version='0.12')

import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.actor.Actor

class CsvReader extends DefaultActor {
    void act() {
        loop {
            react {
                reply readCsv()
            }
        }
    }
}

class CsvProcessor extends DefaultActor {
    Actor reader
    void act() {
        loop {
            reader.send(null)
            react {
                if (it == null)
                    terminate()
                else
                    processCsv(it)
            }
        }
    }
}

def N_PROCESSORS = 10
def reader = new CsvReader().start()
(0..<N_PROCESSORS).collect { new CsvProcessor(reader: reader).start() }*.join()
Run Code Online (Sandbox Code Playgroud)