Hitting SynchronizationLockException when resizing a concurrent dictionary

Would anyone know why I hit the SynchronizationLockException when attempting a resize operation?

Based on the documentation for this error, I understand it is thrown when the current thread doesn't own the lock. Looking at the TryResize function, though, I believe the current thread should own all the locks.

To illustrate the bug, I deliberately kept the load factor at 0 so that after the very first add operation, the implementation attempts a resize.

Here is the test I ran:

public async Task ThreeThreadAdd()
{
    MyConcurrentDictionary<int, int> dict = new MyConcurrentDictionary<int, int>();

    var task1 = Task.Run(() => dict.TryAdd(1, 1));
    var sameBucketAsTask1 = Task.Run(() => dict.TryAdd(11, 1));

    var task2 = Task.Run(() => dict.TryAdd(2, 2));

    await Task.WhenAll(task1, sameBucketAsTask1, task2);

    Assert.AreEqual(3, dict.Count());
}

Here is the implementation:

namespace DictionaryImplementations
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    public class MyConcurrentDictionary<TKey, TValue>
    {
        internal class Entry<TKey, TValue>
        {
            internal TKey key;
            internal TValue value;
        }

        internal class Table
        {
            internal readonly object[] locks;

            internal readonly List<Entry<TKey, TValue>>[] buckets;

            internal Table(object[] locks, List<Entry<TKey, TValue>>[] buckets)
            {
                this.locks = locks;
                this.buckets = buckets;
            }
        }

        private double loadFactor;

        private int count;

        private volatile int bucketSize;

        private volatile Table table;

        public MyConcurrentDictionary()
        {   
            //// starting with size 2 to illustrate resize issue 
            int size = 2;
            this.bucketSize = size;

            object[] locks = new object[size];
            for (int i = 0; i < locks.Length; i++)
            {
                locks[i] = new object();
            }

            List<Entry<TKey, TValue>>[] buckets = new List<Entry<TKey, TValue>>[size];
            for (int i = 0; i < buckets.Length; i++)
            {
                buckets[i] = new List<Entry<TKey, TValue>>();
            }

            Table table = new Table(locks, buckets);

            this.table = table;

            this.loadFactor = 0;
        }

        private void TryAcquireLocks(int inclusiveStart, int exclusiveEnd)
        {
            for (int i = inclusiveStart; i < exclusiveEnd; i++)
            {
                while (!Monitor.TryEnter(this.table.locks[i], 100))
                {
                    continue;
                }
            }
        }

        private void ReleaseLocks(int inclusiveStart, int exclusiveEnd)
        {
            for (int i = inclusiveStart; i < exclusiveEnd; i++)
            {
                Monitor.Exit(this.table.locks[i]);
            }
        }

        /// <returns>true if the k/v pair was added, false if key already exists</returns>
        public bool TryAdd(TKey key, TValue value)
        {
            int hashCode = key.GetHashCode();

            // is the volatile read safe?
            int index = hashCode % this.bucketSize;

            // acquire the lock
            this.TryAcquireLocks(index, index + 1);

            try
            {
                foreach (var entry in this.table.buckets[index])
                {
                    if (entry.key.Equals(key))
                    {
                        return false;
                    }
                }

                Entry<TKey, TValue> newEntry = new Entry<TKey, TValue>()
                {
                    key = key,
                    value = value
                };

                this.table.buckets[index].Add(newEntry);

                Interlocked.Increment(ref this.count);
                return true;
            }
            finally
            {
                this.ReleaseLocks(index, index + 1);

                // attempt resize operation
                this.TryResize();
            }
        }

        public bool TryRemove(TKey key, out TValue oldValue)
        {
            oldValue = default(TValue);

            int hashCode = key.GetHashCode();

            // is this volatile read safe?
            int index = hashCode % this.bucketSize;

            // acquire the lock
            this.TryAcquireLocks(index, index + 1);

            try
            {
                bool found = false;
                int entryIndex = 0;
                foreach (var entry in this.table.buckets[index])
                {
                    if (!entry.key.Equals(key))
                    {
                        entryIndex++;
                    }
                    else
                    {
                        found = true;
                        break;
                    }
                }

                if (!found)
                {
                    return false;
                }

                this.table.buckets[index].RemoveAt(entryIndex);

                // `volatile` doesn't work in this hashmap model since we have locks for each bucket
                // since increment isn't an atomic operation, using `volatile` alone will not help
                Interlocked.Decrement(ref this.count);
                return true;
            }
            finally
            {
                this.ReleaseLocks(index, index + 1);
            }
        }

        public int Count()
        {
            // `Interlock` should flush all caches so that we observe latest value
            return this.count;
        }

        public bool ContainsKey(TKey key)
        {
            int hashCode = key.GetHashCode();

            int index = hashCode % this.bucketSize;

            // acquire the lock
            // in this case, we need to take a lock to guard against collection being modified
            this.TryAcquireLocks(index, index + 1);
            try
            {
                List<Entry<TKey, TValue>> bucket = this.table.buckets[index];
                return bucket.Any(item => item.key.Equals(key));
            }
            finally
            {
                this.ReleaseLocks(index, index + 1);
            }
        }

        private void TryResize()
        {
            double currentLoad = (this.count * (1.0)) / this.bucketSize;
            if (currentLoad < this.loadFactor)
            {
                return;
            }

            // locks are re-entrant for the same thread. So, we should not deadlock when acquiring same lock
            this.TryAcquireLocks(0, this.bucketSize);

            // store a reference to the locks array before the resize
            var prevLockReference = this.table.locks;
            try
            {
                int newBucketSize = this.bucketSize * 2;
                object[] newLocks = new object[newBucketSize];

                Array.Copy(this.table.locks, newLocks, this.table.locks.Length);
                for (int i = this.table.locks.Length; i < newBucketSize; i++)
                {
                    newLocks[i] = new object();
                }

                List<Entry<TKey, TValue>>[] newBuckets = new List<Entry<TKey, TValue>>[newBucketSize];
                for (int i = 0; i < newBuckets.Length; i++)
                {
                    newBuckets[i] = new List<Entry<TKey, TValue>>();
                }

                // re-compute distribution
                foreach (List<Entry<TKey, TValue>> bucket in this.table.buckets)
                {
                    foreach (Entry<TKey, TValue> entry in bucket)
                    {
                        int hashCode = entry.key.GetHashCode();
                        int newIndex = hashCode % newBucketSize;

                        newBuckets[newIndex].Add(new Entry<TKey, TValue>() { key = entry.key, value = entry.value });
                    }
                }

                Table newTable = new Table(newLocks, newBuckets);

                // perform new assignments
                // volatile reads will flush the cache
                this.bucketSize = newBucketSize;
                this.table = newTable;
            }
            finally
            {
                for (int i = 0; i < prevLockReference.Length; i++)
                {
                    Monitor.Exit(prevLockReference[i]);
                }
            }
        }
    }
}


Solution 1:[1]

That's a nice brain teaser. I will leave all the constructive criticism about your code for the end.

The bug hunt

The bug is in TryResize. A thread comes in trying to resize when the bucket size is, say, 2. It gets to the entry of your critical section:

// locks are re-entrant for the same thread. So, we should not deadlock when acquiring same lock
this.TryAcquireLocks(0, this.bucketSize);

It acquires both locks and goes on its merry way to resize the dictionary. The logic is all fine, you copy the locks to a new array, reallocate buckets, re-compute the distribution...

Then a Spoiler thread comes along and also calls TryResize. It gets to the critical section, invokes TryAcquireLocks(0, 2), tries to acquire the first lock and hangs.

In the meantime the first thread finishes recalculation, assigns this.bucketSize = 4, reassigns the internal table along with its locks and enters finally to release both locks.

Now the Spoiler thread wakes up, because it can now acquire lock number 0. It loops again, looks at the new table since it's correctly volatile and acquires lock number 1. But here's the kicker: the Spoiler thread never witnessed the this.bucketSize reassignment. It doesn't know there are now twice as many locks to acquire, since it's still executing TryAcquireLocks(0, 2). So it only acquires the first 2 locks in the table!

And that's it: not only is the critical section's precondition violated, but when this Spoiler thread executes finally it will try to release all 4 locks, since the loop there explicitly goes up to the Length of the lock table. It doesn't own the 2 new locks, only the original first 2, so you get a SynchronizationLockException.
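The exception itself is cheap to reproduce in isolation. Monitor.Exit throws whenever the calling thread does not own the monitor, which is exactly the position the Spoiler thread ends up in with the 2 locks it never entered. A minimal sketch:

```csharp
using System;
using System.Threading;

object ownedLock = new object();
object foreignLock = new object();

Monitor.Enter(ownedLock);
Monitor.Exit(ownedLock);        // fine: this thread owns the monitor

bool threw = false;
try
{
    Monitor.Exit(foreignLock);  // this thread never entered foreignLock
}
catch (SynchronizationLockException)
{
    threw = true;               // same exception the Spoiler thread hits
}
Console.WriteLine(threw);       // True
```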

An immediate fix is to introduce a new method that will always acquire all locks, even if their count increases between calls:

private void TryAcquireAllLocks()
{
    for (int i = 0; i < this.bucketSize; i++)
    {
        while (!Monitor.TryEnter(this.table.locks[i], 100))
        {
            continue;
        }
    }
}

And then replace the bugged line in TryResize with

this.TryAcquireAllLocks();
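A more defensive variant, if you want to go further: have the acquire method return exactly the locks it entered, and have release walk that same list, so acquisition and release can never disagree even if the lock table is swapped out in between. This is just a sketch with hypothetical helper names, not part of the original code:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Usage: acquire, do the resize work, then release exactly what was acquired.
var demo = new LockTableDemo();
List<object> held = demo.AcquireAll();
demo.ReleaseAll(held);          // never exits a lock this thread doesn't own
Console.WriteLine(held.Count);  // 2

// Hypothetical helper class: AcquireAll returns the exact monitors it
// entered, and ReleaseAll walks that same list.
class LockTableDemo
{
    private volatile object[] locks = { new object(), new object() };

    public List<object> AcquireAll()
    {
        var acquired = new List<object>();
        // Re-read this.locks on every iteration: once lock 0 is held,
        // no competing resize can complete, so the array stops changing.
        for (int i = 0; i < this.locks.Length; i++)
        {
            object l = this.locks[i];
            while (!Monitor.TryEnter(l, 100))
            {
            }
            acquired.Add(l);
        }
        return acquired;
    }

    public void ReleaseAll(List<object> acquired)
    {
        foreach (object l in acquired)
        {
            Monitor.Exit(l);    // only exits monitors we actually entered
        }
    }
}
```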

The "please don't put this on production" section, a.k.a. just use ConcurrentDictionary

One of the reasons why you should probably never implement such complicated structures yourself is the very bug you just hit. It's non-trivial to track down, and it takes a lot of reading to understand all the code you've written and convince anyone it's correct.

Your code already contains a lot of bugs waiting to happen. You're prone to the same hazard that old versions of Monitor.Enter had, where a thread can die while holding a lock and deadlock the application: what happens when a thread acquires some of the locks it needs for an operation and then dies? It'll never release them, and no one will ever get to use the dictionary again! I also don't see why you're passing a timeout to Monitor.Enter if you always retry acquiring the lock immediately afterwards.
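For reference, the lockTaken overload of Monitor.Enter (added in .NET 4.0) exists precisely so that a finally block never exits a monitor the thread doesn't actually own. A minimal sketch of the idiomatic pattern:

```csharp
using System;
using System.Threading;

object gate = new object();
bool lockTaken = false;
try
{
    // Records in lockTaken whether the monitor was really entered,
    // even if an exception interrupts the acquisition.
    Monitor.Enter(gate, ref lockTaken);
    Console.WriteLine("inside critical section");
}
finally
{
    if (lockTaken)
    {
        Monitor.Exit(gate);  // only exit if we really entered
    }
}
Console.WriteLine(lockTaken);    // True
```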

If you're writing this code as an exercise, great, do so, test it, and then post it to Code Review StackExchange to get some quality feedback. Actually sounds like a great exercise.

But please, for the sake of us all, don't use your own implementation in production. The BCL version is well audited by experts whose only job is to make sure their standard implementations work, there's no way your custom code is going to be more robust than theirs.
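For comparison, the scenario from the question's test needs no hand-written locking at all against the BCL type; a sketch using top-level statements:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var dict = new ConcurrentDictionary<int, int>();

var t1 = Task.Run(() => dict.TryAdd(1, 1));
var t2 = Task.Run(() => dict.TryAdd(11, 1));  // may share a bucket with key 1
var t3 = Task.Run(() => dict.TryAdd(2, 2));
await Task.WhenAll(t1, t2, t3);

// Resizing, striped locking, and counting are all handled internally.
Console.WriteLine(dict.Count);  // 3
```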

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 V0ldek