Java: RX buffer while subscribe

    final class BufferWhileSubscriber extends Subscriber {

        final Subscriber<? super List> actual;

        List buffer = new ArrayList();

        BufferWhileSubscriber(Subscriber<? super List> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T t) {
            buffer.add(t);
            if (buffer.size() > 1 && boundaryPredicate.call(buffer)) {
                actual.onNext(buffer.subList(0, buffer.size() - 1));

                T lastElement = buffer.get(buffer.size() - 1);
                buffer = Lists.newLinkedList();
                buffer.add(lastElement);
            }
        }

        @Override
        public void onError(Throwable e) {
            buffer = null;
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            List b = buffer;
            buffer = null;
            if (!b.isEmpty()) {
                actual.onNext(b);
            }
            actual.onCompleted();
        }
    }
Advertisements

Java: awaiting termination

 

        final ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Future task = executor.submit(() -> {
                doSomething();
            });
            executor.shutdown();
            executor.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES);

            SomeResult result = task.get();

            return Optional.of(result);
        } catch (InterruptedException | ExecutionException e) {
            if (!executor.isShutdown()) {
                executor.shutdownNow();
            }
            Thread.currentThread().interrupt();
        } finally {
            log.debug("Shutting down the service executor");
            if (!executor.isShutdown()) {
                executor.shutdownNow();
            }
        }

Java: streams with zipping

With the appearence of Java 8 there are more and more APIs that return Stream object instead of simple Collections.

For example, with JPA you can just say that as a return type you want to have a Stream<Something>…. and you will have it šŸ˜‰

Repository returning Stream of some objects… how this can be helpful? You may ask…

Well, in some cases it might be helpful, actually. Especially, when there is a constraint on the memory, and we need to process things on the “fly”.

Below is a sample code that uses 2 repositories that return streams, combines these 2 streams into 1 stream of pairs… does some computation and sends the results in the batches.

Example:

import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import io.vavr.Function1;
import io.vavr.Function2;
import io.vavr.Tuple;
import io.vavr.Tuple2;

import org.slf4j.MDC;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SomeService {

    public void doIt(final String currentProcessId) {
        MDC.put(MDC_PROCESS_CORRELATION_ID, currentProcessId);

        log.debug("Fetch current domain objects");
        Stream previousStream = null;
        Stream currentStream =
            repository.findByDomainObjectBySomePropertyOrderByIdAsc(someProperty);

        log.debug("Combine the current and previous values into currentPreviousPictures");
        Stream currentPreviousPictures;
        if (someCondition1) {
            log.debug("Fetch previous domain objects");
            previousStream = repository.findByDomainObjectBySomePropertyOrderByIdAsc(someProperty, previous);

            currentPreviousPictures = zip(currentStream, previousStream);

            if (someCondition2) {
                overrideZeroElementsWithValuesFromPreviousPicture(currentPreviousPictures);
            }
        } else {
            currentPreviousPictures = zip(currentStream, Stream.empty());
        }

        currentPreviousPictures = currentPreviousPictures
            .map(doSomething().andThen(doSomethingMore()))
            .map(saveInRepository());

        log.debug("Process previous and current domain objects in batch");
        Iterators.partition(currentPreviousPictures.iterator(), processBatchSize)
            .forEachRemaining(currentPreviousList -> {

                sendResultsInBatchAndWait(currentPreviousList);

                log.info("Approximate progress: {}%", 100 * allProcessedDomainObjects.get() / maxDomainObjects);
            });

        MDC.remove(MDC_PROCESS_CORRELATION_ID);
    }
}

 

Java: zipping sorted collections

Let’s say we need to combine two sorted lists of numbers into a list of pairs, where a pair contains numbers that are equal.

If there is a number missing in one of the two lists, than we create a pair with the number and a ZERO element.

Example:


list_1 = [1, 3, 4, 6, 10]

list_2 = [1, 2, 3, 4, 5, 7, 10]

zippedList = [ (1, 1), (0, 2), (3, 3), (4, 4), (0, 5), (6, 0), (0, 7), (10, 10) ]

Ā 

We can achive it with following implementation:

Ā 

import io.vavr.Tuple;
import io.vavr.Tuple2;

