当高吞吐量(3GB/s)文件系统可用时,如何在 Java 中使用多线程读取文件

san*_*hah 4 java io file-io multithreading

我知道对于普通的主轴驱动系统,使用多线程读取文件效率低下。

这是一个不同的情况,我有一个高吞吐量的文件系统可供我使用,它提供高达 3GB/s 的读取速度,具有 196 个 CPU 内核和 2TB RAM

单线程 Java 程序以最大 85-100 MB/s 的速度读取文件,因此我有可能比单线程更好。我必须读取大小为 1TB 的文件,并且我有足够的 RAM 来加载它。

目前我使用以下或类似的东西,但需要用多线程编写一些东西以获得更好的吞吐量:

Java 7 文件:50 MB/s

List<String> lines = Files.readAllLines(Paths.get(path), encoding);
Run Code Online (Sandbox Code Playgroud)

Java commons-io:48 MB/s

List<String> lines = FileUtils.readLines(new File("/path/to/file.txt"), "utf-8");
Run Code Online (Sandbox Code Playgroud)

与番石榴相同:45 MB/s

List<String> lines = Files.readLines(new File("/path/to/file.txt"), Charset.forName("utf-8"));
Run Code Online (Sandbox Code Playgroud)

Java 扫描程序类:非常慢

Scanner s = new Scanner(new File("filepath"));
ArrayList<String> list = new ArrayList<String>();
while (s.hasNext()){
    list.add(s.next());
}
s.close();
Run Code Online (Sandbox Code Playgroud)

我希望能够以正确的排序顺序加载文件并尽可能快地构建相同的 ArrayList。

还有另一个问题读起来相似,但实际上不同,因为:问题是讨论多线程文件 I/O 在物理上不可能高效的系统,但由于技术进步,我们现在拥有的系统是旨在支持高吞吐量 I/O ,因此限制因素是 CPU/SW ,这可以通过 I/O 多线程来克服。

另一个问题没有回答如何为多线程 I/O 编写代码。

san*_*hah 5

这是使用多个线程读取单个文件的解决方案。

将文件分成N个chunk,在一个线程中读取每个chunk,然后按顺序合并。当心跨越块边界的线。这是用户slaks建议的基本思想

下面对单个 20 GB 文件的多线程实现进行基准测试:

1 个线程:50 秒:400 MB/s

2 个线程:30 秒:666 MB/s

4 线程:20 秒:1GB/s

8 线程:60 秒:333 MB/s

等效的 Java7 readAllLines() :400 秒:50 MB/s

注意:这可能仅适用于旨在支持高吞吐量 I/O 的系统,而不适用于普通的个人计算机

package filereadtests;

import java.io.*;
import static java.lang.Math.toIntExact;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FileRead implements Runnable
{

private FileChannel _channel;
private long _startLocation;
private int _size;
int _sequence_number;

public FileRead(long loc, int size, FileChannel chnl, int sequence)
{
    _startLocation = loc;
    _size = size;
    _channel = chnl;
    _sequence_number = sequence;
}

@Override
public void run()
{
    try
    {
        System.out.println("Reading the channel: " + _startLocation + ":" + _size);

        //allocate memory
        ByteBuffer buff = ByteBuffer.allocate(_size);

        //Read file chunk to RAM
        _channel.read(buff, _startLocation);

        //chunk to String
        String string_chunk = new String(buff.array(), Charset.forName("UTF-8"));

        System.out.println("Done Reading the channel: " + _startLocation + ":" + _size);

    } catch (Exception e)
    {
        e.printStackTrace();
    }
}

//args[0] is path to read file
//args[1] is the size of thread pool; Need to try different values to fing sweet spot
public static void main(String[] args) throws Exception
{
    FileInputStream fileInputStream = new FileInputStream(args[0]);
    FileChannel channel = fileInputStream.getChannel();
    long remaining_size = channel.size(); //get the total number of bytes in the file
    long chunk_size = remaining_size / Integer.parseInt(args[1]); //file_size/threads

    //Max allocation size allowed is ~2GB
    if (chunk_size > (Integer.MAX_VALUE - 5))
    {
        chunk_size = (Integer.MAX_VALUE - 5);
    }

    //thread pool
    ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1]));

    long start_loc = 0;//file pointer
    int i = 0; //loop counter
    while (remaining_size >= chunk_size)
    {
        //launches a new thread
        executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i));
        remaining_size = remaining_size - chunk_size;
        start_loc = start_loc + chunk_size;
        i++;
    }

    //load the last remaining piece
    executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i));

    //Tear Down
    executor.shutdown();

    //Wait for all threads to finish
    while (!executor.isTerminated())
    {
        //wait for infinity time
    }
    System.out.println("Finished all threads");
    fileInputStream.close();
}

}
Run Code Online (Sandbox Code Playgroud)