使用retrofit2和rx java2发送高容量POSTS时出现OutOfMemoryException

and*_*ast 3 mobile post android

我有一个应用程序与本地数据库(房间)和服务,POSTs所有"事件"来自数据库使用retrofit 2rxjava.当我发送大量POSTs(即1500+)时,应用程序会抛出一个OutOfMemoryException.我认为这是因为每次客户端发送新POST时它都会启动一个新线程.有没有办法阻止retrofit/ rxJava创建这么多线程?或者等待服务器响应是否更好?这是我的代码:

从本地db检索所有事件的类

public class RetreiveDbContent {

private final EventDatabase eventDatabase;

public RetreiveDbContent(EventDatabase eventDatabase) {
    this.eventDatabase = eventDatabase;
}

@Override
public Maybe<List<Event>> eventsList() {

 return eventDatabase.eventDao().getAllEvents()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
}
Run Code Online (Sandbox Code Playgroud)

接下来,我有一个服务,它通过db事件列表迭代并发布所有这些服务.如果后端发回成功,则从本地数据库中删除该事件.

    private void sendDbContent() {

    mRetreiveDbContent.eventsList()
            .subscribe(new MaybeObserver<List<Event>>() {

        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(final List<Event> events) {


            Timber.e("Size of list from db " + events.size());
            final CompositeDisposable disposable = new CompositeDisposable();

            Observable<Event> eventObservable = Observable.fromIterable(events);
            eventObservable.subscribe(new Observer<Event>() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposable.add(d);
                }

                @Override
                public void onNext(Event event) {
                    Timber.d("sending event from db " + event.getAction());
                    mPresenter.postEvent(Event);
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e("error while emitting db content " + e.getMessage());
                }

                @Override
                public void onComplete() {
                    Timber.d("Finished looping through db list");
                    disposable.dispose();
                }
            });

        }

        @Override
        public void onError(Throwable e) {
            Timber.e("Error occurred while attempting to get db content " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Timber.d("Finished getting the db content");
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

这是我postEvent()deleteEvent()演出者的方法

    public void postEvent(final Event event) {

    mSendtEvent.sendEvent(event)
          .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DisposableObserver<Response<ResponseBody>>() {
                @Override
                public void onNext(Response<ResponseBody> responseBodyResponse) {

                    switch (responseBodyResponse.code()) {
                        case CREATED_RESPONSE:
                            Timber.d("Event posted successfully " + responseBodyResponse.code());
                            deleteEventFromRoom(event);
                            break;
                        case BAD_REQUEST:
                            Timber.e("Client sent a bad request! We need to discard it!");
                            break;
                    }
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e("Error " + e.getMessage());
                    mView.onErrorOccurred();
                }

                @Override
                public void onComplete() {

                }
            });
}


    public void deleteEventFromRoom(final Event event) {

    final CompositeDisposable disposable = new CompositeDisposable();
    mRemoveEvent.removeEvent(event)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposable.add(d);
                }

                @Override
                public void onNext(Object o) {
                    Timber.d("Successfully deleted event from database " + event.getAction());
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    disposable.dispose();
                }
            });
}
Run Code Online (Sandbox Code Playgroud)

最后是mRemoveEvent交互者

public class RemoveEvent {

private final EventDatabase eventDatabase;

public RemoveEvent(EventDatabase eventDatabase) {
    this.eventDatabase = eventDatabase;
}

@Override
public Observable removeEvent(final Event event) {
    return Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            return eventDatabase.eventDao().delete(event);
        }
    });
}
}
Run Code Online (Sandbox Code Playgroud)

注意:我是RXJava世界上的新手.先感谢您

M. *_*loo 5

您使用的Observable是不支持背压的.

Fom RxJava github页面:

背压

当数据流通过异步步骤时,每个步骤可以以不同的速度执行不同的操作.为了避免压倒这些步骤,这些步骤通常表现为由于临时缓冲或需要跳过/丢弃数据而增加的内存使用量,所以应用所谓的背压,这是流量控制的一种形式,其中步骤可以表达多少物品准备好了.这允许在通常无法知道上游将向其发送多少项的步骤的情况下约束数据流的存储器使用.

在RxJava,专用可流动类被指定来支持背压和可观察专用于非backpressured操作(短序列,GUI交互等).其他类型,Single,Maybe和Completable不支持背压,也不支持背压; 暂时存放一件物品的空间.

您应该使用Flowable,您正在将所有事件发送到下游以使用所有可用资源进行处理.

这是一个简单的例子:

Flowable.range(1, 1000)
        .buffer(10)//Optional you can process single event
        .flatMap(buf -> {
            System.out.println(String.format("100ms for sending events to server: %s ", buf));
            Thread.sleep(100);
            return Flowable.fromIterable(buf);
        }, 1)// <-- How many concurrent task should be executed
        .map(x -> x + 1)
        .doOnNext(i -> System.out.println(String.format("doOnNext: %d", i)))
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single(), false, 1)//Overrides the 128 default buffer size
        .subscribe(new DefaultSubscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer t) {
        System.out.println(String.format("Received response from server for event : %d", t));
        System.out.println("Processing value would take some time");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //You can request for more data here
        request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("ExampleUnitTest.onComplete");
    }
});
Run Code Online (Sandbox Code Playgroud)

最后一个提示:您不应该立即将整个事件提取到内存中,基本上您在内存中持有所有"数据库事件",考虑分页或类似事件Cursor,每次操作获取100行并在处理它们之后请求下一个100 ,我希望你用JobScheduler或WorkManager API做到这一点

  • 此外,如果表现很重要,一次发送一批事件(例如一页)可能是有益的 (2认同)