
Don't mess with the backing store

Parallel code is hard.

When we start messing with parallelism, we had better be prepared to deal with another level of complexity. It is not transparent to the reader how the code behaves when running in a multithreaded environment, what the moving parts are, and which changes can cause inconsistencies. Most of the time this is hidden in the language and libraries we use. Besides, multithreaded access is not easy to test for, and most of the time it is simply overlooked.

Things can get worse when something goes wrong unexpectedly and we end up with angry clients waiting for a fix while we debug code we are not familiar with, either because we didn't write it or because we don't clearly remember what we did a couple of weeks ago.

What is the code trying to do? What is it really doing? Why is it behaving this way?

We are faced with all of these questions, and none is trivial to answer, especially when you are dealing with large code bases. We need to understand the internals of the code, the implementation details, the data structures used, and how everything works behind the apparently familiar interfaces.

While looking at our metric dashboards we noticed something was wrong with one of our application's in-memory stores. We were seeing big discrepancies between the monitored size of the store and the number of entries actually being inserted into it.

Our service receives data points (samples) from multiple client applications. These samples are temporarily saved in an in-memory store indexed by a Metadata object representing a metric. This object contains, for instance, the timestamp of when the samples were collected. Before being cleared, these data points are used to extract some statistical information about the behavior of the client applications.
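The post never shows the store's declaration, so here is a rough sketch of the state the upsert method below manipulates. The field names `samples` and `size` are inferred from that method, and `MetricMetadata` is reduced to a minimal stand-in; everything here is an assumption, not the real service code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the store's state, inferred from the upsert code below.
public class DatapointStoreSketch {
    // Samples indexed by metric metadata; a concurrent map gives an atomic putIfAbsent.
    private final ConcurrentMap<MetricMetadata, List<Float>> samples = new ConcurrentHashMap<>();
    // Approximate number of stored samples, kept apart to avoid locking the map on reads.
    private final AtomicInteger size = new AtomicInteger();

    public int approximateSize() {
        return size.get();
    }

    public static void main(String[] args) {
        DatapointStoreSketch store = new DatapointStoreSketch();
        store.samples.put(new MetricMetadata(1_000L), new ArrayList<>(List.of(1.0f, 2.0f)));
        store.size.accumulateAndGet(2, Integer::sum);
        System.out.println(store.approximateSize()); // prints 2
    }
}

// Minimal stand-in for the metadata key type used in the post.
class MetricMetadata {
    private final long timestamp;
    MetricMetadata(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}
```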

public void upsert(MetricMetadata metadata, List<Float> samples) throws TimestampAlreadyRetrievedException {
    Objects.requireNonNull(metadata);
    Objects.requireNonNull(samples);

    if (samples.isEmpty()) {
        throw new IllegalArgumentException("Samples must not be empty.");
    }

    try {
        Permit permit = accessBroker.access(metadata.getTimestamp());
        List<Float> curSamples =
                this.samples.putIfAbsent(metadata, Collections.synchronizedList(samples));
        if (curSamples != null) {
            curSamples.addAll(samples);
        }
        permit.release();
        size.accumulateAndGet(samples.size(), Integer::sum);
    } catch (DataAlreadyCommittedException e) {
        throw new TimestampAlreadyRetrievedException();
    }
}

This is the implementation of the upsert method, an override from the DatapointStore interface, whose job is to update a given metric with new samples. The implementation also uses a size accumulator that exposes the approximate size of the store, in order to reduce locking and lookup time on retrieval.

When we tested the code in a controlled environment, we found that inserting samples into the store increased the counter by more than we were actually inserting. We also noticed that the error was not consistent: sometimes everything worked perfectly, while at other times it was completely wrong.

We started analyzing the code to understand what was going on, and with these symptoms we suspected a race condition.

What is a Permit?

I wanted to mention it, although it is not relevant to the real problem, because it helps in understanding what the code is doing. A Permit is just a lock on the data, a construct used by an AccessBroker to coordinate accesses and commits of sequential data on the store. Basically, the store has an AccessBroker that allows only a single thread to change data related to a specific sequence number at a given time. We can look at it as a synchronized block over a sequence number, which in our case is a timestamp.
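One way to picture this is a map of per-timestamp locks. The sketch below is purely an assumption to make the synchronized-block analogy concrete; the real broker also detects already-committed data (the DataAlreadyCommittedException path), which is omitted here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: one lock per sequence number (a timestamp here), so
// only one thread at a time may touch the data for that number.
public class AccessBrokerSketch {
    private final ConcurrentMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Blocks until the caller holds the lock for this timestamp.
    public Permit access(long timestamp) {
        ReentrantLock lock = locks.computeIfAbsent(timestamp, t -> new ReentrantLock());
        lock.lock();
        return new Permit(lock);
    }

    public static void main(String[] args) {
        AccessBrokerSketch broker = new AccessBrokerSketch();
        Permit permit = broker.access(1_000L);
        // ...change data for timestamp 1000 exclusively...
        permit.release();
        System.out.println("released"); // prints released
    }
}

// A Permit just wraps the acquired lock; releasing it frees the sequence number.
class Permit {
    private final ReentrantLock lock;
    Permit(ReentrantLock lock) { this.lock = lock; }
    public void release() { lock.unlock(); }
}
```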

What is Collections.synchronizedList() and what does it do?

The synchronizedList() method of the java.util.Collections class returns a synchronized (thread-safe) list backed by the specified list, in order to guarantee serial access.

The synchronized list uses the list passed as a parameter as its backing store. This means that any access made through the original list reference bypasses the synchronization, and we can not be sure we are not interfering with other running threads.
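A few lines are enough to see the aliasing. This is a standalone demo, not the service code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BackingListDemo {
    public static void main(String[] args) {
        List<Float> original = new ArrayList<>(List.of(1.0f, 2.0f));
        // synchronizedList does not copy: it delegates every call to `original`.
        List<Float> wrapped = Collections.synchronizedList(original);

        wrapped.add(3.0f);                   // goes through the wrapper's lock...
        System.out.println(original.size()); // ...but `original` sees it too: prints 3

        original.add(4.0f);                  // bypasses the wrapper's lock entirely
        System.out.println(wrapped.size());  // prints 4 -- and that write was not thread-safe
    }
}
```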

In our case, when one thread inserts a batch of samples, it can be adding them to a list that another thread is simultaneously using to increase the store size.

[thread-1] Permit permit = accessBroker.access(metadata.getTimestamp());
[thread-1] List<Float> curSamples =
      this.samples.putIfAbsent(metadata, Collections.synchronizedList(samples));
[thread-1] permit.release();
[thread-2] Permit permit = accessBroker.access(metadata.getTimestamp());
[thread-2] ...
[thread-2] curSamples.addAll(samples);
[thread-1] size.accumulateAndGet(samples.size(), Integer::sum);

When thread-1 finally gets to update the counter with its samples' size, its samples list already has thread-2's samples inserted by curSamples.addAll(), because the synchronized list is using that same instance as its backing store.
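The interleaving above can be replayed single-threaded to show the drift; `thread1Samples` and `thread2Samples` are hypothetical names standing in for each thread's `samples` argument:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Single-threaded replay of the interleaving above: thread-2's addAll lands
// inside thread-1's own list before thread-1 reads samples.size().
public class CounterDriftDemo {
    public static void main(String[] args) {
        List<Float> thread1Samples = new ArrayList<>(List.of(1.0f, 2.0f)); // thread-1 inserts 2 samples
        // thread-1's putIfAbsent stores a wrapper around thread-1's own list
        List<Float> stored = Collections.synchronizedList(thread1Samples);

        List<Float> thread2Samples = List.of(3.0f, 4.0f, 5.0f);
        stored.addAll(thread2Samples); // thread-2 runs curSamples.addAll(samples)

        // thread-1 now executes size.accumulateAndGet(samples.size(), Integer::sum):
        System.out.println(thread1Samples.size()); // prints 5, though thread-1 only inserted 2
    }
}
```

The counter ends up credited with 5 samples from thread-1 plus 3 from thread-2, for a store that really holds 5.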

When we use synchronizedList, all accesses to the backing list must go through the returned list. The fix for this issue was rather simple: we just create a new list instance when we have a first-time insert for a specific key.

List<Float> curSamples = this.samples.putIfAbsent(
              metadata, Collections.synchronizedList(new ArrayList<>(samples)));
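With the copy in place, the store owns its backing list, so another thread's addAll can no longer inflate the first caller's `samples` before the counter update. Again a standalone demo, with hypothetical variable names:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DefensiveCopyDemo {
    public static void main(String[] args) {
        List<Float> callerSamples = new ArrayList<>(List.of(1.0f, 2.0f));
        // The fix: copy before wrapping, so the backing list is private to the store.
        List<Float> stored = Collections.synchronizedList(new ArrayList<>(callerSamples));

        stored.addAll(List.of(3.0f, 4.0f, 5.0f)); // another thread's upsert for the same key
        System.out.println(callerSamples.size()); // prints 2 -- the counter update stays accurate
    }
}
```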