Java: out-of-memory error when my application runs for a long time

SUD*_*HAN 12 java multithreading amazon-s3

I have a Java application that picks up a large number of very small files (about 1 KB each), roughly 20,000 files per minute, and uploads them to S3.

I run the upload in 10 parallel threads, and the application has to run continuously.

After the application has been running for some days, I get an out-of-memory error.

This is the exact error I get:

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 347376 bytes for Chunk::new
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
#  Out of Memory Error (allocation.cpp:390), pid=6912, tid=0x000000000003ec8c
#
# JRE version: Java(TM) SE Runtime Environment (8.0_181-b13) (build 1.8.0_181-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.181-b13 mixed mode windows-amd64 compressed oops)
# Core dump written. Default location: d:\S3FileUploaderApp\hs_err_pid6912.mdmp
#

Here are my Java classes. I am including all of them so that the problem is easier to investigate.

This is my Java VisualVM report (image):

[image: enter image description here]

Adding my sample output:

[image: enter image description here]

Updated with a Metaspace image:

[image: enter image description here]

This is my main class:

public class UploadExecutor {
    private static Logger _logger = Logger.getLogger(UploadExecutor.class);

    public static void main(String[] args) {
        _logger.info("----------STARTING JAVA MAIN METHOD----------------- ");
        /*
         * 3 C:\\Users\\u6034690\\Desktop\\TWOFILE\\xml
         * a205381-tr-fr-production-us-east-1-trf-auditabilty
         */
        final int batchSize = 100;
        while (true) {
            String strNoOfThreads = args[0];
            String strFileLocation = args[1];
            String strBucketName = args[2];
            int iNoOfThreads = Integer.parseInt(strNoOfThreads);
            S3ClientManager s3ClientObj = new S3ClientManager();
            AmazonS3Client s3Client = s3ClientObj.buildS3Client();
            try {
                FileProcessThreads fp = new FileProcessThreads();
                File[] files = fp.getFiles(strFileLocation);
                try {
                    _logger.info("No records found will wait for 10 Seconds");
                    TimeUnit.SECONDS.sleep(10);
                    files = fp.getFiles(strFileLocation);
                    ArrayList<File> batchFiles = new ArrayList<File>(batchSize);
                    if (null != files) {
                        for (File path : files) {
                            String fileType = FilenameUtils.getExtension(path.getName());
                            long fileSize = path.length();
                            if (fileType.equals("gz") && fileSize > 0) {
                                batchFiles.add(path);
                            }
                            if (batchFiles.size() == batchSize) {
                                BuildThread BuildThreadObj = new BuildThread();
                                BuildThreadObj.buildThreadLogic(iNoOfThreads, s3Client, batchFiles, strFileLocation,
                                        strBucketName);
                                _logger.info("---Batch One got completed---");
                                batchFiles.clear();
                            }
                        }
                    }
                    // to consider remaining or files with count<batch size
                    if (!batchFiles.isEmpty()) {
                        BuildThread BuildThreadObj = new BuildThread();
                        BuildThreadObj.buildThreadLogic(iNoOfThreads, s3Client, batchFiles, strFileLocation,
                                strBucketName);
                        batchFiles.clear();
                    }
                } catch (InterruptedException e) {
                    _logger.error("InterruptedException: " + e.toString());
                }
            } catch (Throwable t) {
                _logger.error("InterruptedException: " + t.toString());
            }
        }
    }
}

This is the class where I build the threads and shut down the executor. For every run I create a new ExecutorService.

public class BuildThread {

    private static Logger _logger = Logger.getLogger(BuildThread.class);

    public  void buildThreadLogic(int iNoOfThreads,AmazonS3Client s3Client, List<File> records,String strFileLocation,String strBucketName) {


        _logger.info("Calling buildThreadLogic method of BuildThread class");

        final ExecutorService executor = Executors.newFixedThreadPool(iNoOfThreads);
        int recordsInEachThraed = (int) (records.size() / iNoOfThreads);
        int threadIncr=2;
        int recordsInEachThreadStart=0;
        int recordsInEachThreadEnd=0;

        for (int i = 0; i < iNoOfThreads; i++) {
            if (i==0){
                recordsInEachThreadEnd=recordsInEachThraed;
            }
            if (i==iNoOfThreads-1){
                recordsInEachThreadEnd=records.size();
            }

            Runnable worker = new UploadObject(records.subList(recordsInEachThreadStart, recordsInEachThreadEnd), s3Client,strFileLocation,strBucketName);
            executor.execute(worker);
            recordsInEachThreadStart=recordsInEachThreadEnd;
            recordsInEachThreadEnd=recordsInEachThraed*(threadIncr);
            threadIncr++;
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
        }

        _logger.info("Existing buildThreadLogic method");
    }

}

And this is the class, with the run method, where I upload my files to S3:

public class UploadObject implements Runnable {

    private static Logger _logger;
    List<File> records;
    AmazonS3Client s3Client;
    String fileLocation;
    String strBucketName;

    UploadObject(List<File> list, AmazonS3Client s3Client, String  fileLocation, String strBucketName) {
        this.records = list;
        this.s3Client = s3Client;
        this.fileLocation=fileLocation;
        this.strBucketName=strBucketName;

        _logger = Logger.getLogger(UploadObject.class);
    }

    public void run() {
        uploadToToS3();
    }

    public void uploadToToS3() {
        _logger.info("Number of record to be uploaded  in current thread: : " + records.size());
        TransferManager tm = new TransferManager(s3Client);
        final MultipleFileUpload upload = tm.uploadFileList(strBucketName, "", new File(fileLocation), records);
        try {
            upload.waitForCompletion();
        } catch (AmazonServiceException e1) {
            _logger.error("AmazonServiceException " + e1.getErrorMessage());
            System.exit(1);
        } catch (AmazonClientException e1) {
            _logger.error("AmazonClientException " +  e1.getMessage());
            System.exit(1);
        } catch (InterruptedException e1) {
            _logger.error("InterruptedException " +  e1.getMessage());
            System.exit(1);
        } finally {
            _logger.info("--Calling TransferManager ShutDown--");
            tm.shutdownNow(false);

        }

        CleanUp CleanUpObj=new CleanUp();
        CleanUpObj.deleteUploadedFile(upload,records);
    }
}

This class is used to create the S3 client:

public class S3ClientManager {

    private static Logger _logger = Logger.getLogger(S3ClientManager.class);

    public  AmazonS3Client buildS3Client() {

        _logger.info("Calling buildS3Client method of S3ClientManager class");
        AWSCredentials credential = new ProfileCredentialsProvider("TRFAuditability-Prod-ServiceUser").getCredentials();
        AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard().withRegion("us-east-1")
                .withCredentials(new AWSStaticCredentialsProvider(credential)).withForceGlobalBucketAccessEnabled(true)
                .build();
        s3Client.getClientConfiguration().setMaxConnections(5000);
        s3Client.getClientConfiguration().setConnectionTimeout(6000);
        s3Client.getClientConfiguration().setSocketTimeout(30000);
        _logger.info("Exiting  buildS3Client method of S3ClientManager class");
        return s3Client;
    }
}

This is where I get the files:

public class FileProcessThreads {

    public  File[] getFiles(String fileLocation)  {

        File dir = new File(fileLocation);
        File[] directoryListing = dir.listFiles();
        if (directoryListing.length > 0)
            return directoryListing;
       return null;

    }
} 

Ser*_*Ten 11

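Sorry for not addressing your original question about the memory leak, but your approach seems entirely flawed to me. The System.exit() calls in UploadObject could be the cause of resource leaks, but that is only the beginning. The Amazon S3 TransferManager already has an internal executor service, so you do not need your own multithreading controller. I cannot see how you guarantee that each file is uploaded exactly once. You make multiple upload calls and then delete all the files without checking whether a failure occurred during the upload, in which case the files are not in S3. You try to distribute the files among executors, which is unnecessary. Adding more threads on top of the TransferManager ExecutorService will not improve performance; it will only lead to crashes.

I would take a different approach.

First, a very simple main class that just starts a worker thread and waits for it to finish.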

public class S3Uploader {

    public static void main(String[] args) throws Exception {
        final String strNoOfThreads = args[0];
        final String strFileLocation = args[1];
        final String strBucketName = args[2];

        // Maximum number of file names that are read into memory
        final int maxFileQueueSize = 5000;

        S3UploadWorkerThread worker = new S3UploadWorkerThread(strFileLocation, strBucketName, Integer.parseInt(strNoOfThreads), maxFileQueueSize);

        // start() launches the worker on its own thread; calling run() would block main here
        worker.start();

        System.out.println("Uploading files, press any key to stop.");
        System.in.read();

        // Gracefully halt the worker thread waiting for any ongoing uploads to finish
        worker.finish();

        // Exit the main thread only after the worker thread has terminated
        worker.join();

    }

}
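
The start, stop and join lifecycle used by this main class can be tried out without any AWS dependency. The following is a minimal sketch, not the worker from this answer: `StoppableWorker`, `runBriefly` and the iteration counter are hypothetical names used only for illustration. It assumes the worker is launched with `start()` so that the loop runs on its own thread, and that the stop flag is `volatile` so the write made in `finish()` is visible to the loop.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the upload worker: it only counts loop iterations.
final class StoppableWorker extends Thread {

    // volatile: finish() is called from another thread and must be seen by run()
    private volatile boolean running = true;
    private final AtomicLong iterations = new AtomicLong();

    // Ask the loop to stop after the current iteration
    public void finish() {
        running = false;
    }

    public long getIterations() {
        return iterations.get();
    }

    @Override
    public void run() {
        while (running) {
            iterations.incrementAndGet(); // real code would submit one upload here
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Starts the worker, lets it run for a while, then stops it and waits for it to end.
    // Returns true if the worker thread has terminated.
    static boolean runBriefly(StoppableWorker worker, long millis) {
        worker.start(); // start(), not run(): the loop executes on its own thread
        try {
            Thread.sleep(millis);
            worker.finish();
            worker.join(); // wait for the loop to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        StoppableWorker worker = new StoppableWorker();
        boolean stopped = runBriefly(worker, 100);
        System.out.println("stopped=" + stopped + ", iterations=" + worker.getIterations());
    }
}
```

Calling `run()` directly instead of `start()` would execute the loop on the calling thread, so the caller would never get the chance to call `finish()`.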

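The worker thread uses a Semaphore to limit the number of uploads submitted to the TransferManager, a custom file name queue, FileEnqueue, to keep reading files from the source directory, and a ProgressListener to track the progress of each upload. If the loop runs out of files to read from the source directory, it waits ten seconds and retries. Even the file queue may be unnecessary: just listing the files inside the while loop of the worker thread should be enough.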

import java.io.File;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

import com.amazonaws.AmazonClientException;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;

public class S3UploadWorkerThread extends Thread {

    private final String sourceDir;
    private final String targetBucket;
    private final int maxQueueSize;
    private final AmazonS3Client s3Client;

    private final Semaphore uploadLimiter;
    // volatile: finish() is called from the main thread while run() polls this flag
    private volatile boolean running;

    public final long SLEEP_WHEN_NO_FILES_AVAILABLE_MS = 10000L; // 10 seconds

    public S3UploadWorkerThread(final String sourceDir, final String targetBucket, final int maxConcurrentUploads, final int maxQueueSize) {
        this.running = false;
        this.sourceDir = sourceDir.endsWith(File.separator) ? sourceDir: sourceDir + File.separator;
        this.targetBucket = targetBucket;
        this.maxQueueSize = maxQueueSize;
        this.s3Client = S3ClientManager.buildS3Client();
        this.uploadLimiter = new Semaphore(maxConcurrentUploads);
    }

    public void finish() {
        running = false;
    }

    @Override
    public void run() {
        running = true;

        final Map<String, Upload> ongoingUploads = new ConcurrentHashMap<>();
        final FileEnqueue queue = new FileEnqueue(sourceDir, maxQueueSize);
        final TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3Client).build();

        while (running) {
            // Get a file name from the in memory queue
            final String fileName = queue.poll();
            if (fileName!=null) {
                try {
                    // Limit the number of concurrent uploads
                    uploadLimiter.acquire();
                    File fileObj = new File(sourceDir + fileName);
                    // Create an upload listener
                    UploadListener onComplete = new UploadListener(fileObj, queue, ongoingUploads, uploadLimiter);
                    try {
                        Upload up = tm.upload(targetBucket, fileName, fileObj);
                        up.addProgressListener(onComplete);
                        // ongoingUploads is  used later to wait for ongoing uploads in case a finish() is requested
                        ongoingUploads.put(fileName, up);
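                    } catch (AmazonClientException e) {
                        // The upload was never started, so no listener will fire:
                        // release the permit and free the file name here
                        uploadLimiter.release();
                        queue.done(fileName);
                        System.err.println("AmazonClientException " +  e.getMessage());
                    }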
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // poll() returns null when the source directory is empty then wait for a number of seconds
                try {
                    Thread.sleep(SLEEP_WHEN_NO_FILES_AVAILABLE_MS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } // fi
        } // wend

        // Wait for ongoing uploads to finish before exiting ending the worker thread
        for (Map.Entry<String,Upload> e : ongoingUploads.entrySet()) {
            try {
                e.getValue().waitForCompletion();
            } catch (AmazonClientException | InterruptedException x) {
                System.err.println(x.getClass().getName() + " at " + e.getKey());
            }
        } // next

        tm.shutdownNow();
    }
}
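
The throttling idea in this worker (acquire a permit before handing work to an asynchronous service, release it in the completion callback) can be shown with the JDK alone. This is a sketch under stated assumptions, not the TransferManager API: `BoundedSubmitter`, `submit` and `demo` are hypothetical names, and a plain `ExecutorService` stands in for the transfer library's internal pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: limits how many asynchronous tasks are in flight at once.
final class BoundedSubmitter {

    private final Semaphore permits;
    private final ExecutorService pool; // stands in for the transfer library's own thread pool
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxObserved = new AtomicInteger();

    BoundedSubmitter(int maxInFlight, ExecutorService pool) {
        this.permits = new Semaphore(maxInFlight);
        this.pool = pool;
    }

    // Blocks until a permit is free, then hands the task to the pool.
    CompletableFuture<Void> submit(Runnable task) {
        permits.acquireUninterruptibly();
        try {
            return CompletableFuture.runAsync(() -> {
                int now = inFlight.incrementAndGet();
                maxObserved.accumulateAndGet(now, Math::max);
                try {
                    task.run();
                } finally {
                    inFlight.decrementAndGet();
                }
            }, pool).whenComplete((result, error) -> permits.release()); // on success and on failure
        } catch (RuntimeException e) {
            permits.release(); // the task was never accepted, so no callback will fire
            throw e;
        }
    }

    int maxObserved() {
        return maxObserved.get();
    }

    // Runs `tasks` short jobs with at most `limit` in flight; returns the highest concurrency seen.
    static int demo(int tasks, int limit) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            BoundedSubmitter submitter = new BoundedSubmitter(limit, pool);
            CompletableFuture<?>[] all = new CompletableFuture<?>[tasks];
            for (int i = 0; i < tasks; i++) {
                all[i] = submitter.submit(() -> {
                    try {
                        Thread.sleep(5); // placeholder for one upload
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            CompletableFuture.allOf(all).join();
            return submitter.maxObserved();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("max concurrent tasks: " + demo(50, 3));
    }
}
```

Releasing the permit in the completion callback, for failures as well as successes, is what keeps the limiter from slowly running out of permits in a long-running process.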

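The UploadListener releases the Semaphore permit, notifies the file queue when an upload finishes, and keeps track of the ongoing uploads, which must be waited for if the user requests an orderly stop. With a ProgressListener you can track each successful or failed upload separately.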

import java.io.File;

import java.util.Map;
import java.util.concurrent.Semaphore;

import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.transfer.Upload;

public class UploadListener implements ProgressListener {

    private final File fileObj;
    private final FileEnqueue queue;
    private final Map<String, Upload> ongoingUploads;
    private final Semaphore uploadLimiter;

    public UploadListener(File fileObj, FileEnqueue queue, Map<String, Upload> ongoingUploads, Semaphore uploadLimiter) {
        this.fileObj = fileObj;
        this.queue = queue;
        this.ongoingUploads = ongoingUploads;
        this.uploadLimiter = uploadLimiter;
    }

    @Override
    public void progressChanged(ProgressEvent event) {

        switch(event.getEventType()) {

        case TRANSFER_STARTED_EVENT :
            System.out.println("Started upload of file " + fileObj.getName());
            break;

        case TRANSFER_COMPLETED_EVENT:
            /* Upon a successful upload:
             * 1. Delete the file from disk
             * 2. Notify the file name queue that the file is done
             * 3. Remove it from the map of ongoing uploads
             * 4. Release the semaphore permit
             */
            fileObj.delete();
            queue.done(fileObj.getName());
            ongoingUploads.remove(fileObj.getName());
            uploadLimiter.release();
            System.out.println("Successfully finished upload of file " + fileObj.getName());
            break;

        case TRANSFER_FAILED_EVENT:
            queue.done(fileObj.getName());
            ongoingUploads.remove(fileObj.getName());
            uploadLimiter.release();
            System.err.println("Failed upload of file " + fileObj.getName());
            break;

        default:
            // do nothing
        }       
    }
}

Here is a boilerplate example of the file queue:

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.ConcurrentSkipListSet;

public class FileEnqueue {

    private final String sourceDir;
    private final ConcurrentSkipListSet<FileItem> seen;
    private final ConcurrentSkipListSet<String> processing;
    private final int maxSeenSize;

    public FileEnqueue(final String sourceDirectory, int maxQueueSize) {
        sourceDir = sourceDirectory;
        maxSeenSize = maxQueueSize;
        seen = new ConcurrentSkipListSet<FileItem>();
        processing = new ConcurrentSkipListSet<>();
    }

    public synchronized String poll() {
        if (seen.size()==0)
            enqueueFiles();
        FileItem fi = seen.pollFirst();
        if (fi==null) {
            return null;
        } else {
            processing.add(fi.getName());
            return fi.getName();
        }
    }

    public void done(final String fileName) {
        processing.remove(fileName);
    }

    private void enqueueFiles() {
        final FileFilter gzFilter = new GZFileFilter();
        final File dir = new File(sourceDir);

        if (!dir.exists() ) {
            System.err.println("Directory " +  sourceDir + " not found");
        } else if (!dir.isDirectory() ) {
            System.err.println(sourceDir + " is not a directory");
        } else {

            final File [] files = dir.listFiles(gzFilter);

            if (files!=null) {

                // How many more file names can we read in memory
                final int spaceLeft = maxSeenSize - seen.size();

                // How many new files will be read into memory
                final int maxNewFiles = Math.min(files.length, spaceLeft);

                for (int f=0, enqueued=0; f<files.length && enqueued<maxNewFiles; f++) {
                    File fl = files[f];
                    FileItem fi = new FileItem(fl);
                    // Do not put into the queue any file which has been already seen or is processing
                    if (!seen.contains(fi) && !processing.contains(fi.getName())) {
                        seen.add(fi);
                        enqueued++;
                    }
                } // next
            }
        } // fi
    }

    private class GZFileFilter implements FileFilter {

        @Override
        public boolean accept(File f) {
            final String fname = f.getName().toLowerCase();         
            return f.isFile() && fname.endsWith(".gz") && f.length()>0L;
        }   
    }

}
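
If the queue turns out to be unnecessary, listing the directory in bounded batches from inside the worker loop is one alternative. The sketch below is only an illustration using `java.nio.file`; `GzBatchLister`, `nextBatch` and `demo` are hypothetical names, and the glob `*.gz` is an assumption whose case sensitivity depends on the platform, unlike the lower-casing filter above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: returns at most `limit` non-empty .gz files from a directory.
final class GzBatchLister {

    static List<Path> nextBatch(Path dir, int limit) {
        List<Path> batch = new ArrayList<>();
        // Entries are read lazily, so the loop can stop early
        // without building an array of every file in the directory.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.gz")) {
            for (Path p : stream) {
                if (batch.size() >= limit) {
                    break;
                }
                if (Files.isRegularFile(p) && Files.size(p) > 0) {
                    batch.add(p);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return batch;
    }

    // Creates a temporary directory with a few files and lists one batch from it.
    // The directory is left behind for the OS to clean up.
    static List<Path> demo(int limit) {
        try {
            Path dir = Files.createTempDirectory("gz-batch-demo");
            Files.write(dir.resolve("a.gz"), new byte[] {1});
            Files.write(dir.resolve("b.gz"), new byte[] {1});
            Files.write(dir.resolve("c.gz"), new byte[] {1});
            Files.write(dir.resolve("empty.gz"), new byte[0]);      // skipped: zero length
            Files.write(dir.resolve("notes.txt"), new byte[] {1});  // skipped: wrong extension
            return nextBatch(dir, limit);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("batch of 2:  " + demo(2));
        System.out.println("batch of 10: " + demo(10));
    }
}
```

A caller would still need to remember which files are currently uploading, as the `processing` set does above, so that the same file is not submitted twice.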

And finally your S3ClientManager:

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class S3ClientManager {

    public static AmazonS3Client buildS3Client() {

        AWSCredentials credential = new ProfileCredentialsProvider("TRFAuditability-Prod-ServiceUser").getCredentials();
        AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard().withRegion("us-east-1")
                .withCredentials(new AWSStaticCredentialsProvider(credential)).withForceGlobalBucketAccessEnabled(true)
                .build();
        s3Client.getClientConfiguration().setMaxConnections(5000);
        s3Client.getClientConfiguration().setConnectionTimeout(6000);
        s3Client.getClientConfiguration().setSocketTimeout(30000);

        return s3Client;
    }
}

Update 30/04/2019: added the FileItem class

import java.io.File;
import java.util.Comparator;

public class FileItem implements Comparable<FileItem> {

    private final String name;
    private final long dateSeen;

    public FileItem(final File file) {
        this.name = file.getName();
        this.dateSeen = System.currentTimeMillis();
    }

    public String getName() {
        return name;
    }

    public long getDateSeen() {
        return dateSeen;
    }

    // Orders items by the time they were first seen, then by name to break ties
    @Override
    public int compareTo(FileItem other) {
        int byDate = Long.compare(getDateSeen(), other.getDateSeen());
        return byDate != 0 ? byDate : getName().compareTo(other.getName());
    }

    @Override
    public boolean equals(Object otherFile) {
        // instanceof guards against a ClassCastException for foreign types and null
        return otherFile instanceof FileItem && getName().equals(((FileItem) otherFile).getName());
    }

    @Override
    public int hashCode() {
        return getName().hashCode();
    }

    public static final class CompareFileItems implements Comparator<FileItem> {

        @Override
        public int compare(FileItem fileItem1, FileItem fileItem2) {
            return fileItem1.compareTo(fileItem2);
        }
    }

}


saw*_*ing 3

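Which version of Java are you using, and what parameters have you set for the garbage collector? I recently had a problem with Java 8 applications running with default settings: over time they would use up all the memory available to the server. I fixed this by adding the following parameters to each application:

  • -XX:+UseG1GC - makes the application use the G1 garbage collector.
  • -Xms32M - sets the minimum heap size to 32 MB.
  • -Xmx512M - sets the maximum heap size to 512 MB.
  • -XX:MinHeapFreeRatio=20 - sets the minimum percentage of heap kept free; the heap is expanded if less is free.
  • -XX:MaxHeapFreeRatio=40 - sets the maximum percentage of heap kept free; the heap is shrunk if more is free.

Note that you should understand your application's memory requirements and behavior before configuring these parameters, to avoid significant performance problems.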
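
Whether flags like these took effect can be checked from inside the application. The following is a minimal sketch, assuming a HotSpot-style JVM; the class name `JvmMemoryReport` is hypothetical, while `Runtime` and `ManagementFactory` are standard JDK APIs. It prints the heap ceiling the JVM is running with and the names of the active collectors.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

// Hypothetical startup check: reports the heap limits and collectors the JVM is actually using.
final class JvmMemoryReport {

    // Upper bound of the heap, as set by -Xmx or chosen by the JVM by default
    static long maxHeapBytes() {
        return Runtime.getRuntime().maxMemory();
    }

    // Names of the garbage collectors registered with the platform MBean server
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        long mb = 1024L * 1024L;
        System.out.println("max heap (MB):       " + maxHeapBytes() / mb);
        System.out.println("committed heap (MB): " + rt.totalMemory() / mb);
        System.out.println("free in heap (MB):   " + rt.freeMemory() / mb);
        System.out.println("collectors:          " + collectorNames());
    }
}
```

Launched with the options listed above (for example `java -XX:+UseG1GC -Xms32M -Xmx512M -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=40 JvmMemoryReport`), the reported maximum heap should be close to 512 MB and the collector names should mention G1; the exact figures and names depend on the JVM build.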

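What was happening is that Java kept allocating more memory from the server until it reached the maximum heap size. Only after that would it run garbage collection to try to free space within its memory. This meant we had 16 microservices whose size naturally grew over time without garbage collection, because they never reached their default maximum of 4 GB. Before that could happen, the server ran out of RAM to give to the applications and OutOfMemory errors started to occur. This was especially noticeable in our application that reads and parses more than 400,000 files a day.

Also, because the default garbage collector in Java 8 is the parallel collector, the applications never returned memory to the server. Changing these settings let our microservices manage their memory more efficiently and behave well on the server by returning memory they no longer need.

This is the article I found that helped me fix the problem. It describes everything I said above in more detail.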