Thursday, November 7, 2013

Testing multithreaded code

Sometimes you needed to test that your code is thread-safe and can be run from multiple threads at the same time. To help with doing this I wrote an utility class that can make your test code simpler and more readable.

Here is the usage of it. I was testing that SimpleDateFormat is thread safe (actually it is not so this test will fail):
public class SimpleDateFormatTest {

    // this test will fail as SimpleDateFormat is not thread safe
    @Test
    public void shouldWorkConcurrently() throws ExecutionException, InterruptedException {
        // Given
        final DateFormat dateFormat = new SimpleDateFormat();

        final Date date1 = newRandomDate();
        final Date date2 = newRandomDate();
        final Date date3 = newRandomDate();

        ConcurrentExecutor executor = new ConcurrentExecutor();

        Future<String> result1 = executor.addAction(new Callable<String>() {
            public String call() {

                return dateFormat.format(date1);
            }
        });
        Future<String> result2 = executor.addAction(new Callable<String>() {
            public String call() {

                return dateFormat.format(date2);
            }
        });
        Future<String> result3 = executor.addAction(new Callable<String>() {
            public String call() {

                return dateFormat.format(date3);
            }
        });

        // When
        executor.executeAtTheSameTime(); // this will block until all actions are finished

        // Then
        assertThat(result1.get(), is(new SimpleDateFormat().format(date1)));
        assertThat(result2.get(), is(new SimpleDateFormat().format(date2)));
        assertThat(result3.get(), is(new SimpleDateFormat().format(date3)));
    }

    private Date newRandomDate() {
        return new Date(new Random().nextLong());
    }
}
In this test I created a ConcurrentExecutor to which I added 3 actions. Each of those actions calls the format method for one of 3 dates. Then I call the method executeAtTheSameTime which will make sure that all actions will start at the same time. Once the execution is finished I retrieve each result and verify that it is the same as proper single threaded conversion.

One nice thing about this utility is that it is extremely simple to retry the execution. Just rerun the executeAtTheSameTime and the Future results will hold a new value. This way you can put the When and Then section info a for loop and test the execution thread safety multiple times. This is sometimes needed as some multi-threaded issues are not always visible during the first run.

If you would like to add this utility into your project here is the actual implementation of ConcurrentExecutor (please note that it is in java 1.7 so if you're using some older version you might miss some generics definitions):
public class ConcurrentExecutor {

    private List<Callable<?>> actions = new ArrayList<>();
    private List<Object> results = new ArrayList<>();

    private volatile boolean finished = false;

    public <T> Future<T> addAction(Callable<T> callable) {
        actions.add(callable);
        int actionIndex = actions.size() - 1;
        return new FutureResult<>(actionIndex);
    }

    public void executeAtTheSameTime() {
        try {
            finished = false;
            results.clear();

            ExecutorService executor = Executors.newFixedThreadPool(actions.size());

            CyclicBarrier barrier = new CyclicBarrier(actions.size());
            CountDownLatch doneCountDown = new CountDownLatch(actions.size());

            List<Future<?>> futureResults = new ArrayList<>();
            for (Callable<?> action : actions)
            {
                futureResults.add(executor.submit(new ActionCallable<>(action, barrier, doneCountDown)));
            }
            doneCountDown.await();

            for (Future<?> futureResult : futureResults)
            {
                try
                {
                    results.add(futureResult.get(500, TimeUnit.MILLISECONDS));
                }
                catch (Exception e)
                {
                    throw new RuntimeException("not able to retrieve result from thread execution", e);
                }
            }

            executor.shutdownNow();

            finished = true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    class ActionCallable<T> implements Callable<T> {

        private final Callable<T> action;
        private final CyclicBarrier startBarrier;
        private final CountDownLatch doneCountDown;

        ActionCallable(Callable<T> action, CyclicBarrier startBarrier, CountDownLatch doneCountDown) {
            this.action = action;
            this.startBarrier = startBarrier;
            this.doneCountDown = doneCountDown;
        }

        @Override
        public T call() throws Exception {
            T result;

            startBarrier.await();
            result = action.call();
            doneCountDown.countDown();

            return result;
        }
    }

    class FutureResult<T> implements Future<T> {

        private final int resultIndex;

        public FutureResult(int resultIndex) {
            this.resultIndex = resultIndex;
        }

        @Override
        public boolean isDone() {
            return finished;
        }

        @Override
        @SuppressWarnings("unchecked")
        public T get() throws InterruptedException, ExecutionException {
            if (!isDone()) {
                throw new IllegalStateException("execution has not yet finished");
            }
            return (T) results.get(resultIndex);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new RuntimeException("not implemented by intention");
        }

        @Override
        public boolean isCancelled() {
            throw new RuntimeException("not implemented by intention");
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            throw new RuntimeException("not implemented by intention");
        }
    }
}

Thursday, October 31, 2013

Testing QuickFix/J messages

Some short time ago we've been using QuickFix/J for sending and receiving messages. The library was ok, but to write tests for it was kind of painfull. For example our verification of created messages looked like this:
assertThat(message, instanceOf(AllocationInstruction.class));
assertThat(message.getHeader().getString(SenderSubID.FIELD), is("senderSubId-123"));
assertThat(message.getString(AllocID.FIELD), is("allocId-123"));
assertThat(message.getInt(AllocType.FIELD), is(AllocType.CALCULATED));
assertThat(message.getChar(AllocTransType.FIELD), is(AllocTransType.NEW));
assertThat(message.getDecimal(Shares.FIELD), is(new BigDecimal("1000000")));

assertThat(message.getInt(NoAllocs.FIELD), is(2));

AllocationInstruction.NoAllocs allocationGroup;
allocationGroup = (AllocationInstruction.NoAllocs) message.getGroup(1, new AllocationInstruction.NoAllocs());
assertThat(allocationGroup.getString(IndividualAllocID.FIELD), is("1"));
assertThat(allocationGroup.getString(AllocAccount.FIELD), is("acc-01"));
assertThat(allocationGroup.getDecimal(AllocShares.FIELD), is(new BigDecimal("250000")));

allocationGroup = (AllocationInstruction.NoAllocs) message.getGroup(2, new AllocationInstruction.NoAllocs());
assertThat(allocationGroup.getString(IndividualAllocID.FIELD), is("2"));
assertThat(allocationGroup.getString(AllocAccount.FIELD), is("acc-02"));
assertThat(allocationGroup.getDecimal(AllocShares.FIELD), is(new BigDecimal("750000")));
The code was not very readable and we also had to catch FieldNotFound exception each time we were doing this. To resolve this inconvenience I've created my own hamcrest matcher that can be used like this:
assertThat(message, isFIXMessage()
        .ofType(AllocationInstruction.class)
        .with(header().with(SenderSubID.FIELD, "senderSubId-123"))
        .with(AllocID.FIELD, "allocId-123")
        .with(AllocType.FIELD, AllocType.CALCULATED)
        .with(AllocTransType.FIELD, AllocTransType.NEW)
        .with(Shares.FIELD, "1000000")
        .with(NoAllocs.FIELD, 2)
        .with(group(1, NoAllocs.FIELD)
                .with(IndividualAllocID.FIELD, 1)
                .with(AllocAccount.FIELD, "acc-01")
                .with(AllocShares.FIELD, new BigDecimal("250000"))
        )
        .with(group(2, NoAllocs.FIELD)
                .with(IndividualAllocID.FIELD, 2)
                .with(AllocAccount.FIELD, "acc-02")
                .with(AllocShares.FIELD, new BigDecimal("750000"))
        )
);
The source code can be found/is stored under my project: QuickFixUtils.