Ser*_*nov 5 java sockets nio zero-copy
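I noticed some strange behavior when using zero-copy in Java to transfer a large file from a file to a socket. My environment:
What the program does: the client copies an input file into a socket, and the server copies the socket into an output file. Both sides use the zero-copy methods, transferTo and transferFrom. If the file is relatively large, not all bytes reach the server: 100 MB+ on Windows and 2 GB+ on CentOS. The client and server run on the same machine, and the localhost address is used to transfer the data.
The behavior differs by operating system. On Windows, the client completes the transferTo call successfully, and the number of bytes transferred equals the input file size.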
long bytesTransferred = fileChannel.transferTo(0, inputFile.length(), socketChannel);
The server, on the other hand, reports fewer bytes received.
long transferFromByteCount = fileChannel.transferFrom(socketChannel, 0, inputFile.length());
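On Linux, the client transfers only 2 GB even when the input file is 4 GB. Both configurations have enough space.
On Windows, I was able to transfer a 130 MB file with either of the following workarounds: 1) increasing the receive buffer size on the server, or 2) adding a thread sleep in the client. This leads me to think that transferTo on the client completes once all bytes have been handed to the socket send buffer, not once they have reached the server. Whether those bytes actually make it to the server is not guaranteed, which is a problem for my use case.
On Linux, the largest file I was able to transfer with a single transferTo call is 2 GB, but at least the client reports the correct number of bytes sent to the server.
My questions: what is the best cross-platform way for the client to ensure guaranteed delivery of the file to the server? And what mechanism is used to emulate sendfile() on Windows?
Here is the code:
Client - ZeroCopyClient.java: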
import org.apache.commons.io.FileUtils;
import java.io.*;
import java.net.*;
import java.nio.channels.*;
public class ZeroCopyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        final File inputFile = new File(args[0]);
        FileInputStream fileInputStream = new FileInputStream(inputFile);
        FileChannel fileChannel = fileInputStream.getChannel();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 8083);
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(socketAddress);
        System.out.println("sending " + inputFile.length() + " bytes to " + socketChannel);
        long startTime = System.currentTimeMillis();
        long totalBytesTransferred = 0;
        while (totalBytesTransferred < inputFile.length()) {
            long st = System.currentTimeMillis();
            long bytesTransferred = fileChannel.transferTo(totalBytesTransferred, inputFile.length()-totalBytesTransferred, socketChannel);
            totalBytesTransferred += bytesTransferred;
            long et = System.currentTimeMillis();
            System.out.println("sent " + bytesTransferred + " out of " + inputFile.length() + " in " + (et-st) + " millis");
        }
        socketChannel.finishConnect();
        long endTime = System.currentTimeMillis();
        System.out.println("sent: totalBytesTransferred= " + totalBytesTransferred + " / " + inputFile.length() + " in " + (endTime-startTime) + " millis");
        final File outputFile = new File(inputFile.getAbsolutePath() + ".out");
        boolean copyEqual = FileUtils.contentEquals(inputFile, outputFile);
        System.out.println("copyEqual= " + copyEqual);
        if (args.length > 1) {
            System.out.println("sleep: " + args[1] + " millis");
            Thread.sleep(Long.parseLong(args[1]));
        }
    }
}
Server - ZeroCopyServer.java:
import java.io.*;
import java.net.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;
public class ZeroCopyServer {
    public static void main(String[] args) throws IOException {
        final File inputFile = new File(args[0]);
        inputFile.delete();
        final File outputFile = new File(inputFile.getAbsolutePath() + ".out");
        outputFile.delete();
        createTempFile(inputFile, Long.parseLong(args[1])*1024L*1024L);
        System.out.println("input file length: " + inputFile.length() + " : output file.exists= " + outputFile.exists());
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().setReceiveBufferSize(8*1024*1024);
        System.out.println("server receive buffer size: " + serverSocketChannel.socket().getReceiveBufferSize());
        serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 8083));
        System.out.println("waiting for connection");
        SocketChannel socketChannel = serverSocketChannel.accept();
        System.out.println("connected. client channel: " + socketChannel);
        FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
        FileChannel fileChannel = fileOutputStream.getChannel();
        long startTime = System.currentTimeMillis();
        long transferFromByteCount = fileChannel.transferFrom(socketChannel, 0, inputFile.length());
        long endTime = System.currentTimeMillis();
        System.out.println("received: transferFromByteCount= " + transferFromByteCount + " : outputFile= " + outputFile.length() + " : inputFile= " + inputFile.length() + " bytes in " + (endTime-startTime) + " millis");
        boolean copyEqual = FileUtils.contentEquals(inputFile, outputFile);
        System.out.println("copyEqual= " + copyEqual);
        serverSocketChannel.close();
    }
    private static void createTempFile(File file, long size) throws IOException{
        RandomAccessFile f = new RandomAccessFile(file.getAbsolutePath(), "rw");
        f.setLength(size);
        f.writeDouble(Math.random());
        f.close();
    }
}
Update 1: The Linux code is fixed by calling transferTo in a loop.
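The loop fix can be factored into a small helper. The following is only a sketch: the class name `TransferToLoop` and the method `transferFully` are hypothetical names that do not appear in the original code, and the demo copies between two temporary files so that it runs without a server. It assumes the target channel is in blocking mode.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToLoop {

    // Hypothetical helper (not from the original post): repeats transferTo until
    // 'count' bytes starting at 'position' have been handed to 'target'.
    // Assumes 'target' is in blocking mode; a non-blocking target could return 0
    // repeatedly and would need a Selector instead of this loop.
    static long transferFully(FileChannel source, long position, long count,
                              WritableByteChannel target) throws IOException {
        long sent = 0;
        while (sent < count) {
            long n = source.transferTo(position + sent, count - sent, target);
            if (n <= 0 && position + sent >= source.size()) {
                break; // the file ended before 'count' bytes were available
            }
            sent += n;
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        // Demo with two temporary files so the sketch runs without a server.
        Path in = Files.createTempFile("zerocopy-in", ".bin");
        Path out = Files.createTempFile("zerocopy-out", ".bin");
        Files.write(in, new byte[3 * 1024 * 1024]);
        try (FileChannel src = FileChannel.open(in, StandardOpenOption.READ);
             FileChannel dst = FileChannel.open(out, StandardOpenOption.WRITE)) {
            long sent = transferFully(src, 0, src.size(), dst);
            System.out.println("transferred " + sent + " of " + src.size() + " bytes");
        }
        Files.delete(in);
        Files.delete(out);
    }
}
```

With a SocketChannel as the target, the return value still only says how many bytes were handed to the socket, not how many the server has stored.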
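Update 2: One possible workaround I am considering requires client-server cooperation. At the end of the transfer, the server writes the length of the received data back to the client, and the client reads it in blocking mode.
Server response: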
ByteBuffer response = ByteBuffer.allocate(8);
response.putLong(transferFromByteCount);
response.flip();
socketChannel.write(response);
serverSocketChannel.close();
Client blocking read:
ByteBuffer response = ByteBuffer.allocate(8);
socketChannel.read(response);
response.flip();
long totalBytesReceived = response.getLong();
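As a result, the client waits for the bytes to pass through the send and receive socket buffers, and in effect waits until they are stored in the output file. There is no need to implement an out-of-band acknowledgement, and the client also does not need to wait as suggested in section II.A of https://linuxnetworkstack.files.wordpress.com/2013/03/paper.pdf for the case where the file content is mutable:
"wait an 'appropriate' amount of time before rewriting the same portion of the file"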
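One caveat about the blocking read in Update 2: a single read call on a channel may return fewer bytes than the buffer can hold, so an 8-byte acknowledgement is safer to read in a loop. A minimal sketch, assuming a hypothetical helper class `AckReader` (not part of the original code); the demo uses an in-memory channel in place of the socket:

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

final class AckReader {

    // Hypothetical helper: blocks until exactly 8 bytes have been read and
    // returns them as a big-endian long (the default ByteBuffer byte order,
    // which matches what putLong writes on the server side).
    static long readLong(ReadableByteChannel channel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        while (buffer.hasRemaining()) {
            if (channel.read(buffer) < 0) {
                throw new EOFException("peer closed the connection after "
                        + buffer.position() + " of " + Long.BYTES + " bytes");
            }
        }
        buffer.flip();
        return buffer.getLong();
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the server's reply: the number of bytes it received.
        byte[] reply = ByteBuffer.allocate(Long.BYTES).putLong(136314880L).array();
        try (ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(reply))) {
            System.out.println("server reported " + readLong(channel) + " bytes");
        }
    }
}
```

In the client, `readLong(socketChannel)` would replace the allocate/read/flip/getLong sequence, and the result can then be compared with the number of bytes sent.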
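Update 3:
A modified example that incorporates the fixes from @EJP and @the8472, with both length and file checksum verification and without output tracing. Note that computing the CRC32 checksum of a large file may take a few seconds.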
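For reference, the same kind of checksum can be computed with only the JDK by streaming the file through java.util.zip.CRC32. This is a sketch rather than what commons-io does internally; the class name `FileCrc32` and the 64 KB buffer size are arbitrary choices made for illustration. The whole file has to be read once, which is where the seconds go for large inputs.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

final class FileCrc32 {

    // Hypothetical helper: reads the file in chunks and feeds each chunk to
    // CRC32, so memory use stays constant regardless of the file size.
    static long crc32(Path file) throws IOException {
        CRC32 crc = new CRC32();
        byte[] buffer = new byte[64 * 1024]; // arbitrary chunk size
        try (InputStream in = Files.newInputStream(file)) {
            int n;
            while ((n = in.read(buffer)) != -1) {
                crc.update(buffer, 0, n);
            }
        }
        return crc.getValue();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("crc-demo", ".txt");
        Files.write(file, "123456789".getBytes(StandardCharsets.US_ASCII));
        System.out.println("crc32 = " + Long.toHexString(crc32(file)));
        Files.delete(file);
    }
}
```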
Client:
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;
public class ZeroCopyClient {
    public static void main(String[] args) throws IOException {
        final File inputFile = new File(args[0]);
        FileInputStream fileInputStream = new FileInputStream(inputFile);
        FileChannel fileChannel = fileInputStream.getChannel();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 8083);
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(socketAddress);
        //send input file length and CRC32 checksum to server
        long checksumCRC32 = FileUtils.checksumCRC32(inputFile);
        ByteBuffer request = ByteBuffer.allocate(16);
        request.putLong(inputFile.length());
        request.putLong(checksumCRC32);
        request.flip();
        while (request.hasRemaining()) {
            socketChannel.write(request);
        }
        //a single transferTo call may send fewer bytes than requested, so loop
        long totalBytesTransferred = 0;
        while (totalBytesTransferred < inputFile.length()) {
            long bytesTransferred = fileChannel.transferTo(totalBytesTransferred, inputFile.length()-totalBytesTransferred, socketChannel);
            totalBytesTransferred += bytesTransferred;
        }
        //receive output file length and CRC32 checksum from server;
        //a single read may return fewer than 16 bytes, so read until the buffer is full
        ByteBuffer response = ByteBuffer.allocate(16);
        while (response.hasRemaining()) {
            if (socketChannel.read(response) < 0) {
                throw new EOFException("server closed the connection before replying");
            }
        }
        response.flip();
        long totalBytesReceived = response.getLong();
        long outChecksumCRC32 = response.getLong();
        //the reply has arrived, so both channels can be released
        socketChannel.close();
        fileChannel.close();
        System.out.println("length equal= " + (totalBytesReceived == inputFile.length()));
        System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));
    }
}
Server:
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;
public class ZeroCopyServer {
    public static void main(String[] args) throws IOException {
        final File outputFile = new File(args[0]);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(8083));
        SocketChannel socketChannel = serverSocketChannel.accept();
        //read input file length and CRC32 checksum sent by client;
        //a single read may return fewer than 16 bytes, so read until the buffer is full
        ByteBuffer request = ByteBuffer.allocate(16);
        while (request.hasRemaining()) {
            if (socketChannel.read(request) < 0) {
                throw new EOFException("client closed the connection before sending the header");
            }
        }
        request.flip();
        long length = request.getLong();
        long checksumCRC32 = request.getLong();
        FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
        FileChannel fileChannel = fileOutputStream.getChannel();
        //a single transferFrom call may move fewer bytes than requested, so loop
        long totalBytesTransferFrom = 0;
        while (totalBytesTransferFrom < length) {
            long transferFromByteCount = fileChannel.transferFrom(socketChannel, totalBytesTransferFrom, length-totalBytesTransferFrom);
            //no progress: on a blocking channel this normally means the client closed the connection
            if (transferFromByteCount <= 0){
                break;
            }
            totalBytesTransferFrom += transferFromByteCount;
        }
        long outChecksumCRC32 = FileUtils.checksumCRC32(outputFile);
        //write output file length and CRC32 checksum back to client
        ByteBuffer response = ByteBuffer.allocate(16);
        response.putLong(totalBytesTransferFrom);
        response.putLong(outChecksumCRC32);
        response.flip();
        while (response.hasRemaining()) {
            socketChannel.write(response);
        }
        serverSocketChannel.close();
        System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));
    }
}
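The solution is to check the byte count returned by fileChannel.transferFrom and keep calling it until the expected length has been received: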
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;
public class ZeroCopyServer {
    public static void main(String[] args) throws IOException {
        final File outputFile = new File(args[0]);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(8083));
        SocketChannel socketChannel = serverSocketChannel.accept();
        //read input file length and CRC32 checksum sent by client;
        //a single read may return fewer than 16 bytes, so read until the buffer is full
        ByteBuffer request = ByteBuffer.allocate(16);
        while (request.hasRemaining()) {
            if (socketChannel.read(request) < 0) {
                throw new EOFException("client closed the connection before sending the header");
            }
        }
        request.flip();
        long length = request.getLong();
        long checksumCRC32 = request.getLong();
        FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
        FileChannel fileChannel = fileOutputStream.getChannel();
        //a single transferFrom call may move fewer bytes than requested, so loop
        long totalBytesTransferFrom = 0;
        while (totalBytesTransferFrom < length) {
            long transferFromByteCount = fileChannel.transferFrom(socketChannel, totalBytesTransferFrom, length-totalBytesTransferFrom);
            //no progress: on a blocking channel this normally means the client closed the connection
            if (transferFromByteCount <= 0){
                break;
            }
            totalBytesTransferFrom += transferFromByteCount;
        }
        long outChecksumCRC32 = FileUtils.checksumCRC32(outputFile);
        //write output file length and CRC32 checksum back to client
        ByteBuffer response = ByteBuffer.allocate(16);
        response.putLong(totalBytesTransferFrom);
        response.putLong(outChecksumCRC32);
        response.flip();
        while (response.hasRemaining()) {
            socketChannel.write(response);
        }
        serverSocketChannel.close();
        System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));
    }
}
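The receive loop can also be isolated and exercised without a network. The sketch below is an illustration under assumptions: `TransferFromLoop` and `receiveFully` are hypothetical names, and an in-memory channel stands in for the blocking SocketChannel. It shows why the returned total has to be compared with the expected length: when the source ends early, the loop stops and reports fewer bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferFromLoop {

    // Hypothetical helper: calls transferFrom until 'expected' bytes have been
    // written to 'target' (starting at file position 0) or the source stops
    // delivering data. Assumes a blocking source, where a return value of 0
    // means the stream has ended rather than "try again later".
    static long receiveFully(ReadableByteChannel source, FileChannel target,
                             long expected) throws IOException {
        long received = 0;
        while (received < expected) {
            long n = target.transferFrom(source, received, expected - received);
            if (n <= 0) {
                break; // source ended before 'expected' bytes arrived
            }
            received += n;
        }
        return received;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[100_000];
        Path out = Files.createTempFile("zerocopy-recv", ".bin");
        try (ReadableByteChannel source = Channels.newChannel(new ByteArrayInputStream(payload));
             FileChannel target = FileChannel.open(out, StandardOpenOption.WRITE)) {
            long expected = payload.length + 10L; // deliberately more than the source holds
            long received = receiveFully(source, target, expected);
            if (received != expected) {
                System.out.println("short transfer: got " + received + " of " + expected + " bytes");
            }
        }
        Files.delete(out);
    }
}
```

A caller that needs guaranteed delivery would treat a short result as an error instead of silently accepting a truncated file.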