import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ZippedIterator implements Iterator<Tuple2> {

    private final Iterator iter1;
    private final Iterator iter2;
    private final T zeroElement;

    private T lastElement1 = null;
    private T lastElement2 = null;

    public static  Iterator<Tuple2>
    zipAllIterators(Iterator iter1, Iterator iter2, T zeroElement) {
        if (!iter1.hasNext() && !iter2.hasNext()) {
            return Stream.<Tuple2>empty().iterator();
        } else {
            return new ZippedIterator(iter1, iter2, zeroElement);
        }
    }

    public static  Stream<Tuple2>
    zipAll(Stream stream1, Stream stream2, T zeroElement, boolean parallel) {
        Iterator<Tuple2> tuple2Iterator =
            zipAllIterators(stream1.iterator(), stream2.iterator(), zeroElement);
        Iterable<Tuple2> iterable = () -> tuple2Iterator;
        return StreamSupport.stream(iterable.spliterator(), parallel);
    }

    public ZippedIterator(Iterator iter1, Iterator iter2, T zeroElement) {
        this.iter1 = iter1;
        this.iter2 = iter2;
        this.zeroElement = zeroElement;
    }

    @Override
    public boolean hasNext() {
        return hasNext1() || hasNext2();
    }

    @Override
    public Tuple2 next() {
        T e1 = next1();
        T e2 = next2();
        if (e1 != null && e2 != null) {
            int r = e1.compareTo(e2);
            if (r == 0) {
                lastElement1 = null;
                lastElement2 = null;
                return Tuple.of(e1, e2);
            } else if (r  0
                lastElement2 = null;
                return Tuple.of(zeroElement, e2);
            }
        }
        if (e1 == null) {
            lastElement2 = null;
            return Tuple.of(zeroElement, e2);
        }
        lastElement1 = null;
        return Tuple.of(e1, zeroElement);
    }

    private boolean hasNext1() {
        return lastElement1 != null || iter1.hasNext();
    }

    private boolean hasNext2() {
        return lastElement2 != null || iter2.hasNext();
    }

    private T next1() {
        return lastElement1 != null ? lastElement1 : (!iter1.hasNext() ? null : (lastElement1 = iter1.next()));
    }

    private T next2() {
        return lastElement2 != null ? lastElement2 : (!iter2.hasNext() ? null : (lastElement2 = iter2.next()));
    }

} 

Ā 

We can check this implementation with a few tests:

Ā 

import static io.vavr.Tuple.of;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertThat;

import io.vavr.Tuple2;
import org.junit.Test;

import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ZippedIteratorTest {

    private Integer zero = 0;

    @Test
    public void testZipper() throws Exception {
        Stream s1 = Stream.of(4, 5, 10, 11);
        Stream s2 = Stream.of(1, 2, 4, 6, 8, 10, 15);

        Stream<Tuple2> zipped = zipper(s1, s2, zero);

        assertThat(zipped.collect(Collectors.toList()), contains(
            of(zero, 1),
            of(zero, 2),
            of(4, 4),
            of(5, zero),
            of(zero, 6),
            of(zero, 8),
            of(10, 10),
            of(11, zero),
            of(zero, 15)
        ));
    }

    @Test
    public void testZipperRigthEmpty() throws Exception {
        Stream s1 = Stream.of(4, 5, 10, 11);
        Stream s2 = Stream.empty();

        Stream<Tuple2> zipped = zipper(s1, s2, zero);

        assertThat(zipped.collect(Collectors.toList()), contains(
            of(4, zero),
            of(5, zero),
            of(10, zero),
            of(11, zero)
        ));
    }

    @Test
    public void testZipperLeftEmpty() throws Exception {
        Stream s1 = Stream.empty();
        Stream s2 = Stream.of(4, 5, 10, 11);

        Stream<Tuple2> zipped = zipper(s1, s2, zero);

        assertThat(zipped.collect(Collectors.toList()), contains(
            of(zero, 4),
            of(zero, 5),
            of(zero, 10),
            of(zero, 11)
        ));
    }

    private Stream<Tuple2> zipper(Stream s1, Stream s2,
                                                    Integer nullObject) {
        return zipAll(s1, s2, nullObject, false);
    }
}

Java: JAXB generation

Generating classes from XML schema:

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>jaxb2-maven-plugin</artifactId>
  <version>${jaxb2.version}</version>
  <executions>
    <execution>
    <phase>none</phase>
    <id>xjc</id>
    <goals>
      <goal>xjc</goal>
    </goals>
    </execution>
  </executions>
  <configuration>
    <!-- The package of your generated sources -->
    <packageName>me.sample.package.with.xml.schema</packageName>
    <sources>
      <source>src/main/schema</source>
    </sources>
    <outputDirectory>src/main/java</outputDirectory>
    <clearOutputDir>false</clearOutputDir>
  </configuration>
</plugin>

Additionally, we need a folder with our schema in the project:

jaxb

Now, we can generate the classes from the schema types…

jaxb_maven_plugin

These classes will land in the request package declared in the plugin.

Java: fasterxml ObjectMapping

Quite often I tend to run into a problem of mapping data to Containers.

Example:

Map results = mapper.readValue(jsonSource,
   new TypeReference<Map>() { } );
// why extra work? Java Type Erasure will prevent type detection otherwise

 

More info on mapping JSON to conrete types can be foundĀ here

Java: functional programming trap

A code that makes no sense!!

Sadly, fixing the two paradigmes of OOP and FP in Java gives really akward results, that can be seen below:

@Transactional(readOnly = false)
    public Try createAccount(String userId) {
        return
            userService.getUser(userId)
                .flatMapTry(
                    accountUser -> Try.of(() -> {
                        Account account = new Account(UUID.randomUUID().toString(), 0);

                        return accountRepository.save(account);
                    })
                );
    }

This code runs, but having a @Transactional annotation with type Try at the same time leads to a bug. The code will never rollback in case of a failure, due to the fact that all failures are hidden in the Try type.

This time one of my favourite types Try just does not do the job…