Friday, November 11, 2016

Cassandra Zstandard Compressor

Facebook has released its new open-source compression algorithm, Zstandard, which is both efficient and fast. To read more about it, please look here or here.

It's actually so efficient that it seems to be a great candidate for Cassandra table compression.

So to save you all the required work, I've implemented a Cassandra Zstandard compressor that is ready to use: https://github.com/MatejTymes/cassandra-zstd

Wednesday, June 15, 2016

Reproducing missing matched documents on searches and updates

This blog recently exposed an interesting concurrency caveat in MongoDB: matching documents won't be found (or updated) if they are being reindexed at the time of the query.

The only part I was missing in that entry was a way to reproduce the issue. So I decided to create a test which you can run against your version of MongoDB to check whether it is still a problem.

Here it is:

package co.uk.matejtymes.mongodb;

import com.mongodb.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

import static com.mongodb.BasicDBObjectBuilder.start;
import static java.util.Arrays.asList;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThat;

public class ReindexFailureTest {

    private static final String STATE_FIELD = "state";

    private static final Random RANDOM = new Random();

    private DBCollection coll;

    @Before
    public void setUp() throws Exception {
        // todo: provide connection details for your mongoDB instance
        MongoClient mongo = new MongoClient("localhost", 27017);
        DB db = mongo.getDB("testDb");

        coll = db.getCollection("indexTest");
    }

    @After
    public void tearDown() throws Exception {
        coll.drop();
    }

    @Test
    public void shouldFindAllMatchingItemsEvenWhenRecalculatingIndex() throws Exception {
        int docCount = 250;
        int concurrentUpdates = 40;
        int attemptsCount = 1_000;

        List<String> stateValues = asList("Active", "Inactive");

        coll.createIndex(new BasicDBObject(STATE_FIELD, 1));

        List<String> allIds = createNDocumentsWithState(docCount, stateValues);

        ExecutorService executor = newFixedThreadPool(concurrentUpdates);
        for (int attempt = 1; attempt <= attemptsCount; attempt++) {
            System.out.println(attempt + ". attempt");

            CountDownLatch beginLatch = new CountDownLatch(concurrentUpdates + 1);
            CountDownLatch endLatch = new CountDownLatch(concurrentUpdates + 1);

            for (int update = 0; update < concurrentUpdates; update++) {
                executor.submit(() -> updateState(pickRandomItem(allIds), stateValues, beginLatch, endLatch));
            }

            List<String> foundIds = findDocumentsInState(stateValues, beginLatch, endLatch);

            Set<String> uniqueIds = new HashSet<>();
            Set<String> duplicateIds = new HashSet<>();
            Set<String> missingIds = new HashSet<>(allIds);

            for (String foundId : foundIds) {
                if (uniqueIds.contains(foundId)) {
                    duplicateIds.add(foundId);
                }
                uniqueIds.add(foundId);
                missingIds.remove(foundId);
            }

            if (!missingIds.isEmpty()) {
                System.err.println(missingIds.size() + ". missingIds: " + missingIds);
            }
            if (!duplicateIds.isEmpty()) {
                System.err.println(duplicateIds.size() + ". duplicateIds: " + duplicateIds);
            }

            assertThat(foundIds, hasSize(allIds.size()));
            assertThat(missingIds, hasSize(0));
            assertThat(duplicateIds, hasSize(0));
        }

        executor.shutdown();
        executor.awaitTermination(3, SECONDS);
    }

    @Test
    public void shouldUpdateAllMatchingItemsEvenWhenRecalculatingIndex() throws Exception {
        int docCount = 250;
        int concurrentUpdates = 40;
        int attemptsCount = 1_000;

        List<String> stateValues = asList("Active", "Inactive");

        coll.createIndex(new BasicDBObject(STATE_FIELD, 1));

        List<String> allIds = createNDocumentsWithState(docCount, stateValues);

        ExecutorService executor = newFixedThreadPool(concurrentUpdates);
        for (int attempt = 1; attempt <= attemptsCount; attempt++) {
            System.out.println(attempt + ". attempt");

            String fieldToUpdate = "field" + attempt;
            Object valueToSet = true;

            CountDownLatch beginLatch = new CountDownLatch(concurrentUpdates + 1);
            CountDownLatch endLatch = new CountDownLatch(concurrentUpdates + 1);

            for (int update = 0; update < concurrentUpdates; update++) {
                executor.submit(() -> updateState(pickRandomItem(allIds), stateValues, beginLatch, endLatch));
            }

            BasicDBObject query = new BasicDBObject(STATE_FIELD, new BasicDBObject("$in", stateValues));
            BasicDBObject update = new BasicDBObject("$set", new BasicDBObject(fieldToUpdate, valueToSet));

            beginLatch.countDown();
            int n = coll.updateMulti(query, update).getN();
            endLatch.countDown();


            List<String> updatedIds = new ArrayList<>();
            coll.find(new BasicDBObject(fieldToUpdate, valueToSet)).forEach(
                    dbObject -> updatedIds.add((String) dbObject.get("_id"))
            );

            Set<String> missingIds = new HashSet<>(allIds);
            missingIds.removeAll(updatedIds);


            if (!missingIds.isEmpty()) {
                System.err.println(missingIds.size() + ". missingIds: " + missingIds);
            }
            if (n != allIds.size()) {
                System.err.println("n = " + n);
            }
            if (updatedIds.size() != allIds.size()) {
                System.err.println("updateIds = " + updatedIds.size());
            }

            assertThat(n, equalTo(allIds.size()));
            assertThat(updatedIds, hasSize(allIds.size()));
            assertThat(missingIds, hasSize(0));
        }

        executor.shutdown();
        executor.awaitTermination(3, SECONDS);
    }

    /* ====================== */
    /* --- helper methods --- */
    /* ====================== */

    private List<String> createNDocumentsWithState(int docCount, List<String> stateValues) {
        List<String> ids = new ArrayList<>();

        for (int i = 0; i < docCount; i++) {
            String id = randomUUID().toString();
            String state = stateValues.get(i % stateValues.size());

            DBObject dbObject = start()
                    .add("_id", id)
                    .add(STATE_FIELD, state)
                    .get();
            coll.insert(dbObject);

            ids.add(id);
        }
        return ids;
    }

    private void updateState(String id, List<String> stateValues, CountDownLatch beginLatch, CountDownLatch endLatch) {
        BasicDBObject query = new BasicDBObject("_id", id);

        String oldStateValue = (String) coll.find(query).next().get(STATE_FIELD);
        String newStateValue = stateValues.stream().filter(state -> !state.equals(oldStateValue)).findFirst().get();

        BasicDBObject update = new BasicDBObject("$set", new BasicDBObject(STATE_FIELD, newStateValue));

        beginLatch.countDown();
        coll.update(query, update);
        endLatch.countDown();
    }

    private List<String> findDocumentsInState(List<String> stateValues, CountDownLatch beginLatch, CountDownLatch endLatch) {
        BasicDBObject query = new BasicDBObject(STATE_FIELD, new BasicDBObject("$in", stateValues));
        Iterator<DBObject> dbObjects = coll.find(query).iterator();

        List<String> foundIds = new ArrayList<>();

        beginLatch.countDown();
        while (dbObjects.hasNext()) {
            foundIds.add((String) dbObjects.next().get("_id"));
        }
        endLatch.countDown();

        return foundIds;
    }

    private static <T> T pickRandomItem(List<T> values) {
        return values.get(RANDOM.nextInt(values.size()));
    }
}
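
A side note on the latches: the test only counts them down. If you want the updates and the query to be released at the same moment, the latches also have to be awaited. Below is a minimal sketch of that start-gate pattern. The class and method names (StartGateDemo, runSimultaneously) are hypothetical and not part of the test above.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of the original test.
class StartGateDemo {

    // Runs every task on its own thread, releases all of them at the same
    // moment and returns once the last task has finished.
    // Assumes a non-empty task list (a fixed thread pool needs at least one thread).
    static void runSimultaneously(List<Runnable> tasks) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
        CountDownLatch ready = new CountDownLatch(tasks.size()); // workers report in
        CountDownLatch start = new CountDownLatch(1);            // the shared start gate
        CountDownLatch done = new CountDownLatch(tasks.size());  // workers report completion

        for (Runnable task : tasks) {
            executor.submit(() -> {
                ready.countDown();
                try {
                    start.await(); // block until the gate opens
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown(); // count down even if the task failed
                }
            });
        }

        ready.await();     // wait until every worker is parked at the gate
        start.countDown(); // open the gate for all workers at once
        done.await();      // wait for all workers to finish
        executor.shutdown();
    }
}
```

Counting down in a finally block matters here: a task that throws would otherwise leave the waiting thread blocked forever.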

Friday, April 29, 2016

Executor that notifies you when tasks finish

Java Executors don't let you know when all tasks are finished or, to be more precise, they don't let you block until the tasks are finished. You could call shutdown() on them and then awaitTermination(), but then you can't reuse the executor anymore, which is not great. This is why I created a class called Runner that can accomplish this. It's used like this:

Runner runner = Runner.runner(10);

runner.runIn(2, SECONDS, runnable);
runner.run(runnable);


runner.waitTillDone(); // blocks until all tasks are finished (or failed)


// and reuse it

runner.runIn(500, MILLISECONDS, callable);

runner.waitTillDone();

runner.shutdownAndAwaitTermination();

The code for it can be found here:

https://github.com/MatejTymes/JavaFixes
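
To give a rough idea of how such a class can work, here is a minimal sketch. It is not the JavaFixes implementation: SimpleRunner and its internals are hypothetical, only the method names mirror the usage above, and the Callable overloads are left out. It simply counts pending tasks and lets callers wait until the count drops back to zero.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, NOT the actual Runner from JavaFixes.
class SimpleRunner {

    private final ScheduledExecutorService executor;
    private final Object lock = new Object();
    private int pendingTasks = 0; // guarded by lock

    SimpleRunner(int threadCount) {
        this.executor = Executors.newScheduledThreadPool(threadCount);
    }

    void run(Runnable task) {
        runIn(0, TimeUnit.MILLISECONDS, task);
    }

    void runIn(long delay, TimeUnit unit, Runnable task) {
        synchronized (lock) {
            pendingTasks++;
        }
        try {
            executor.schedule(() -> {
                try {
                    task.run();
                } finally {
                    taskDone(); // also reached when the task throws
                }
            }, delay, unit);
        } catch (RuntimeException rejected) {
            taskDone(); // the task was never scheduled, so don't wait for it
            throw rejected;
        }
    }

    // Blocks until all tasks submitted so far are finished (or failed).
    // The runner stays usable afterwards.
    void waitTillDone() throws InterruptedException {
        synchronized (lock) {
            while (pendingTasks > 0) {
                lock.wait();
            }
        }
    }

    void shutdownAndAwaitTermination() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    private void taskDone() {
        synchronized (lock) {
            pendingTasks--;
            if (pendingTasks == 0) {
                lock.notifyAll();
            }
        }
    }
}
```

Because the executor itself is never shut down by waitTillDone(), the same instance can be reused for the next batch of tasks.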

Hope this helps.

Tuesday, April 19, 2016

End of BigDecimal BSting

BigDecimal is so close to being great, until you face a few things which just make you scream.
  • values are sometimes not equal (although you would like them to be):
    // yes: -1.2 is not equal to -1.20
    assertThat(new BigDecimal("-1.2").equals(new BigDecimal("-1.20")), is(false));

Maybe this seems harmless, but once your tests start to fail because the actual and expected domain objects (using BigDecimal) are not equal although they hold the same values, you will just ask yourself: why do we have to go through this?

Also, have you ever been pairing with a candidate in an interview who told you that you should use BigDecimal for an interest rate calculation, but in the end you both decided not to do it because the interview is not long enough? BigDecimal really adds an additional 10 to 30 minutes to an interview exercise, as you have to deal with the equals method - and THIS is the STANDARD!!!
  • equals is bad, but hashCode is even worse
    // yes: hashCodes for -1.2 and -1.20 are not the same as well
    assertThat(new BigDecimal("-1.2").hashCode(), is(not(new BigDecimal("-1.20").hashCode())));

But why would that even matter? Well, if you ever decide to use BigDecimal as a key in a HashMap, then a situation might occur where you won't find any value for your number, because the hashCodes won't match.
  • the ways to create a BigDecimal are not unified at all (depending on your originating value, there are a few different ways to create it)
    new BigDecimal("-1.20"); // from string

    BigDecimal.valueOf(-120L, 2); // from long

    new BigDecimal(BigInteger.valueOf(-120L), 2); // from BigInteger

    // you should not create BigDecimal from float or double, as you might get a really weird value (because of the transition from binary to decimal form)
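
The points above can be reproduced with a few lines of plain Java. This is just a small demonstration using the standard BigDecimal API; the class name BigDecimalPitfalls and the lookup helper are hypothetical. It also shows the usual workarounds (compareTo and stripTrailingZeros), which work but have to be remembered every single time.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, only here to show the behaviour described above.
class BigDecimalPitfalls {

    // Looks up a value in a map that uses BigDecimal as its key.
    static String lookup(Map<BigDecimal, String> map, String key) {
        return map.get(new BigDecimal(key));
    }

    public static void main(String[] args) {
        BigDecimal a = new BigDecimal("-1.2");
        BigDecimal b = new BigDecimal("-1.20");

        // equals compares value AND scale
        System.out.println(a.equals(b));         // false
        // compareTo ignores the scale
        System.out.println(a.compareTo(b) == 0); // true

        // HashMap relies on hashCode and equals, so the lookup misses
        Map<BigDecimal, String> rates = new HashMap<>();
        rates.put(a, "found");
        System.out.println(lookup(rates, "-1.20")); // null

        // workaround: normalize both sides before comparing
        System.out.println(a.stripTrailingZeros().equals(b.stripTrailingZeros())); // true

        // the double constructor exposes the binary representation of 0.1
        System.out.println(new BigDecimal(0.1));
        // prints 0.1000000000000000055511151231257827021181583404541015625
        // valueOf goes through Double.toString, so it prints the short form
        System.out.println(BigDecimal.valueOf(0.1)); // 0.1
    }
}
```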

I assumed that this was going to be fixed, as these problems have been present for many years, but it seems this is the design we'll have to live with.

This is why I decided to create a rewrite of BigDecimal called Decimal which you can find on this page (currently I'm finalizing the implementation):

https://github.com/MatejTymes/JavaFixes

It provides a few advantages over BigDecimal:
  • unified creation using two possible factory methods (one more readable decimal(...), one shorter d(...))
  • fixed equals
  • fixed hashCode:
    // equals now works
    assertThat(decimal("-1.2").equals(decimal("-1.200")), is(true));
    assertThat(d("-1.2").equals(d("-1.200")), is(true));

    // and surprisingly hashCode as well
    assertThat(d("-1.2").hashCode(), is(d("-1.20").hashCode()));
  • also the creation approach is always the same
    decimal("-1.20"); // from string

    decimal(-120L, 2); // from long

    decimal(BigInteger.valueOf(-120L), 2); // from BigInteger

    // you are not able to create Decimal from float or double but have to transform them into string first - otherwise you might get surprising values
  • you can use underscores in the numbers to make them more readable
    Decimal value = d("-125_550_00.00"); // using underscores as you can use in java numbers
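
If you are curious how this kind of behaviour can be achieved, here is a minimal sketch. It is not the Decimal class from JavaFixes: SketchDecimal is a hypothetical wrapper around BigDecimal that normalizes the value on creation, which is enough to make equals and hashCode ignore trailing zeros. Only the factory method names decimal(...) and d(...) are taken from the examples above.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical sketch, NOT the Decimal class from JavaFixes.
final class SketchDecimal {

    private final BigDecimal value; // always stored without trailing zeros

    private SketchDecimal(BigDecimal value) {
        // zero is handled explicitly so that 0, 0.0 and 0.00 share one form
        this.value = value.signum() == 0 ? BigDecimal.ZERO : value.stripTrailingZeros();
    }

    static SketchDecimal decimal(String text) {
        // underscores are only visual separators, so drop them before parsing
        return new SketchDecimal(new BigDecimal(text.replace("_", "")));
    }

    static SketchDecimal decimal(long unscaledValue, int scale) {
        return new SketchDecimal(BigDecimal.valueOf(unscaledValue, scale));
    }

    static SketchDecimal decimal(BigInteger unscaledValue, int scale) {
        return new SketchDecimal(new BigDecimal(unscaledValue, scale));
    }

    // shorter alias of decimal(String)
    static SketchDecimal d(String text) {
        return decimal(text);
    }

    @Override
    public boolean equals(Object other) {
        // both sides are normalized, so BigDecimal.equals is safe here
        return other instanceof SketchDecimal && value.equals(((SketchDecimal) other).value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public String toString() {
        return value.toPlainString();
    }
}
```

Normalizing once in the constructor keeps equals and hashCode trivial and consistent with each other, at the cost of forgetting the scale the value was originally written with.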

Decimal is an abstract class currently extended by two implementations: LongDecimal, used if the value can be backed by a long, and HugeDecimal, backed by BigInteger (for all other numbers). You can't address them directly, but the library handles the transition between these types seamlessly while you're calling arithmetic operations.
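
The idea of switching between a long-backed and a BigInteger-backed representation can be sketched like this. Again, this is not the library's code: SketchValue, LongBacked and HugeBacked are hypothetical names, the scale is left out completely, and only multiplication is shown. The point is just the transition: stay on long while it fits, fall back to BigInteger on overflow, and shrink back when the result fits again.

```java
import java.math.BigInteger;

// Hypothetical sketch of the two-representation idea, NOT the JavaFixes code.
abstract class SketchValue {

    static SketchValue of(long value) {
        return new LongBacked(value);
    }

    static SketchValue of(BigInteger value) {
        // bitLength() excludes the sign bit, so <= 63 means the value fits into a long
        return value.bitLength() <= 63 ? new LongBacked(value.longValue()) : new HugeBacked(value);
    }

    abstract BigInteger toBigInteger();

    // generic path: compute with BigInteger, then pick the smallest representation
    SketchValue multiply(SketchValue other) {
        return of(toBigInteger().multiply(other.toBigInteger()));
    }

    static final class LongBacked extends SketchValue {
        final long value;

        LongBacked(long value) {
            this.value = value;
        }

        @Override
        BigInteger toBigInteger() {
            return BigInteger.valueOf(value);
        }

        @Override
        SketchValue multiply(SketchValue other) {
            if (other instanceof LongBacked) {
                try {
                    // fast path: plain long arithmetic with overflow detection
                    return new LongBacked(Math.multiplyExact(value, ((LongBacked) other).value));
                } catch (ArithmeticException overflow) {
                    // result doesn't fit into a long, use the generic path below
                }
            }
            return super.multiply(other);
        }
    }

    static final class HugeBacked extends SketchValue {
        final BigInteger value;

        HugeBacked(BigInteger value) {
            this.value = value;
        }

        @Override
        BigInteger toBigInteger() {
            return value;
        }
    }
}
```

The caller only ever sees SketchValue, so which of the two classes is currently used stays an implementation detail.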

And that's it. Please let me know if you can think of any other improvements, or just what you feel about this. I would be happy to hear your thoughts.