Introduction
A few months ago, I had a project to develop an SMS notification gateway capable of prioritizing incoming messages from a few sources and sending them to mobile operators. At the heart of this system, I decided to build my own component for queuing and prioritization. So, I started searching the internet for some code that implements the required functionality: a prioritized queue.
Background
Because SMS can be used by highly capable and demanding systems such as a card authorization system or a banking system, the gateway has to be very fast, reliable, and simple. So, my approach was KISS (keep it simple, stupid) as far as possible.
I mainly develop web applications, where there is rarely any demand for multithreaded programming. So, this was my first significant attempt to create a really demanding multithreaded application.
The purpose of this queue is to be the "mediator" between a few threads receiving incoming notifications and a few other threads responsible for sending queued messages to SMS operators. The big trouble was the limited number of SMSs per second that the mobile operators accept. So, I had to "throttle" SMSs in order to comply with the mobile operator requirements.
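The article does not show the throttling code itself, so the following is only a minimal sketch of one simple way to do it: fixed-interval pacing, where message number i is not sent before i intervals have elapsed. The names Throttle, IntervalFor, and SendThrottled are hypothetical and are not part of the original class.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper, not taken from the article's source code.
public static class Throttle
{
    // Minimum gap between two sends for a given per-second limit.
    public static TimeSpan IntervalFor(int messagesPerSecond)
    {
        if (messagesPerSecond <= 0)
            throw new ArgumentOutOfRangeException("messagesPerSecond");
        return TimeSpan.FromTicks(TimeSpan.TicksPerSecond / messagesPerSecond);
    }

    // Calls send(i) for i = 0..count-1, pacing the calls to the limit.
    public static void SendThrottled(int count, int messagesPerSecond, Action<int> send)
    {
        TimeSpan interval = IntervalFor(messagesPerSecond);
        Stopwatch clock = Stopwatch.StartNew();
        for (int i = 0; i < count; i++)
        {
            // Message i is due once i intervals have elapsed on the clock.
            TimeSpan due = TimeSpan.FromTicks(interval.Ticks * i);
            TimeSpan wait = due - clock.Elapsed;
            while (wait > TimeSpan.Zero)
            {
                Thread.Sleep(wait);
                wait = due - clock.Elapsed;
            }
            send(i);
        }
    }

    public static void Main()
    {
        // With a limit of 20 per second, sends are spaced 50 ms apart.
        SendThrottled(5, 20, i => Console.WriteLine("sent SMS " + i));
    }
}
```

In a real gateway, the send callback would be replaced by a loop that takes the next message from the queue, so the pacing applies to whatever has the highest priority at that moment.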
Sounds pretty easy, but now another issue arises: what will happen if one system sends 100,000 marketing SMSs (spam) while I have a limit of 20 SMSs per second (at that rate, the batch alone takes about 83 minutes to send), and a different system sends three SMSs that are urgent? That is where prioritization comes into the story.
A few days of testing, writing, deleting, and questions.. and a class was born...
Using the code
As you can see, the class includes and uses the standard Queue available in .NET, and for prioritization, I decided to use SortedDictionary.
private SortedDictionary<P, Queue<V>> list =
new SortedDictionary<P, Queue<V>>();
In simple words, I decided to use a sorted dictionary of standard queues. In the code below, list is the "shared" object containing the data, and I have to make sure my class is thread safe. Normally, Enqueue and Dequeue on a Queue are not thread-safe operations. So, I implemented the standard monitor wait/pulse pattern with:
Monitor.Pulse(list);
Monitor.Wait(list);
and
lock (list) {}
I'm not going to describe how Monitor.Pulse, Monitor.Wait, or lock {} work. Please use "Google" if you don't know what those do.
Let's see how the magic works in the Dequeue() method.
First of all, we lock our object to ensure thread safety. Now, let's see how we block when there is no object within our SortedDictionary container.
while (list.Count() == 0 && !_FreeCurrentDequeue)
{
    Monitor.Wait(list);
}
If there are no items in the SortedDictionary, the calling thread blocks and releases the lock on the container with Monitor.Wait(list). It stays blocked until somebody executes Monitor.Pulse(list).
This somebody in our case will be the Enqueue() method, immediately after inserting an object into our queue.
q.Enqueue(value);
Monitor.Pulse(list);
Once our Monitor.Wait(list) returns, we are going to extract from the queue. Of course, we should start extracting from the queue that comes first in the dictionary (the highest priority). Every time you extract an object from the class, it should come from the queue with the highest priority. Because SortedDictionary keeps its keys in ascending order, the first entry is the one with the smallest key, so the smallest key value is treated as the highest priority:
pair = list.First();
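To make the ordering concrete, here is a small sketch of what First() returns when keys are added out of order. It assumes int priorities where 1 means "urgent" and 5 means "bulk"; that convention, and the SortedOrderDemo and FirstKey names, are only illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo class, not part of the article's source code.
public static class SortedOrderDemo
{
    // Returns the key that First() yields after inserting keys out of order.
    public static int FirstKey()
    {
        var list = new SortedDictionary<int, Queue<string>>();
        list.Add(5, new Queue<string>()); // e.g. bulk marketing messages
        list.Add(1, new Queue<string>()); // e.g. urgent messages
        list.Add(3, new Queue<string>());
        // SortedDictionary enumerates in ascending key order,
        // so First() is the entry with the smallest key.
        return list.First().Key;
    }

    public static void Main()
    {
        Console.WriteLine(FirstKey()); // prints 1
    }
}
```

If you would rather have larger numbers mean higher priority, SortedDictionary also has a constructor that accepts an IComparer for the key type, which can reverse the order.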
Immediately after finding the queue, we are going to extract the object in this queue..
V v = pair.Value.Dequeue();
Of course, in order to be able to process queues with lower priority, we remove empty queues from the SortedDictionary.
if (pair.Value.Count == 0)
    list.Remove(pair.Key);
This ensures that on the next execution of the Dequeue method, list.First() returns the next lower-priority queue, simply because we've just removed the highest-priority one. And so on and so forth..
But there is a catch: if some thread is blocked in the Dequeue method and there are no items inside the queues, the thread will hang forever. Not very nice. If you want, for example, to stop your application, you will have to terminate it in a not very nice fashion...
So, I decided to add a class-level flag, _FreeCurrentDequeue, and a PulseAndFreeDequeue() method. If you want to release a blocked Dequeue, you just have to call this method, and the waiting Dequeue call will return.
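public V Dequeue()
{
    KeyValuePair<P, Queue<V>> pair;
    lock (list)
    {
        // Reset the release flag so this call blocks normally.
        _FreeCurrentDequeue = false;
        // Wait (releasing the lock) until an item arrives or we are released.
        while (list.Count() == 0 && !_FreeCurrentDequeue)
        {
            Monitor.Wait(list);
        }
        // Released by PulseAndFreeDequeue(): return an empty value.
        if (_FreeCurrentDequeue) return default(V);
        // The first entry has the smallest key, i.e. the highest priority.
        pair = list.First();
        V v = pair.Value.Dequeue();
        // Drop the queue once it is empty so lower priorities become reachable.
        if (pair.Value.Count == 0)
            list.Remove(pair.Key);
        return v;
    }
}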
and here is the releasing method...
public void PulseAndFreeDequeue()
{
    lock (list)
    {
        _FreeCurrentDequeue = true;
        // PulseAll wakes every waiting thread; Pulse would wake only one.
        Monitor.PulseAll(list);
    }
}
One thing you should be careful about: if you call PulseAndFreeDequeue(), the released Dequeue returns default(V) (null for reference types), and this may cause exceptions in the extracting module. So just keep that in mind.
And here is the Enqueue method:
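public void Enqueue(P priority, V value)
{
    lock (list)
    {
        Queue<V> q;
        // Create the queue for this priority on first use.
        if (!list.TryGetValue(priority, out q))
        {
            q = new Queue<V>();
            list.Add(priority, q);
        }
        q.Enqueue(value);
        // Wake one thread blocked in Dequeue(), if any.
        Monitor.Pulse(list);
    }
}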
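Putting the pieces together, the sketch below assembles the methods above into one class and exercises it. The class name PrioritizedQueue and the Demo helpers are hypothetical, int priorities with 1 as the most urgent are an assumption, and PulseAndFreeDequeue() uses Monitor.PulseAll here so that every waiting thread wakes up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical class name; the members follow the methods shown in the article.
public class PrioritizedQueue<P, V>
{
    private readonly SortedDictionary<P, Queue<V>> list =
        new SortedDictionary<P, Queue<V>>();
    private bool _FreeCurrentDequeue;

    public void Enqueue(P priority, V value)
    {
        lock (list)
        {
            Queue<V> q;
            if (!list.TryGetValue(priority, out q))
            {
                q = new Queue<V>();
                list.Add(priority, q);
            }
            q.Enqueue(value);
            Monitor.Pulse(list);
        }
    }

    public V Dequeue()
    {
        lock (list)
        {
            _FreeCurrentDequeue = false;
            while (list.Count() == 0 && !_FreeCurrentDequeue)
                Monitor.Wait(list);
            if (_FreeCurrentDequeue) return default(V);
            KeyValuePair<P, Queue<V>> pair = list.First();
            V v = pair.Value.Dequeue();
            if (pair.Value.Count == 0)
                list.Remove(pair.Key);
            return v;
        }
    }

    public void PulseAndFreeDequeue()
    {
        lock (list)
        {
            _FreeCurrentDequeue = true;
            Monitor.PulseAll(list); // wakes every waiting thread, not just one
        }
    }
}

public static class Demo
{
    // Enqueues bulk messages first, then an urgent one; returns the drain order.
    public static List<string> DrainOrder()
    {
        var queue = new PrioritizedQueue<int, string>();
        queue.Enqueue(5, "promo-1");
        queue.Enqueue(5, "promo-2");
        queue.Enqueue(1, "urgent-1");
        var result = new List<string>();
        for (int i = 0; i < 3; i++) result.Add(queue.Dequeue());
        return result;
    }

    // Blocks a consumer on an empty queue, releases it, returns what it received.
    public static string ReleaseBlockedConsumer()
    {
        var queue = new PrioritizedQueue<int, string>();
        string received = "not set";
        var consumer = new Thread(() => received = queue.Dequeue());
        consumer.Start();
        // Dequeue() resets the flag on entry, so a release sent before the
        // consumer starts waiting is lost; retry until the thread has exited.
        while (!consumer.Join(50))
            queue.PulseAndFreeDequeue();
        return received; // default(string), i.e. null
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", DrainOrder())); // urgent-1, promo-1, promo-2
        Console.WriteLine(ReleaseBlockedConsumer() == null); // True
    }
}
```

The retry loop in ReleaseBlockedConsumer is worth noticing: because Dequeue() clears the flag when it starts, a single call to PulseAndFreeDequeue() made too early has no effect on a consumer that begins waiting afterwards. A shutdown routine should therefore keep releasing until its consumer threads have actually finished.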
That's it .. I hope you like my first article.