Java: streams with zipping

Since the arrival of Java 8, more and more APIs return a Stream instead of a plain Collection.

For example, with JPA you can simply declare Stream<Something> as the return type of a repository method, and you will get it 😉

A repository returning a Stream of objects: how can this be helpful, you may ask?

Well, in some cases it actually is helpful, especially when memory is constrained and we need to process things on the fly instead of loading the whole result into a list first.
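To make "on the fly" a bit more concrete, here is a minimal sketch. It assumes nothing about JPA: a plain in-memory Stream stands in for the repository result, and the class name LazyStreamDemo and its run method are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LazyStreamDemo {

    // Records the order in which elements are produced and consumed.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        // Stand-in for a repository stream: peek() marks the moment an element is "loaded".
        try (Stream<Integer> rows = Stream.of(1, 2, 3).peek(id -> events.add("load " + id))) {
            rows.map(id -> id * 10)
                .forEach(value -> events.add("process " + value));
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each element is loaded and processed before the next one is touched, so the events alternate: load 1, process 10, load 2, process 20, and so on. Nothing forces the whole result to sit in memory at once.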

Below is sample code that runs two repository queries that return streams, combines the two streams into one stream of pairs, does some computation on each pair, and sends the results in batches.
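The zip helper used in the example is not part of the JDK, and its implementation is not included in this post. Here is one way it could look, as a sketch only. The assumptions are mine: the result is as long as the first ("current") stream, a missing "previous" element becomes an empty Optional (which is why zipping with Stream.empty() still yields every current element), and Map.Entry is used as the pair type to stay within the JDK, where the example itself imports vavr's Tuple2.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StreamZip {

    /**
     * Pairs the n-th element of current with the n-th element of previous.
     * Assumption: the result follows current; when previous runs out,
     * the second half of the pair is Optional.empty().
     */
    static <A, B> Stream<Map.Entry<A, Optional<B>>> zip(Stream<A> current, Stream<B> previous) {
        Iterator<A> left = current.iterator();
        Iterator<B> right = previous.iterator();

        Iterator<Map.Entry<A, Optional<B>>> pairs = new Iterator<Map.Entry<A, Optional<B>>>() {
            @Override
            public boolean hasNext() {
                return left.hasNext();
            }

            @Override
            public Map.Entry<A, Optional<B>> next() {
                A a = left.next();
                Optional<B> b = right.hasNext() ? Optional.ofNullable(right.next()) : Optional.empty();
                return new AbstractMap.SimpleImmutableEntry<>(a, b);
            }
        };

        // Elements are pulled lazily; closing the zipped stream closes both sources.
        return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(pairs, Spliterator.ORDERED), false)
                .onClose(current::close)
                .onClose(previous::close);
    }

    public static void main(String[] args) {
        try (Stream<Map.Entry<String, Optional<String>>> pairs =
                 zip(Stream.of("a2", "b2", "c2"), Stream.of("a1", "b1"))) {
            pairs.forEach(p -> System.out.println(p.getKey() + " <- " + p.getValue()));
        }
    }
}
```

Note that the pairing is purely positional, so it only makes sense when both queries return their rows in the same order. The example orders both by id.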

Example:

import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import io.vavr.Function1;
import io.vavr.Function2;
import io.vavr.Tuple;
import io.vavr.Tuple2;

import org.slf4j.MDC;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

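public class SomeService {

    // Not shown in this post: the fields (log, repository, someProperty, previous, someCondition1,
    // someCondition2, processBatchSize, allProcessedDomainObjects, maxDomainObjects), the constant
    // MDC_PROCESS_CORRELATION_ID and the helper methods (zip, doSomething, ...).
    // DomainObject and the Tuple2<DomainObject, DomainObject> pair type are placeholder assumptions.

    public void doIt(final String currentProcessId) {
        MDC.put(MDC_PROCESS_CORRELATION_ID, currentProcessId);

        Stream<DomainObject> currentStream = Stream.empty();
        Stream<DomainObject> previousStream = Stream.empty();
        try {
            log.debug("Fetch current domain objects");
            currentStream = repository.findByDomainObjectBySomePropertyOrderByIdAsc(someProperty);

            log.debug("Combine the current and previous values into currentPreviousPictures");
            Stream<Tuple2<DomainObject, DomainObject>> currentPreviousPictures;
            if (someCondition1) {
                log.debug("Fetch previous domain objects");
                previousStream = repository.findByDomainObjectBySomePropertyOrderByIdAsc(someProperty, previous);

                currentPreviousPictures = zip(currentStream, previousStream);

                if (someCondition2) {
                    // Keep the stream returned by the helper (assumed to return the adjusted stream):
                    // stream operations are lazy, so a discarded result would never be applied.
                    currentPreviousPictures =
                        overrideZeroElementsWithValuesFromPreviousPicture(currentPreviousPictures);
                }
            } else {
                currentPreviousPictures = zip(currentStream, Stream.empty());
            }

            // Still lazy: nothing is computed or saved until the iterator below pulls elements.
            currentPreviousPictures = currentPreviousPictures
                .map(doSomething().andThen(doSomethingMore()))
                .map(saveInRepository());

            log.debug("Process previous and current domain objects in batch");
            Iterators.partition(currentPreviousPictures.iterator(), processBatchSize)
                .forEachRemaining(currentPreviousList -> {

                    sendResultsInBatchAndWait(currentPreviousList);

                    log.info("Approximate progress: {}%", 100 * allProcessedDomainObjects.get() / maxDomainObjects);
                });
        } finally {
            // Repository-backed streams hold database resources until they are closed.
            currentStream.close();
            previousStream.close();
            MDC.remove(MDC_PROCESS_CORRELATION_ID);
        }
    }
}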
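The batching at the end relies on Guava's Iterators.partition, which pulls elements from the iterator only as each batch is requested. If you are curious what that amounts to, or do not have Guava at hand, the same idea can be sketched with the JDK alone. The class name Batches is hypothetical and this is not Guava's implementation, just a minimal equivalent for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

class Batches {

    /** Lazily groups the elements of source into lists of at most size elements. */
    static <T> Iterator<List<T>> partition(Iterator<T> source, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        return new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!source.hasNext()) {
                    throw new NoSuchElementException();
                }
                // Pull only as many elements as one batch needs.
                List<T> batch = new ArrayList<>(size);
                while (batch.size() < size && source.hasNext()) {
                    batch.add(source.next());
                }
                return batch;
            }
        };
    }

    public static void main(String[] args) {
        Stream<Integer> results = Stream.of(1, 2, 3, 4, 5, 6, 7);
        partition(results.iterator(), 3)
                .forEachRemaining(batch -> System.out.println("sending " + batch));
    }
}
```

Only one batch is materialized at a time, which is what keeps the memory footprint bounded by the batch size rather than by the size of the query result.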

 
