创建像环形缓冲区这样的批处理的 Observable(需要建议)

vac*_*ach 2 java reactive-programming disruptor-pattern rx-java

在为这里描述的问题找不到合适的解决方案后,我决定实施这个。

然而,我缺乏使用 monad 的经验,而像 Lift(..) 这样的东西对我来说仍然有点神奇......

我打开它的目的是让那些在 rxjava 之上实现了一些自定义东西的人可以就如何实现这一点给我建议。

现在这是什么,这是界面。

在此处输入图片说明

我想这对你们大多数人来说都是不言自明的,但为了确保我会举一个例子。

想象一下我们有一个订阅者(消费者)实际上对数据库进行持久化,很明显,如果你给它 1 或 1000 个对象来持久化,差异不会是 1000 的因数,它将是 10 的因数或更少,这意味着它是一个消费者可以通过加载......所以一次推送一个项目是愚蠢的,而你可以一次坚持很多,另一方面,等待一批 N 个元素填满直到你坚持(一个第二个你可能会得到 1000 个元素,你可能没有得到,所以假设我们不知道传入数据的频率)...

所以我们现在拥有的是Observable.batch()它会要求一些 N 大小的批次,我们经常会等待而不工作......另一方面,我们拥有Disruptor它完全符合我们想要的但不提供Observable......的漂亮界面Disruptor将处理单个元素,当您处理它时,它将收集所有传入的元素,下次您将获得一批由于您的消费者忙于处理最后一个值而收集的所有内容......

目前我想我将Observable.from()用来实现这个或lift()......

请分享您对此的想法,也许我不知道已经有可用的解决方案,或者我即将以错误的方式实现它......

aka*_*okd 5

这是一个操作符,它将批处理堆积在异步边界后面的值:

public final class OperatorRequestBatcher<T> 
implements Operator<List<T>, T> {
    final Scheduler scheduler;
    public OperatorRequestBatcher(Scheduler scheduler) {
        this.scheduler = scheduler;
    }
    @Override
    public Subscriber<? super T> call(Subscriber<? super List<T>> t) {
        Scheduler.Worker w = scheduler.createWorker();
        RequestBatcherSubscriber<T> parent = 
                new RequestBatcherSubscriber<>(t, w);

        t.add(w);
        t.add(parent);

        return parent;
    }

    static final class RequestBatcherSubscriber<T> 
    extends Subscriber<T> implements Action0 {
        final Subscriber<? super List<T>> actual;
        final Scheduler.Worker w;
        final Queue<T> queue;
        final AtomicInteger wip;

        volatile boolean done;
        Throwable error;

        public RequestBatcherSubscriber(
                Subscriber<? super List<T>> actual, 
                Scheduler.Worker w) {
            this.actual = actual;
            this.w = w;
            this.wip = new AtomicInteger();
            this.queue = new SpscLinkedArrayQueue<>(256);
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            queue.offer(t);
            schedule();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                return;
            }
            error = e;
            done = true;
            schedule();
        }

        @Override
        public void onCompleted() {
            done = true;
            schedule();
        }

        void schedule() {
            if (wip.getAndIncrement() == 0) {
                w.schedule(this);
            }
        }

        @Override
        public void call() {
            int missed = 1;

            final Queue<T> q = queue;
            final Subscriber<? super List<T>> a = actual;
            final AtomicInteger wip = this.wip;

            for (;;) {

                List<T> list = new ArrayList<>();

                for (;;) {
                    boolean d = done;
                    T v = q.poll();
                    boolean e = v == null;

                    if (isUnsubscribed()) {
                        q.clear();
                        return;
                    }

                    if (d) {
                        Throwable err = error;
                        if (err != null) {
                            a.onError(err);
                            return;
                        } else
                        if (e) {
                            if (!list.isEmpty()) {
                                a.onNext(list);
                            }
                            a.onCompleted();
                            return;
                        }
                    }

                    if (e) {
                        break;
                    }

                    list.add(v);
                }

                if (!list.isEmpty()) {
                    a.onNext(list);
                }

                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        PublishSubject<Integer> ps = PublishSubject.create();
        TestScheduler sch = Schedulers.test();

        ps.lift(new OperatorRequestBatcher<>(sch))
        .subscribe(System.out::println, Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        ps.onNext(1);
        ps.onNext(2);

        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

        ps.onNext(3);

        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

        ps.onNext(4);
        ps.onNext(5);
        ps.onNext(6);
        ps.onCompleted();

        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    }
}
Run Code Online (Sandbox Code Playgroud)

但是请注意,您在 API 中描述的是一种热 Observable:冷源不会跨多个订阅者进行协调。为此,您需要创建一个自定义ConnectableObservable.

publish()可能适用,disruptForEachSubscriberpublish().observeOn()fordisruptForAllSubscriber不太可能,因为observeOn将请求一堆值publish并将其解释为成功处理 N 个批次。