Testing concurrent code

Let’s get to some interesting stuff. How difficult is it to test your concurrent code? The answer: very. Still, there must be some ways of testing it, right? Well, there are tools that can help you. One of them is Thread Weaver.

How does it work?

Using byte-code instrumentation, it runs the method you want to test on two threads and interleaves them: the first thread stops at the first line of your method while the second thread runs the method to completion, and then the first thread finishes. The process is repeated with the first thread stopping at the second line of your method, and so forth. Seems pretty straightforward. An example will make things clear.

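import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
import java.util.concurrent.atomic.AtomicInteger;

public class NumberGenerator {

    private volatile int currentMinute = LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
    private AtomicInteger counter = new AtomicInteger(0);

    public String nextNumber() {
        int min = LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
        if (currentMinute == min) {
            return currentMinute + "" + counter.incrementAndGet();
        } else {
            // The minute has changed: restart the count for the new minute.
            counter.set(0);
            // Use the same field (minute of day) as the comparison above.
            currentMinute = LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
            return currentMinute + "" + counter.incrementAndGet();
        }
    }
}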

We are trying to generate a number that reflects the number of operations performed in the current minute of the day. We use a counter that resets itself each minute; as long as the minute is the same, we increment it. This should be thread-safe, as we don’t want the same number to be generated twice. Let’s run this through Thread Weaver.

public class NumberGeneratorTest {

    private String first;
    private String second;
    private NumberGenerator generator;

    @Test
    public void testNextNumber() throws InterruptedException {
        AnnotatedTestRunner runner = new AnnotatedTestRunner();
        HashSet<String> methods = new HashSet<>();
        methods.add(NumberGenerator.class.getName() + ".nextNumber");
        runner.setMethodOption(MethodOption.LISTED_METHODS, methods);
        runner.setDebug(false);
        runner.runTests(this.getClass(), NumberGenerator.class);
    }

    @ThreadedBefore
    public void setup() {
        generator = new NumberGenerator();
    }

    @ThreadedMain
    public void main() {
        first = generator.nextNumber();
    }

    @ThreadedSecondary
    public void secondary() {
        second = generator.nextNumber();
    }

    @ThreadedAfter
    public void checkResults() {
        Assertions.assertThat(first).isNotEqualTo(second);
    }
}

Let’s walk through the code. In testNextNumber() we initialize the Thread Weaver runner and set up the class and the method we want to test. Then we run the tests. @ThreadedBefore and @ThreadedAfter are similar to @Before and @After from JUnit, in the sense that the methods annotated with them run before the threads start and after both threads have finished. The methods annotated with @ThreadedMain and @ThreadedSecondary are the bodies of the two threads that must run concurrently.

First run

As described above, [THREAD-1] will break at the first line of the method and [THREAD-2] will start at that point, so both threads are at the same line. [THREAD-2] will complete, and after that [THREAD-1] will resume and complete.

                      public String nextNumber() {
[THREAD-1] [THREAD-2]     int min = LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
                          if (currentMinute == min) {
                              return currentMinute + "" + counter.incrementAndGet();
                          } else {
                              counter.set(0);
                              currentMinute = LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
                              return currentMinute + "" + counter.incrementAndGet();
                          }
                      }

Second run

           public String nextNumber() {
[THREAD-2]     int min = LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
[THREAD-1]     if (currentMinute == min) {
                   return currentMinute + "" + counter.incrementAndGet();
               } else {
                   counter.set(0);
                   currentMinute = LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
                   return currentMinute + "" + counter.incrementAndGet();
               }
           }

[THREAD-1] breaks at the second line while [THREAD-2] starts again at the beginning. [THREAD-2] completes, and then [THREAD-1] resumes and completes. You get the point: Thread Weaver does this for every line of the method, which should reveal any concurrency issue.
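
The schedule described above can be modelled in plain Java, without any byte-code instrumentation, by splitting an operation into explicit steps. The sketch below is not how Thread Weaver is implemented; InterleavingSketch, SharedCounter, Worker and lostUpdates are hypothetical names made up for illustration. For each pause position, a first worker runs up to that step, a second worker runs to completion, and then the first worker finishes. The operation under test is a deliberately non-atomic increment (read, then write back).

```java
import java.util.ArrayList;
import java.util.List;

/** Single-threaded model of "pause worker 1 at step i, run worker 2 fully, resume worker 1". */
final class InterleavingSketch {

    /** Shared state: a counter that is incremented in two separate steps. */
    static final class SharedCounter {
        int value;
    }

    /** One logical thread performing a non-atomic increment on the shared counter. */
    static final class Worker {
        static final int STEP_COUNT = 2;

        private final SharedCounter shared;
        private int local;     // value read in step 0
        private int nextStep;  // index of the next step to execute

        Worker(SharedCounter shared) {
            this.shared = shared;
        }

        /** Step 0 reads the counter, step 1 writes back the value read plus one. */
        void step() {
            if (nextStep == 0) {
                local = shared.value;
            } else if (nextStep == 1) {
                shared.value = local + 1;
            }
            nextStep++;
        }

        void runSteps(int count) {
            for (int i = 0; i < count; i++) {
                step();
            }
        }
    }

    /** Returns the pause positions at which two increments did not add up to 2. */
    static List<Integer> lostUpdates() {
        List<Integer> failing = new ArrayList<>();
        for (int pauseAt = 0; pauseAt < Worker.STEP_COUNT; pauseAt++) {
            SharedCounter shared = new SharedCounter();
            Worker first = new Worker(shared);
            Worker second = new Worker(shared);

            first.runSteps(pauseAt);                      // first worker stops before step pauseAt
            second.runSteps(Worker.STEP_COUNT);           // second worker runs to completion
            first.runSteps(Worker.STEP_COUNT - pauseAt);  // first worker resumes and finishes

            if (shared.value != 2) {
                failing.add(pauseAt);
            }
        }
        return failing;
    }

    public static void main(String[] args) {
        System.out.println("Pause positions that lose an update: " + lostUpdates());
    }
}
```

Pausing before the first step is harmless, because the two increments simply run one after the other. Pausing between the read and the write is the schedule that exposes the lost update.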

And the results of the Thread Weaver run are:

[Screenshot: tw1]

Success! Our code is thread-safe. Or is it? Since the logic is time-related, the else branch is taken only when the minute changes. So if your test starts and ends in the same minute, it will never reach this branch. And this is where the problem lies: if two threads enter the else branch when the minute changes, the counter will be reset twice and we will get a duplicate number.

           public String nextNumber() {
               int min = LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
               if (currentMinute == min) {
                   return currentMinute + "" + counter.incrementAndGet();
               } else {
[THREAD-1]         counter.set(0);
[THREAD-2]         currentMinute = LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
                   return currentMinute + "" + counter.incrementAndGet();
               }
           }

[THREAD-1] breaks before the counter has been reset. [THREAD-2] resets the counter and increments it. [THREAD-1] then resumes and resets the counter again, and after that increments it. The same number is generated twice. So first we need to be able to test the else branch as well. We cannot run the test at the exact moment the minute changes; that is nearly impossible and impractical. Let’s refactor the code in order to be able to test it.

import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
import java.util.concurrent.atomic.AtomicInteger;

public class NumberGenerator {

    private AtomicInteger counter = new AtomicInteger(0);

    private final TimeMachine timeMachine;

    public NumberGenerator(TimeMachine timeMachine) {
        this.timeMachine = timeMachine;
    }

    public String nextNumber() {
        int min = timeMachine.getCurrentMinute();
        if (timeMachine.getCurrentMinute() == min) {
            return min + "" + counter.incrementAndGet();
        } else {
            counter.set(0);
            return timeMachine.getCurrentMinute() + "" + counter.incrementAndGet();
        }
    }
}

// TimeMachine.java: the single place where the real clock is read.
public class TimeMachine {
    public int getCurrentMinute() {
        return LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
    }
}

OK. We introduced a hook to be able to change the time: the TimeMachine class will do the trick, since it can be mocked. We could not use the Joda-Time or java.time classes directly for this, since they are immutable. Moreover, Thread Weaver does not work with PowerMock or Mockito (or at least I could not get them to work). The next step is to test our assumption. For this I will expand the test a little in order to control the position at which the first thread stops.
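
If a mocking library is not an option, the same hook can also be filled with a hand-written fake. The following is a minimal sketch under that assumption: ScriptedTimeMachine and ScriptedTimeMachineDemo are hypothetical names, not part of the original code, and whether such a fake plays well with Thread Weaver's instrumentation is untested here. TimeMachine is repeated so that the example is self-contained.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** The hook from the article: production code asks this class for the current minute. */
class TimeMachine {
    public int getCurrentMinute() {
        return LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
    }
}

/** Hypothetical hand-written fake: returns pre-scripted minutes in call order. */
final class ScriptedTimeMachine extends TimeMachine {
    private final Queue<Integer> script = new ConcurrentLinkedQueue<>();

    ScriptedTimeMachine(int... minutes) {
        for (int minute : minutes) {
            script.add(minute);
        }
    }

    @Override
    public int getCurrentMinute() {
        // poll() is thread-safe, so each scripted value is handed out exactly once.
        Integer next = script.poll();
        if (next == null) {
            throw new IllegalStateException("No scripted minute left");
        }
        return next;
    }
}

final class ScriptedTimeMachineDemo {
    public static void main(String[] args) {
        TimeMachine timeMachine = new ScriptedTimeMachine(1, 2);
        System.out.println(timeMachine.getCurrentMinute()); // first scripted value
        System.out.println(timeMachine.getCurrentMinute()); // second scripted value
    }
}
```

The values are consumed in call order regardless of which thread asks, so the script has to be written with the intended interleaving in mind.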

Code Positions

Code positions represent a position in the instrumented code. Basically, you select the line at which you want the first thread to break, which is also the point at which the second thread enters the method. In our case it will be something like this.

           public String nextNumber() {
[THREAD-2]     int min = timeMachine.getCurrentMinute();
               if (timeMachine.getCurrentMinute() == min) {
                   return min + "" + counter.incrementAndGet();
               } else {
[THREAD-1]         counter.set(0);
                   return timeMachine.getCurrentMinute() + "" + counter.incrementAndGet();
               }
           }

[THREAD-1] runs until the breakpoint is reached. At that moment the counter is 0, and [THREAD-1] has not yet entered the counter.set() method! [THREAD-2] starts now, resets the counter and increments it to 1. TimeMachine.getCurrentMinute() returns the same minute (let’s say 2) for both threads; that is the assumption we started from. When [THREAD-2] completes, it returns 21. Now [THREAD-1] resumes, resets the counter, then increments it and returns 21 as well. That’s not what we want!
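
The trace can be replayed step by step on a bare AtomicInteger. This sketch is only a single-threaded replay of the order described above, with the minute fixed to 2 as in the text; DuplicateNumberReplay is a hypothetical name and no Thread Weaver classes are involved.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Replays the else-branch interleaving in a fixed order on a single thread. */
final class DuplicateNumberReplay {

    /** Returns {first, second}: the numbers produced by THREAD-1 and THREAD-2. */
    static String[] replay() {
        AtomicInteger counter = new AtomicInteger(0);
        int minute = 2; // both threads see the same (new) minute, as assumed in the text

        // THREAD-1 is paused here, just before its own counter.set(0).

        // THREAD-2 runs the whole else branch.
        counter.set(0);
        String second = minute + "" + counter.incrementAndGet();

        // THREAD-1 resumes: it resets the counter again, discarding THREAD-2's increment.
        counter.set(0);
        String first = minute + "" + counter.incrementAndGet();

        return new String[] {first, second};
    }

    public static void main(String[] args) {
        String[] numbers = replay();
        System.out.println("first=" + numbers[0] + ", second=" + numbers[1]);
    }
}
```

Both values come out as 21: the second reset throws away the increment made by the other thread.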

The test that describes this use case is below:

public class NumberGeneratorTest {

    private String first;
    private String second;
    private NumberGenerator generator;

    ThreadedTestRunner runner = new ThreadedTestRunner();

    @Test
    public void testThreadedTests() {
        runner.runTests(getClass(), NumberGenerator.class);
    }

    @ThreadedTest
    public void testConcurrencyIssue() throws NoSuchMethodException,NoSuchFieldException {
        TimeMachine timeMachine = MockedTimeMachine.getTimeMachineMock();
        generator = new NumberGenerator(timeMachine);
        RunResult result = InterleavedRunner.interleave(new FirstThread(), new SecondThread(), 
                                                        Arrays.asList(getCodePosition()));
        result.throwExceptionsIfAny();
    }

    private class FirstThread extends MainRunnableImpl<NumberGenerator> {

        @Override
        public Class<NumberGenerator> getClassUnderTest() {
            return NumberGenerator.class;
        }

        @Override
        public NumberGenerator getMainObject() {
            return generator;
        }

        @Override
        public void terminate() throws Exception {
            Assertions.assertThat(first).isNotEqualTo(second);
        }

        @Override
        public void run() throws Exception {
            first = generator.nextNumber();
        }
    }

    private class SecondThread extends SecondaryRunnableImpl<NumberGenerator, FirstThread> {

        private NumberGenerator localNumberGenerator;

        @Override
        public void initialize(FirstThread main) throws Exception {
            localNumberGenerator = main.getMainObject();
        }

        @Override
        public void run() throws Exception {
            second = localNumberGenerator.nextNumber();
        }
    }

    private CodePosition getCodePosition() throws NoSuchMethodException, NoSuchFieldException {
        ClassInstrumentation instr = Instrumentation.getClassInstrumentation(NumberGenerator.class);
        Method nextNumber = NumberGenerator.class.getDeclaredMethod("nextNumber");
        Field counter = NumberGenerator.class.getDeclaredField("counter");
        Method resetCounter = counter.getType().getDeclaredMethod("set", int.class);
        return instr.beforeCall(nextNumber, resetCounter);
    }
}

Let’s break down the code.

    @ThreadedTest
    public void testConcurrencyIssue() throws NoSuchMethodException, NoSuchFieldException {
        TimeMachine timeMachine = MockedTimeMachine.getTimeMachineMock();
        generator = new NumberGenerator(timeMachine);
        RunResult result = InterleavedRunner.interleave(new FirstThread(), new SecondThread(),
                                                        Arrays.asList(getCodePosition()));
        result.throwExceptionsIfAny();
    }

What’s going on here is pretty straightforward: we interleave the two threads using the code position we built. The mock is also created here.

Then we create the threads. The important thing to notice here is that the assertion happens in the terminate() method of the first thread (remember the running order: the first thread finishes last).

    @Override
    public void terminate() throws Exception {
        Assertions.assertThat(first).isNotEqualTo(second);
    }

Finally, we get the code position at which we want to pause the first thread. Using reflection, we look up the methods we need and ask the instrumentation for the matching position. In our case that is inside the nextNumber() method, just before the call to counter.set().

    private CodePosition getCodePosition() throws NoSuchMethodException, NoSuchFieldException {
        ClassInstrumentation instr = Instrumentation.getClassInstrumentation(NumberGenerator.class);
        Method nextNumber = NumberGenerator.class.getDeclaredMethod("nextNumber");
        Field counter = NumberGenerator.class.getDeclaredField("counter");
        Method resetCounter = counter.getType().getDeclaredMethod("set", int.class);
        return instr.beforeCall(nextNumber, resetCounter);
    }

The last part, but an extremely important one, is creating the mock. Here we must know how the threads interleave in order to script the mock properly.

// Static imports assumed here: createMock/expect/replay match EasyMock's API.
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;

public class MockedTimeMachine {

    public static TimeMachine getTimeMachineMock() {
        TimeMachine timeMachine = createMock(TimeMachine.class);

        // The first thread enters
        //      int min = timeMachine.getCurrentMinute();
        // and gets the minute = 1.
        expect(timeMachine.getCurrentMinute()).andReturn(1);
        // The first thread checks whether the minute has changed
        //      if (timeMachine.getCurrentMinute() == min) {
        // and sees that it has changed: minute = 2.
        expect(timeMachine.getCurrentMinute()).andReturn(2);

        // The second thread enters
        //      int min = timeMachine.getCurrentMinute();
        // and gets the minute = 1.
        expect(timeMachine.getCurrentMinute()).andReturn(1);
        // The second thread checks whether the minute has changed
        //      if (timeMachine.getCurrentMinute() == min) {
        // and sees that it has changed: minute = 2.
        expect(timeMachine.getCurrentMinute()).andReturn(2);

        // The second thread reaches
        //      timeMachine.getCurrentMinute() + "" + counter.incrementAndGet();
        expect(timeMachine.getCurrentMinute()).andReturn(2);
        // The first thread reaches
        //      timeMachine.getCurrentMinute() + "" + counter.incrementAndGet();
        expect(timeMachine.getCurrentMinute()).andReturn(2);

        replay(timeMachine);
        return timeMachine;
    }
}

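Cool. Now let’s run this. It fails, as expected.

[Screenshot: tw2]

And the reason is:

[Screenshot: tw4]

With the mock scripted as above, both threads return the same number, 21, so the isNotEqualTo assertion in terminate() fails.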

A fix is needed. We need to make sure that only one thread resets the counter at any point in time, and we don’t want to block the other threads from doing their job.

import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class NumberGenerator {

    private AtomicInteger counter = new AtomicInteger(0);
    private AtomicBoolean lock = new AtomicBoolean(false);

    private final TimeMachine timeMachine;

    public NumberGenerator(TimeMachine timeMachine) {
        this.timeMachine = timeMachine;
    }

    public String nextNumber() {
        int min = timeMachine.getCurrentMinute();
        // Only the thread that wins the compareAndSet may check and reset the counter.
        if (lock.compareAndSet(false, true)) {
            try {
                if (timeMachine.getCurrentMinute() == min) {
                    return min + "" + counter.incrementAndGet();
                } else {
                    counter.set(0);
                    return timeMachine.getCurrentMinute() + "" + counter.incrementAndGet();
                }
            } finally {
                lock.set(false);
            }
        }
        // Every other thread just increments, without waiting.
        return min + "" + counter.incrementAndGet();
    }
}

// TimeMachine.java: unchanged, and still a top-level class.
public class TimeMachine {
    public int getCurrentMinute() {
        return LocalDateTime.now().get(ChronoField.MINUTE_OF_DAY);
    }
}

An AtomicBoolean should do the trick. As we know, the compareAndSet() method does not block other threads; basically it’s a race, and whoever gets there first wins. This is possible thanks to the compare-and-swap instruction available on modern processors. The winning thread will reset the counter if needed, or otherwise just keep incrementing it.
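
The non-blocking behavior of compareAndSet() is easy to see in isolation. Below is a minimal sketch of the same guard pattern; CasGuardSketch and tryRun are hypothetical names used only for this illustration. A second attempt made while the flag is held returns false immediately instead of waiting.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Shows compareAndSet used as a non-blocking "try lock". */
final class CasGuardSketch {
    private final AtomicBoolean lock = new AtomicBoolean(false);

    /** Runs the action only if the flag was free; returns immediately otherwise. */
    boolean tryRun(Runnable action) {
        if (!lock.compareAndSet(false, true)) {
            return false;     // someone else holds the flag: do not wait
        }
        try {
            action.run();
            return true;
        } finally {
            lock.set(false);  // always release, even if the action throws
        }
    }

    public static void main(String[] args) {
        CasGuardSketch guard = new CasGuardSketch();
        boolean[] inner = new boolean[1];
        // The inner attempt happens while the outer one still holds the flag.
        boolean outer = guard.tryRun(() -> inner[0] = guard.tryRun(() -> { }));
        System.out.println("outer=" + outer + ", inner=" + inner[0]);
        System.out.println("after release=" + guard.tryRun(() -> { }));
    }
}
```

The finally block matters: without it, an exception inside the guarded section would leave the flag set forever and every later caller would take the fallback path.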

Let’s put this to the test. Running the same test again, we get:

[Screenshot: tw1]

That’s what we want.

Breakpoints

Code positions let us pick the points in the code where we want to create breakpoints. Breakpoints are for when you need more control, in the sense that you want to trigger some events after a breakpoint has been reached: the thread stays suspended until the breakpoint is resumed. This mirrors the behavior of the previous test, but you control what happens while the first thread is suspended.

public class NumberGeneratorTest2 {

    private String first;
    private String second;
    private NumberGenerator generator;

    ThreadedTestRunner runner = new ThreadedTestRunner();

    @Test
    public void testThreadedTests() {
        runner.runTests(this.getClass(), NumberGenerator.class);
    }

    @ThreadedTest
    public void testGenerator() throws Exception {
        TimeMachine timeMachine = MockedTimeMachine.getTimeMachineMock();
        generator = new NumberGenerator(timeMachine);
        Runnable task = () -> first = generator.nextNumber();
        Thread thread1 = new Thread(task);
        ObjectInstrumentation instrumented = Instrumentation.getObjectInstrumentation(generator);
        Breakpoint breakpoint1 = getBreakPoint(instrumented, thread1);
        thread1.start();
        breakpoint1.await();
        second = generator.nextNumber();
        breakpoint1.resume();
        thread1.join();
        Assertions.assertThat(first).isNotEqualTo(second);
    }

    private Breakpoint getBreakPoint(ObjectInstrumentation instrumentation, Thread thread)
                         throws NoSuchMethodException, NoSuchFieldException {
        ClassInstrumentation instr = Instrumentation.getClassInstrumentation(NumberGenerator.class);
        Method nextNumber = NumberGenerator.class.getDeclaredMethod("nextNumber");
        Field counter = NumberGenerator.class.getDeclaredField("counter");
        Method resetCounter = counter.getType().getDeclaredMethod("set", int.class);
        CodePosition codePosition = instr.beforeCall(nextNumber, resetCounter);
        return instrumentation.createBreakpoint(codePosition, thread);
    }
}
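
The await()/resume() handshake used in the test above can be pictured in plain Java with two latches. This is only an analogy and not Thread Weaver's Breakpoint: ManualBreakpoint, hit() and ManualBreakpointDemo are hypothetical names, and the code under test has to call hit() itself, whereas Thread Weaver injects the pause through instrumentation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a breakpoint, built from two latches. */
final class ManualBreakpoint {
    private final CountDownLatch reached = new CountDownLatch(1);
    private final CountDownLatch resumed = new CountDownLatch(1);

    /** Called by the worker thread at the position where it should pause. */
    void hit() throws InterruptedException {
        reached.countDown();
        resumed.await();
    }

    /** Called by the test thread: blocks until the worker has reached the breakpoint. */
    void await() throws InterruptedException {
        reached.await();
    }

    /** Called by the test thread: lets the worker continue. */
    void resume() {
        resumed.countDown();
    }
}

final class ManualBreakpointDemo {

    /** Returns {first, second} after forcing the "pause before reset" interleaving. */
    static String[] run() throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(0);
        ManualBreakpoint breakpoint = new ManualBreakpoint();
        String[] first = new String[1];

        Thread worker = new Thread(() -> {
            try {
                breakpoint.hit();                 // pause just before the reset
                counter.set(0);
                first[0] = "2" + counter.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        worker.start();
        breakpoint.await();                       // worker cannot proceed until resume()
        counter.set(0);                           // the test thread plays the second thread
        String second = "2" + counter.incrementAndGet();
        breakpoint.resume();
        worker.join();                            // join makes first[0] visible here

        return new String[] {first[0], second};
    }

    public static void main(String[] args) throws InterruptedException {
        String[] numbers = run();
        System.out.println("first=" + numbers[0] + ", second=" + numbers[1]);
    }
}
```

Because the test thread does its work strictly between await() and resume(), the order of the two resets is fixed and the duplicate shows up on every run.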

Thread Weaver works well for small components; once the components grow, it starts to show its limitations (false positives, timeout issues, test freezes). But hey, that is what unit testing is all about: testing small components in isolation. What I didn’t like about it is that it does not work with Mockito or PowerMock (I guess two instrumentation frameworks are too much); it just gives you some strange errors when you create the mock. Something to look into, I think.

SOURCE CODE
