Rx缓冲区自第一个新组元素以来超时

bad*_*aly 4 system.reactive rx-java

对Rx世界来说很新,我需要实现以下行为:

我需要observable来收集值并在我至少有N个项目时将它们作为列表发出,或者如果从发出第一个项目开始经过了T个时间量.

我一次又一次地阅读文档,非常确定它会使用

buffer(timespan, unit, count[, scheduler])
Run Code Online (Sandbox Code Playgroud)

但问题是这里的时间跨度取决于最后一组项目.

如果可能的话,我还需要能够冲洗(强制发射)当前缓冲区,有些项目需要立即处理.我是否正确地假设在这种情况下我需要第二个observable,在每个项目之前执行处理并合并两者?

任何的想法?

Ps:我在Java工作,但我不需要Java代码,解释就足够了.

谢谢!

aka*_*okd 7

这个问题的缓冲方面可以通过多播欺骗来实现,但我发现为它编写运算符要容易得多,因此数据和上下文位于同一个可访问的位置:

public final class OperatorBufferFirst<T> implements Operator<List<T>, T> {
    final Scheduler scheduler;
    final long timeout;
    final TimeUnit unit;
    final int maxSize;
    public OperatorBufferFirst(
            long timeout, TimeUnit unit, 
            Scheduler scheduler, int maxSize) {
        this.timeout = timeout;
        this.unit = unit;
        this.scheduler = scheduler;
        this.maxSize = maxSize;
    }

    @Override
    public Subscriber<? super T> call(
            Subscriber<? super List<T>> t) {
        BufferSubscriber<T> parent = new BufferSubscriber<>(
                new SerializedSubscriber<>(t), 
                timeout, unit, 
                scheduler.createWorker(), maxSize);
        t.add(parent);
        return parent;
    }

    static final class BufferSubscriber<T> 
    extends Subscriber<T> {
        final Subscriber<? super List<T>> actual;
        final Scheduler.Worker w;
        final long timeout;
        final TimeUnit unit;
        final int maxSize;
        final SerialSubscription timer;

        List<T> buffer;
        long index;


        public BufferSubscriber(
                Subscriber<? super List<T>> actual, 
                long timeout, 
                TimeUnit unit, 
                Scheduler.Worker w, 
                int maxSize) {
            this.actual = actual;
            this.timeout = timeout;
            this.unit = unit;
            this.w = w;
            this.maxSize = maxSize;
            this.timer = new SerialSubscription();
            this.buffer = new ArrayList<>();
            this.add(timer);
            this.add(w);
        }

        @Override
        public void onNext(T t) {
            List<T> b;
            boolean startTimer = false;
            boolean emit = false;
            long idx;
            synchronized (this) {
                b = buffer;
                b.add(t);
                idx = index;
                int n = b.size();
                if (n == 1) {
                    startTimer = true;
                } else
                if (n < maxSize) {
                    return;
                } else {
                    buffer = new ArrayList<>();
                    index = ++idx;
                    emit = true;
                }
            }

            if (startTimer) {
                final long fidx = idx;
                timer.set(w.schedule(() -> timeout(fidx), timeout, unit));
            }
            if (emit) {
                timer.set(Subscriptions.unsubscribed());
                actual.onNext(b);
            }
        }

        @Override
        public void onError(Throwable e) {
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            timer.unsubscribe();
            List<T> b;
            synchronized (this) {
                b = buffer;
                buffer = null;
                index++;
            }
            if (!b.isEmpty()) {
                actual.onNext(b);
            }
            actual.onCompleted();
        }

        public void timeout(long idx) {
            List<T> b;
            synchronized (this) {
                b = buffer;
                if (idx != index) {
                    return;
                }
                buffer = new ArrayList<>();
                index = idx + 1;
            }

            actual.onNext(b);
        }
    }

    public static void main(String[] args) {
        TestScheduler s = Schedulers.test();

        PublishSubject<Integer> source = PublishSubject.create();

        source.lift(new OperatorBufferFirst<>(1, TimeUnit.SECONDS, s, 3))
        .subscribe(System.out::println, Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);

        source.onNext(4);
        s.advanceTimeBy(1, TimeUnit.SECONDS);

        source.onNext(5);
        source.onNext(6);

        s.advanceTimeBy(1, TimeUnit.SECONDS);
        s.advanceTimeBy(1, TimeUnit.SECONDS);

        source.onNext(7);
        source.onCompleted();
    }
}
Run Code Online (Sandbox Code Playgroud)

它将值累积到一个列表中,并为第一个元素启动一个定时任务,或者如果它已满,则发出缓冲区.

至于刷新,这通常不能简单地完成,你必须与操作员建立一个协议,如果输入的T值是某种特殊类型,我们就说刷新.例如,你有一个T类型的FLUSH常量,每当操作符遇到这个时,它应该发出当前缓冲区:

synchronized (this) {
    b = buffer;
    idx = index;
    if (t != FLUSH) {
        b.add(t);
        int n = b.size();
        if (n == 1) {
            startTimer = true;
        } else
        if (n < maxSize) {
            return;
        } else {
            buffer = new ArrayList<>();
            index = ++idx;
            emit = true;
        }
    } else {
        buffer = new ArrayList<>();
        index = ++idx;
        emit = true;
    }
}
Run Code Online (Sandbox Code Playgroud)