重新部署时Google Cloud Pub子内存泄漏(基于Netty)

Fur*_*kul 6 java tomcat memory-leaks web-services google-cloud-pubsub

我的tomcat网络服务使用Android的实时开发者通知,这需要Google Cloud Pub Sub。它工作正常,所有通知都将立即收到。唯一的问题是,它使用了过多的RAM,这导致计算机的响应速度比预期的要慢,并且在取消部署应用程序后没有释放它。它使用HttpServlet(特别是Jersey提供contextInitializedcontextDestroyed设置和清除引用的方法),并且注释pub-sub代码实际上减少了很多内存使用。

这是用于订阅-取消订阅Android订阅通知的代码。

package com.example.webservice;

import com.example.webservice.Log;
import com.google.api.core.ApiService;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.collect.Lists;
import com.google.pubsub.v1.ProjectSubscriptionName;

import java.io.FileInputStream;

public class SubscriptionTest
{
    // for hiding purposes
    private static final String projectId1 = "api-000000000000000-000000";
    private static final String subscriptionId1 = "realtime_notifications_subscription";
    private static final String TAG = "SubscriptionTest";

    private ApiService subscriberService;
    private MessageReceiver receiver;

    // Called when "contextInitialized" is called.
    public void initializeSubscription()
    {
        Log.w(TAG, "Initializing subscriptions...");
        try
        {
            GoogleCredentials credentials1 = GoogleCredentials.fromStream(new FileInputStream("googlekeys/apikey.json"))
                    .createScoped(Lists.newArrayList("https://www.googleapis.com/auth/cloud-platform"));
            ProjectSubscriptionName subscriptionName1 = ProjectSubscriptionName.of(projectId1, subscriptionId1);

            // Instantiate an asynchronous message receiver
            receiver =
                    (message, consumer) ->
                    {
                        consumer.ack();

                        // do processing
                    };

            // Create a subscriber for "my-subscription-id" bound to the message receiver
            Subscriber subscriber1 = Subscriber.newBuilder(subscriptionName1, receiver)
                    .setCredentialsProvider(FixedCredentialsProvider.create(credentials1))
                    .build();

            subscriberService = subscriber1.startAsync();
        }
        catch (Throwable e)
        {
            Log.e(TAG, "Exception while initializing async message receiver.", e);
            return;
        }
        Log.w(TAG, "Subscription initialized. Messages should come now.");
    }

    // Called when "contextDestroyed" is called.
    public void removeSubscription()
    {
        if (subscriberService != null)
        {
            subscriberService.stopAsync();
            Log.i(TAG, "Awaiting subscriber termination...");
            subscriberService.awaitTerminated();
            Log.i(TAG, "Subscriber termination done.");
        }

        subscriberService = null;
        receiver = null;
    }
}
Run Code Online (Sandbox Code Playgroud)

这是取消部署应用程序后的语句。(名称可能不匹配,但这并不重要)

org.apache.catalina.loader.WebappClassLoaderBase.checkThreadLocalMapForLeaks The web application 
[example] created a ThreadLocal with key of type [java.lang.ThreadLocal] 
(value [java.lang.ThreadLocal@2cb2fc20]) and a value of type 
[io.grpc.netty.shaded.io.netty.util.internal.InternalThreadLocalMap] 
(value [io.grpc.netty.shaded.io.netty.util.internal.InternalThreadLocalMap@4f4c4b1a]) 
but failed to remove it when the web application was stopped. 
Threads are going to be renewed over time to try and avoid a probable memory leak.
Run Code Online (Sandbox Code Playgroud)

从我观察到的结果来看,Netty正在创建一个静态ThreadLocal,并强烈引用了该值InternalThreadLocalMap,这似乎导致了此消息的出现。我试图通过使用类似这样的代码来删除它(可能是过大了,但是到目前为止,没有答案对我有用,而且似乎也不起作用)

    InternalThreadLocalMap.destroy();
    FastThreadLocal.destroy();
    for (Thread thread : Thread.getAllStackTraces().keySet())
    {
        if (thread instanceof FastThreadLocalThread)
        {
            // Handle the memory leak that netty causes.
            InternalThreadLocalMap map = ((FastThreadLocalThread) thread).threadLocalMap();
            if (map == null)
                continue;

            for (int i = 0; i < map.size(); i++)
                map.setIndexedVariable(i, null);
            ((FastThreadLocalThread) thread).setThreadLocalMap(null);
        }
    }
Run Code Online (Sandbox Code Playgroud)

在取消部署(或停止启动)之后,如果我单击Find leaks(显然),tomcat将检测到内存泄漏。问题是,由于显然未正确关闭订阅,因此未释放已使用的RAM和CPU。重新部署该应用程序会使使用的RAM在每个操作上进一步增加,例如,如果它首先使用200 MB内存,则在第二次部署后,它会增加到400、600、800,直到计算机的速度降低到足以死掉为止,它都会无限增加。

这是一个严重的问题,我不知道如何解决,stop方法是按定义调用的,awaitTerminated也称为立即执行的(意味着接口实际上已停止侦听),但是它不会释放其背后的RAM。

到目前为止,我只看到了有关python客户端的问题(ref 1ref 2),但似乎没有人提到Java客户端,而且我对使用这种结构有点希望。

我开了一个问题,这个问题也是如此。

我应该怎么做才能解决这个问题?任何帮助表示赞赏,非常感谢。

小智 0

我不知道它是否能完全解决您的问题,但您似乎因不关闭 FileInputStream 而泄漏了一些内存。

第一个选项是将 FileInputStream 提取到一个变量中,并在读取完内容后对其调用 close() 方法。

使用此类流的第二个(也是更好的)选项是使用 try-with-resources。由于FileInputStream实现了AutoCloseable接口,因此在退出try-with-resources时会自动关闭。

例子:

try (FileInputStream stream = new FileInputStream("googlekeys/apikey.json")) {
    GoogleCredentials credentials1 = GoogleCredentials.fromStream(stream)
            .createScoped(Lists.newArrayList("https://www.googleapis.com/auth/cloud-platform"));
    // ...
} catch (Exception e) {
    Log.e(TAG, "Exception while initializing async message receiver.", e);
    return;
}
Run Code Online (Sandbox Code Playgroud)