Creating ConcurrentPartitionedDictionary

11 Nov 2014 · CPOL · 6 min read
A suggested solution for creating a serializable, partition-based ConcurrentDictionary (reducing the chance of OOM errors, lowering memory consumption, and allowing a larger set of KVPs to be held in the dictionary).

Introduction

This article presents a generic solution, called ConcurrentPartitionedDictionary, which complements the previously posted article named ParititonedDictionary.

The presented solution creates a dictionary that adds KVPs to dedicated partitions. This design offers the following advantages:

  1. Lower chance of OOM errors during dictionary resize operations.
  2. Capacity to hold larger KVP sets in memory than is otherwise possible with the C# ConcurrentDictionary.
  3. Lower runtime memory consumption (resizing one partition re-allocates less memory than resizing the whole dictionary).
  4. Ability to define the partition count at construction time (min value: 2, max value: int.MaxValue).
  5. Built-in serialization/deserialization capability (based on DataContractSerializer).
  6. Faster concurrent population/reading/removal (based on simulations on my laptop; users should run such tests themselves before going into production).

Background

Most of the background is covered in my previous article, Creating ParititonedDictionary. In this solution, however, I have changed the partitioning approach. The new design is based on the following thoughts:

  1. Concurrent access to the dictionary must not lock the whole dictionary.
  2. If different keys are accessed/inserted/removed by different threads, memory partitions increase speed because only the corresponding block of memory is locked.
  3. Dictionary resizing is performed on individual partitions, which are smaller than the whole dictionary, thus speeding up such operations.
  4. Creating and using this dictionary must feel similar to using a .NET dictionary.
  5. The user must not have to design a good partitioning scheme based on the nature of the possible key values.
  6. The user must be free to choose the number of partitions a dictionary has, to fine-tune performance for his or her dataset.

This new design obviates the need for a partition function. However, ambitious developers may want the ability to plug in their own partitioning scheme, and I would be happy to hear from them. If they prepare another article around such a design, I would appreciate the effort; after all, what is better than sharing new ideas?

Achieving Partitions

In the world of C#, the object class is the mother of all types. Exploiting this fact, together with the availability of the GetHashCode() method, makes things easier. Because GetHashCode() always returns an int, selecting one of the required partitions can be done with a modulo operation. To keep things simple, let's consider an example:

Let's say someone requires the dictionary to be partitioned into 10 parts and the key type is TKey. Given an instance of TKey (say a variable named key), all I do is: ((uint)key.GetHashCode()) % 10, where % is the modulo operator and the uint cast is needed because GetHashCode() may return negative int values.
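
In code, this partition selection might look like the following minimal sketch (the helper name GetPartitionIndex is mine, not part of the posted source):

C#
//Hypothetical helper: maps a key to a partition index in [0, totalPartitions)
static int GetPartitionIndex<TKey>(TKey key, int totalPartitions)
{
    //The uint cast keeps the modulo result non-negative even when
    //GetHashCode() returns a negative int
    return (int)((uint)key.GetHashCode() % (uint)totalPartitions);
}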

Thus, the Ctor of ConcurrentPartititonedDictionary is an O(n) operation (the Ctor creates n dictionaries), where n is the number of partitions required. In the proposed solution, I have imposed a minimum of 2 partitions; with a single partition, it would be better to use the C# ConcurrentDictionary instead.

Code Dissection

The Ctor of ConcurrentPartitionedDictionary is written as follows (comments in the code block describe each operation):

C#
public ConcurrentPartititonedDictionary(int totalParititonRequired = 2,
                                        int initialCapacity = 0,
                                        IEqualityComparer<TKey> keyEqualityComparer = null)
{
    //Min limit of 2 on partitions
    _totalPartition = Math.Max(2, totalParititonRequired);
    
    //Dictionary Array of size equal to desired partitions
    _internalDict = new Dictionary<TKey, TValue>[_totalPartition];
    
    //Min limit of 0 on capacity
    initialCapacity = Math.Max(0, initialCapacity);

    if (initialCapacity != 0)
    {
        //If capacity is non-zero, divide capacity equally
        int initCapa = (int)Math.Ceiling(((double)initialCapacity) / _totalPartition);
        Parallel.For(0, _totalPartition, i =>
        {
            _internalDict[i] = new Dictionary<TKey, TValue>(initCapa, keyEqualityComparer);
        });
    }
    else
    {
        //Otherwise, create dictionary of default capacity
        Parallel.For(0, _totalPartition, i =>
        {
            _internalDict[i] = new Dictionary<TKey, TValue>(keyEqualityComparer);
        });
    }
}
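
As a construction sketch (the argument values are arbitrary; the parameter names follow the Ctor above):

C#
//16 partitions; the total initial capacity of 1,000,000 is split as
//Math.Ceiling(1000000 / 16.0) = 62,500 entries per partition
var bigDict = new ConcurrentPartititonedDictionary<string, int>(
    totalParititonRequired: 16,
    initialCapacity: 1000000);

//Default construction: 2 partitions, default capacity, default key comparer
var smallDict = new ConcurrentPartititonedDictionary<string, int>();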

The following dictionary-related operations are implemented in the solution:

C#
public bool TryAdd(TKey key, TValue value);

public bool TryRemove(TKey key, out TValue value);

public bool TryGetValue(TKey key, out TValue value);

public TValue GetOrAdd(TKey key, TValue value);

public bool TryUpdate(TKey key, TValue newValue, TValue comparisonValue);

public TValue this[TKey key] { get; set; }

public bool ContainsKey(TKey key);

public void Clear();

public long Count { get; }

public bool TryAdd(KeyValuePair<TKey, TValue> item);

public IEnumerable<KeyValuePair<TKey, TValue>> GetEnumerable();
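
A short usage sketch of these members (the keys and values are only illustrative, and the member semantics are assumed to mirror those of ConcurrentDictionary):

C#
var dict = new ConcurrentPartititonedDictionary<string, int>(totalParititonRequired: 4);

dict.TryAdd("apples", 3);                  //returns false if the key already exists
dict["oranges"] = 7;                       //indexer set adds or overwrites
long count = dict.Count;                   //Count is exposed as a long

int apples;
if (dict.TryGetValue("apples", out apples))
{
    //compare-and-swap style update: succeeds only if the current value equals apples
    dict.TryUpdate("apples", apples + 1, apples);
}

int oranges = dict.GetOrAdd("oranges", 0); //returns the existing value (7) here

foreach (var kvp in dict.GetEnumerable())
{
    Console.WriteLine(kvp.Key + " => " + kvp.Value);
}

int removed;
dict.TryRemove("apples", out removed);
dict.Clear();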

To fulfill the need for serialization/deserialization, the solution provides instance methods named Serialize and DeserializeAndMerge. Their implementation is based on DataContractSerializer; however, one can easily modify these methods to use other serialization mechanisms. In their current form, they can serialize/deserialize data as either XML or JSON. The implementation of Serialize is shown below:

C#
public void Serialize(FileInfo serialFile,
                      SerializationType serialType = SerializationType.Json)
{
    serialFile.Refresh();
    if (!serialFile.Exists)
    {
        serialFile.Directory.Create();
    }

    //This is important to stop Adding/Removing items during
    //Serialization process
    for (int i = 0; i < _totalPartition; i++)
    {
        Monitor.Enter(_internalDict[i]);
    }

    if (serialType == SerializationType.Xml)
    {
        using (var writer = new FileStream(serialFile.FullName, FileMode.Create, FileAccess.Write))
        {
            var ser = new DataContractSerializer(_internalDict.GetType());
            ser.WriteObject(writer, _internalDict);
        }
    }
    else
    {
        using (var writer = new FileStream(serialFile.FullName, FileMode.Create, FileAccess.Write))
        {
            var ser = new DataContractJsonSerializer(_internalDict.GetType());
            ser.WriteObject(writer, _internalDict);
        }
    }

    //Releasing all the locks
    for (int i = 0; i < _totalPartition; i++)
    {
        Monitor.Exit(_internalDict[i]);
    }
}

The DeserializeAndMerge method, after deserializing the data, merges all KVPs into the current instance. Merging is important because GetHashCode() return values may change across runs (see the Object.GetHashCode documentation), which would change the partition a KVP belongs to. In such a situation, even if the KVP exists, TryGetValue would return false and the indexer property would throw. One can also write a DeserializeAndMergeWithUpdate method if required, to update existing key values; such a method would call the indexer's set accessor instead of TryAdd. The current implementation is as follows:

C#
public void DeserializeAndMerge(FileInfo serialFile, 
        SerializationType serialType = SerializationType.Json)
{
    serialFile.Refresh();
    if (!serialFile.Exists)
    {
        throw new FileNotFoundException("Given serialized file doesn't exist!");
    }

    Dictionary<TKey, TValue>[] deserialDict;
    if (serialType == SerializationType.Xml)
    {
        using (var reader = new FileStream(serialFile.FullName, FileMode.Open, FileAccess.Read))
        {
            var ser = new DataContractSerializer(_internalDict.GetType());
            deserialDict = (Dictionary<TKey, TValue>[])ser.ReadObject(reader);
        }
    }
    else
    {
        using (var reader = new FileStream(serialFile.FullName, FileMode.Open, FileAccess.Read))
        {
            var ser = new DataContractJsonSerializer(_internalDict.GetType());
            deserialDict = (Dictionary<TKey, TValue>[])ser.ReadObject(reader);
        }
    }

    Parallel.For(0, deserialDict.Length, index =>
    {
        Parallel.ForEach(deserialDict[index], kvp => TryAdd(kvp));
    });
}
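
A round-trip sketch using the two methods above (the file path and partition counts are arbitrary):

C#
var source = new ConcurrentPartititonedDictionary<int, string>(totalParititonRequired: 8);
source.TryAdd(1, "one");
source.TryAdd(2, "two");

//Persist all partitions as JSON (SerializationType.Xml is also supported)
var file = new FileInfo(@"C:\Temp\partitioned.json");
source.Serialize(file, SerializationType.Json);

//Reload into an instance with a different partition count: every KVP is
//re-hashed via TryAdd, so the merge also acts as a partition resize
var target = new ConcurrentPartititonedDictionary<int, string>(totalParititonRequired: 16);
target.DeserializeAndMerge(file, SerializationType.Json);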

One must note the following points:

  1. If the dictionary had N partitions during serialization and the instance calling DeserializeAndMerge has M partitions, the data is merged into the M-partition dictionary without any error (as in the round-trip sketch above). Thus, one can also view this pair of methods as a partition-resizing operation (NOTE: do not change the TKey and TValue types; of course, that won't work).
  2. If TKey and/or TValue are custom classes/structs or other complex data types, then to avoid errors during serialization/deserialization, one must make prudent use of the DataContract and DataMember attributes and, if required, KnownTypeAttribute; a minimal annotation sketch follows this list.
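
For illustration only, here is how a hypothetical custom value type might be annotated so that DataContractSerializer can handle it (the Order/PremiumOrder types are invented for this example):

C#
using System.Runtime.Serialization;

[DataContract]
[KnownType(typeof(PremiumOrder))]   //declare derived types the serializer may encounter
public class Order
{
    [DataMember]
    public int Id { get; set; }

    [DataMember]
    public string Customer { get; set; }
}

[DataContract]
public class PremiumOrder : Order
{
    [DataMember]
    public decimal Discount { get; set; }
}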

Comparison with C# ConcurrentDictionary

The code below not only shows how to use ConcurrentPartitionedDictionary but also runs some performance tests against the C# ConcurrentDictionary. To perform these tests:

  • First, I created a shuffled array of the integer values between 0 and 10M.
  • Then, I created a ConcurrentPartitionedDictionary<int, int> with 10 partitions and a regular ConcurrentDictionary<int, int>.
  • Using Parallel.ForEach loops, I added these 10M shuffled values as keys and values via the indexer set accessor, looked up all keys with TryGetValue, and finally removed all KVPs with TryRemove.
  • For each such operation, I used a Stopwatch to measure the time in milliseconds.
  • In addition, using GC.GetTotalMemory(true), I measured memory consumption before and after dictionary population to determine the memory consumed by the dictionary instance.
  • All these statistics are then printed to the console.

The code snippet to perform the above-mentioned steps is as follows:

C#
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using PartitionedDictionary;

namespace Test
{
    class Program
    {
        //Change this value to change total KVP pairs to test
        private const int totalRandomValues = 10000000;

        static void Main(string[] args)
        {
            //Total number of partitions desired
            var totalPartition = 10;

            var partitionedDictionary = new ConcurrentPartititonedDictionary<int, int>(totalPartition);
            var concurrentDictionary = new ConcurrentDictionary<int, int>();

            Console.WriteLine("Preparing Random Array of 10M values...please wait...");
            var randomGenerator = new Random();
            var listForShuffledArray = new List<KeyValuePair<double, int>>(totalRandomValues);
            foreach (var i in GetInt())
            {
                listForShuffledArray.Add
                   (new KeyValuePair<double, int>(randomGenerator.NextDouble(), i));
            }
            Console.WriteLine("Array is ready...suffling it...please wait...");

            var arr = listForShuffledArray.AsParallel().OrderBy
                          (x => x.Key).Select(x => x.Value).ToArray();
            listForShuffledArray.Clear();
            listForShuffledArray = null;

            GC.Collect();
            GC.WaitForPendingFinalizers();

            Console.WriteLine("");
            Console.WriteLine
                 ("#########################################################################");
            Console.WriteLine
                 ("########################## Concurrent Dictionary ########################");
            Console.WriteLine
                 ("#########################################################################");
            Console.WriteLine("");

            var sizeBeforeDictPopulation = GC.GetTotalMemory(true);

            var sw = Stopwatch.StartNew();
            Parallel.ForEach(arr, i =>
            {
                concurrentDictionary[i] = i;
            });
            Console.WriteLine("Total millisecs to populate 10M KVPs: " + sw.Elapsed.TotalMilliseconds);
            Console.WriteLine("Count after KVP adding: " + concurrentDictionary.Count);
            var dictSize = GC.GetTotalMemory(true) - sizeBeforeDictPopulation;
            sw = Stopwatch.StartNew();
            Parallel.ForEach(arr, i =>
            {
                int l;
                concurrentDictionary.TryGetValue(i, out l);
            });
            Console.WriteLine("Total millisecs to read 10M KVPs: " + sw.Elapsed.TotalMilliseconds);
            sw = Stopwatch.StartNew();
            Parallel.ForEach(arr, i =>
            {
                int l;
                concurrentDictionary.TryRemove(i, out l);
            });
            Console.WriteLine("Total millisecs to remove 10M KVPs: " + sw.Elapsed.TotalMilliseconds);
            Console.WriteLine("Total MB used: " + (dictSize >> 20));
            concurrentDictionary = null;

            GC.Collect();
            GC.WaitForPendingFinalizers();

            Console.WriteLine
                    ("#########################################################################");
            Console.WriteLine("");
            Console.WriteLine("");
            Console.WriteLine
                    ("#########################################################################");
            Console.WriteLine
                    ("########################## Partitoned Dictionary ########################");
            Console.WriteLine
                    ("#########################################################################");
            Console.WriteLine("");

            sizeBeforeDictPopulation = GC.GetTotalMemory(true);

            Console.WriteLine("Total Partitions: " + totalPartition);
            sw = Stopwatch.StartNew();
            Parallel.ForEach(arr, i =>
            {
                partitionedDictionary[i] = i;
            });
            Console.WriteLine("Total millisecs to add 10M KVPs: " + sw.Elapsed.TotalMilliseconds);
            Console.WriteLine("Count after KVP adding: " + partitionedDictionary.Count);
            dictSize = GC.GetTotalMemory(true) - sizeBeforeDictPopulation;

            sw = Stopwatch.StartNew();
            Parallel.ForEach(arr, i =>
            {
                int l;
                partitionedDictionary.TryGetValue(i, out l);
            });
            Console.WriteLine("Total millisecs to read 10M KVPs: " + sw.Elapsed.TotalMilliseconds);

            sw = Stopwatch.StartNew();
            Parallel.ForEach(arr, i =>
            {
                int l;
                partitionedDictionary.TryRemove(i, out l);
            });
            Console.WriteLine("Total millisecs to remove 10M KVPs: " + sw.Elapsed.TotalMilliseconds);
            Console.WriteLine("Total MB used: " + (dictSize >> 20));
            Console.WriteLine("Count after KVP removal: " + partitionedDictionary.Count);
            Console.WriteLine
                    ("#########################################################################");
            Console.ReadLine();
        }

        public static IEnumerable<int> GetInt(int i = 0, int j = totalRandomValues)
        {
            while (i < j)
            {
                yield return i++;
            }
        }
    }
}

Below is the configuration of my poor laptop, running Windows 7:

(Screenshot: laptop configuration.)

Below is a snapshot of the program output. The following points can be noted:

  • Memory consumption was 266 MB (PartitionedDictionary) vs. 521 MB (ConcurrentDictionary)
  • KVP population time was 3.587 seconds (PartitionedDictionary) vs. 19.263 seconds (ConcurrentDictionary)
  • KVP lookup time was 1.204 seconds (PartitionedDictionary) vs. 1.991 seconds (ConcurrentDictionary)
  • KVP removal time was 2.482 seconds (PartitionedDictionary) vs. 3.618 seconds (ConcurrentDictionary)

However, one must not forget that these stats depend on the runtime randomness of the inserted KVPs. As they say, every good programmer MUST be suspicious of any performance claim and must do his or her due diligence before accepting it (what about adding sorted data? Hmm!).

(Screenshot: program output with the statistics listed above.)

Points of Interest

In this article, we saw how a dictionary can be divided into smaller ones, which I call partitions. I was inspired by Oracle DB partitioning based on TIMESTAMP and was looking for something similar, but more generic, to use in C#. In fact, I was recently working on an extremely performance-critical project in which we needed to hold several huge KVP sets in memory at runtime, and from time to time we recorded OOM errors in our logs. We have seen (and I hope users of this library will agree) that such a design can be beneficial performance-wise and can also help reduce memory consumption, especially when memory is costly for you, or when your app targets x86 platforms and you have no choice but to keep runtime memory to a mere 2-2.5 GB (if large-address aware; otherwise only 1-1.5 GB).

I am all ears for anyone who takes an interest in reading the material above. Please let me know of any suggestions or corrections to the code; I would sincerely appreciate your effort.

History

  • This is V1 of the suggested solution.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

