Java: RX buffer while subscribe

    final class BufferWhileSubscriber extends Subscriber {

        final Subscriber<? super List> actual;

        List buffer = new ArrayList();

        BufferWhileSubscriber(Subscriber<? super List> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T t) {
            buffer.add(t);
            if (buffer.size() > 1 && boundaryPredicate.call(buffer)) {
                actual.onNext(buffer.subList(0, buffer.size() - 1));

                T lastElement = buffer.get(buffer.size() - 1);
                buffer = Lists.newLinkedList();
                buffer.add(lastElement);
            }
        }

        @Override
        public void onError(Throwable e) {
            buffer = null;
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            List b = buffer;
            buffer = null;
            if (!b.isEmpty()) {
                actual.onNext(b);
            }
            actual.onCompleted();
        }
    }
Advertisements