问题很简单,我很惊讶它在搜索时没有立即弹出.
我有一个需要处理的CSV文件,可能非常大.应将每一行传递给处理器,直到处理完所有行.为了读取CSV文件,我将使用OpenCSV,它实质上提供了readNext()方法,它给了我下一行.如果没有更多行可用,则所有处理器都应终止.
为此,我创建了一个非常简单的groovy脚本,定义了一个同步的readNext()方法(因为下一行的读取并不是非常耗时),然后创建了几个读取下一行并处理它的线程.它工作正常,但......
难道不应该有我可以使用的内置解决方案吗?这不是gpars集合处理,因为它总是假设内存中存在一个现有集合.相反,我无法将其全部读入内存并进行处理,这将导致内存异常.
所以....有一个很好的模板用于使用几个工作线程"逐行"处理CSV文件?
同时访问文件可能不是一个好主意,GPars的fork/join-processing仅适用于内存数据(集合).我的意思是按顺序将文件读入列表.当列表达到一定大小时,使用GPars同时处理列表中的条目,清除列表,然后继续阅读行.
这可能是演员的一个好问题.同步读者actor可以将CSV行移交给并行处理器actor.例如:
@Grab(group='org.codehaus.gpars', module='gpars', version='0.12')
import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.actor.Actor
class CsvReader extends DefaultActor {
void act() {
loop {
react {
reply readCsv()
}
}
}
}
class CsvProcessor extends DefaultActor {
Actor reader
void act() {
loop {
reader.send(null)
react {
if (it == null)
terminate()
else
processCsv(it)
}
}
}
}
def N_PROCESSORS = 10
def reader = new CsvReader().start()
(0..<N_PROCESSORS).collect { new CsvProcessor(reader: reader).start() }*.join()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2722 次 |
| 最近记录: |