Java: streams with zipping

With the appearence of Java 8 there are more and more APIs that return Stream object instead of simple Collections.

For example, with JPA you can just say that as a return type you want to have a Stream<Something>…. and you will have it 😉

Repository returning Stream of some objects… how this can be helpful? You may ask…

Well, in some cases it might be helpful, actually. Especially, when there is a constraint on the memory, and we need to process things on the “fly”.

Below is a sample code that uses 2 repositories that return streams, combines these 2 streams into 1 stream of pairs… does some computation and sends the results in the batches.


import io.vavr.Function1;
import io.vavr.Function2;
import io.vavr.Tuple;
import io.vavr.Tuple2;

import org.slf4j.MDC;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class SomeService {

    public void doIt(final String currentProcessId) {
        MDC.put(MDC_PROCESS_CORRELATION_ID, currentProcessId);

        log.debug("Fetch current domain objects");
        Stream previousStream = null;
        Stream currentStream =

        log.debug("Combine the current and previous values into currentPreviousPictures");
        Stream currentPreviousPictures;
        if (someCondition1) {
            log.debug("Fetch previous domain objects");
            previousStream = repository.findByDomainObjectBySomePropertyOrderByIdAsc(someProperty, previous);

            currentPreviousPictures = zip(currentStream, previousStream);

            if (someCondition2) {
        } else {
            currentPreviousPictures = zip(currentStream, Stream.empty());

        currentPreviousPictures = currentPreviousPictures

        log.debug("Process previous and current domain objects in batch");
        Iterators.partition(currentPreviousPictures.iterator(), processBatchSize)
            .forEachRemaining(currentPreviousList -> {


      "Approximate progress: {}%", 100 * allProcessedDomainObjects.get() / maxDomainObjects);