
Queue Processor

25 May 2015, CPOL license, 3 min read
Implementation of an event queue for delayed processing

Introduction

This tip describes the implementation of a generic queue processor for systems that need multithreaded processing of events. The approach is useful when some processing is time-consuming and must not block the main processing thread.

Background

This approach uses a C# generic class. The processor is implemented as an abstract class; a derived class must override the methods that do the real processing. Serialization and deserialization are used to store the queued objects.

Using the Code

The core class of this processor is QueueProcessor. To create a queue processor, we derive a class from QueueProcessor<T>, using our event class as T, and give it a constructor that passes two parameters to the base class:

  • QueueFilename: name of the file in which the queue is stored when the process stops
  • Processing interval: interval, in milliseconds, at which the queue is processed
C#
public class MyQueue : QueueProcessor<Data>
{
    public MyQueue(String queueFileName,
                   double processingIntervalms) : base(queueFileName, processingIntervalms)
    {
    }

    // the ProcessEvent and QueueLengthChanged overrides described below go here
}

If we do not define the queue filename, the queue processor does not save the queue to the file system, and all unprocessed events are lost when the service stops.

QueueProcessor is an abstract class and we have to implement two methods:

  • ProcessEvent: the method that does the actual processing (e.g., saving to a database or calling a web service)
    C#
    protected override bool ProcessEvent(Data eventToProcess)
    {
        // do some fake long processing
        log.Debug("Start processing: " + eventToProcess.EventData);
        Thread.Sleep(1000);
        log.Debug("End processing: " + eventToProcess.EventData);
        // event successfully processed
        return true;
    }
    

    In this example, we just sleep to simulate slow processing. The return value tells the QueueProcessor what to do with the event: true means it was processed and can be removed from the queue, false means it stays in the queue.

  • QueueLengthChanged: this method is called whenever the queue length changes, so it can be used for monitoring the queue length.
    C#
    protected override void QueueLengthChanged(int newQueueLen)
    {
       log.Debug("Queue length: " + newQueueLen.ToString());
    }
    

In the example, we just log the current queue length.

In the example's main program, we first initialize the MyQueue processor:

C#
MyQueue myQueue = new MyQueue(QUEUE_FILENAME, QUEUE_PROCSSING_INTERVAL);

After that, we read lines from user input and create a new event to be processed:

C#
string userInput;
while (true)
{
    userInput = Console.ReadLine();
    // ReadLine returns null when the input stream is closed
    if (userInput == null || userInput.ToLower().Equals(LOOP_END_CMD))
        break;

    myQueue.AddEvent(new Data() { TimeStamp = DateTime.Now, EventData = userInput });
}

When the program stops, it is important to dispose the queue processor. On dispose, all unprocessed events in the queue are serialized by another class, DataSerializer.

C#
myQueue.Dispose();

The DataSerializer class has methods for serializing a single object, or a list of objects, to a file or to a string. It includes type information in the serialized output, so lists containing objects of different types can be serialized.

Event (data) classes must have a public setter for every property and a constructor that takes no arguments.

See our example class:

C#
public class Data
{
       public DateTime TimeStamp { get; set; }

       public String EventData { get; set; }
}    
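DataSerializer itself is not shown in this tip. To illustrate why the parameterless constructor and the public setters matter, here is a minimal sketch that uses the framework's XmlSerializer as a stand-in. This is an assumption, not necessarily what DataSerializer does internally, and QueueFileSketch, Save and Load are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

public class Data
{
    public DateTime TimeStamp { get; set; }
    public String EventData { get; set; }
}

// Hypothetical helper, standing in for the article's DataSerializer.
public static class QueueFileSketch
{
    // Serialize the pending events to an XML string.
    public static string Save(List<Data> pending)
    {
        var serializer = new XmlSerializer(typeof(List<Data>));
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, pending);
            return writer.ToString();
        }
    }

    // Rebuild the list. XmlSerializer creates each Data object with the
    // parameterless constructor and fills it through the public setters.
    public static List<Data> Load(string xml)
    {
        var serializer = new XmlSerializer(typeof(List<Data>));
        using (var reader = new StringReader(xml))
        {
            return (List<Data>)serializer.Deserialize(reader);
        }
    }

    public static void Main()
    {
        var pending = new List<Data>
        {
            new Data { TimeStamp = new DateTime(2015, 5, 25), EventData = "first" },
            new Data { TimeStamp = new DateTime(2015, 5, 26), EventData = "second" }
        };
        List<Data> restored = Load(Save(pending));
        Console.WriteLine(restored.Count + " events restored, first = " + restored[0].EventData);
    }
}
```

A property without a public setter would be skipped by XmlSerializer, so its value would be missing after a restart.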

How Does It Work

The main data storage in the QueueProcessor is a single list holding our event objects.

C#
private List<T> eventList = new List<T>();

When we call the method AddEvent, the event is appended to the end of the list and processing of the list is started in a new thread, so the method returns quickly.

C#
public void AddEvent(T newEvent)
{
    lock (this.locker)
    {
        this.eventList.Add(newEvent);
    }
    // start processing the pending list in a new thread
    Thread processThread = new Thread(processingPendingList);
    processThread.Start();
}

The list processing is done in a method called processingPendingList. We use Monitor.TryEnter on a dedicated lock object to check whether processing is already running; if it is, the call does nothing and returns.

C#
if (Monitor.TryEnter(processingLocker))
{
    try
    {
        // ... process the pending events ...
    }
    finally
    {
        // release the lock even if processing throws
        Monitor.Exit(processingLocker);
    }
}
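The skip-if-busy behaviour of Monitor.TryEnter can be tried in isolation. The following is a minimal sketch, not the article's code: TryEnterSketch and ProcessIfIdle are hypothetical names. A second thread calls in while the first call still holds the lock, and is turned away immediately instead of blocking.

```csharp
using System;
using System.Threading;

public static class TryEnterSketch
{
    private static readonly object processingLocker = new object();

    // Returns true if this call did the work, false if it was skipped
    // because another thread was already inside.
    public static bool ProcessIfIdle(Action work)
    {
        if (!Monitor.TryEnter(processingLocker))
            return false;
        try
        {
            work();
            return true;
        }
        finally
        {
            Monitor.Exit(processingLocker);
        }
    }

    public static void Main()
    {
        bool secondRan = true;
        bool firstRan = ProcessIfIdle(() =>
        {
            // While the first call holds the lock, a second thread tries to enter.
            // (The lock is reentrant, so the attempt must come from another thread.)
            var other = new Thread(() => secondRan = ProcessIfIdle(() => { }));
            other.Start();
            other.Join();
        });
        Console.WriteLine("first ran: " + firstRan + ", second ran: " + secondRan);
    }
}
```

Because the skipped caller does not wait, an event added just as a pass is finishing may stay in the list until something triggers the next pass.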

If we successfully entered the processing section, we first read the first element of the event list while holding the list lock.

C#
// first element on list
T firstEvent = null;
lock (this.locker)
{
    if (this.eventList.Count > 0)
          firstEvent = this.eventList[0];
}  

If there are no elements in the list, processing ends here. Otherwise we call the abstract method ProcessEvent. If it returns true, we remove the first element from the list and continue with the next one. If it returns false, we stop processing the list, because events must be processed in list order; the failed event stays at the head of the queue.

C#
// process the first event
Boolean success = ProcessEvent(firstEvent);
if (success)
{
    // processing succeeded
    lock (this.locker)
    {
        // remove the first element from the list
        this.eventList.RemoveAt(0);
        // notify about the queue length change
        QueueLengthChanged(this.eventList.Count);
    }
}
else
{
    // processing failed - stop here
    break;
}
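The fragments above can be assembled into one compilable unit. The following is a reduced sketch under several assumptions, not the downloadable source: file storage is left out, the class constraint is assumed so that null can mark an empty queue, and the processing interval is assumed to drive a timer that retries the pending list. QueueProcessorSketch and PrintingQueue are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public abstract class QueueProcessorSketch<T> : IDisposable where T : class
{
    private readonly object locker = new object();            // guards eventList
    private readonly object processingLocker = new object();  // one pass at a time
    private readonly List<T> eventList = new List<T>();
    private readonly Timer retryTimer;                        // assumption: periodic retry

    protected QueueProcessorSketch(double processingIntervalMs)
    {
        var interval = TimeSpan.FromMilliseconds(processingIntervalMs);
        retryTimer = new Timer(_ => ProcessPendingList(), null, interval, interval);
    }

    protected abstract bool ProcessEvent(T eventToProcess);
    protected abstract void QueueLengthChanged(int newQueueLen);

    public void AddEvent(T newEvent)
    {
        lock (locker) { eventList.Add(newEvent); }
        new Thread(ProcessPendingList).Start();   // return quickly, process in the background
    }

    private void ProcessPendingList()
    {
        if (!Monitor.TryEnter(processingLocker)) return;   // a pass is already running
        try
        {
            while (true)
            {
                T firstEvent = null;
                lock (locker)
                {
                    if (eventList.Count > 0) firstEvent = eventList[0];
                }
                if (firstEvent == null) break;          // queue is empty
                if (!ProcessEvent(firstEvent)) break;   // failed: keep the event, stop this pass
                int remaining;
                lock (locker)
                {
                    eventList.RemoveAt(0);
                    remaining = eventList.Count;
                }
                QueueLengthChanged(remaining);
            }
        }
        finally { Monitor.Exit(processingLocker); }
    }

    public void Dispose() { retryTimer.Dispose(); }
}

// Demo subclass: records events in the order they were processed.
public class PrintingQueue : QueueProcessorSketch<string>
{
    public readonly CountdownEvent Done;
    public readonly List<string> Processed = new List<string>();

    public PrintingQueue(int expected) : base(50) { Done = new CountdownEvent(expected); }

    protected override bool ProcessEvent(string eventToProcess)
    {
        Processed.Add(eventToProcess);   // only one pass runs at a time
        Done.Signal();
        return true;
    }

    protected override void QueueLengthChanged(int newQueueLen) { }
}

public static class Program
{
    public static void Main()
    {
        using (var queue = new PrintingQueue(3))
        {
            queue.AddEvent("a");
            queue.AddEvent("b");
            queue.AddEvent("c");
            queue.Done.Wait(TimeSpan.FromSeconds(5));
            Console.WriteLine(string.Join(",", queue.Processed));
        }
    }
}
```

Unlike the fragment above, this sketch calls QueueLengthChanged after releasing the list lock, so a slow override does not block AddEvent. The timer covers the case where an event arrives while a pass is finishing and its own thread is turned away by TryEnter.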

Points of Interest

I have found this approach to delayed processing very useful in several projects, mainly server-based ones. I have used different implementations of queue processors for database storage, web service calls and other time-consuming tasks that must not block the server's main processing.

History

  • 5/25/2015 - Initial release

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer
Slovenia
Senior C# .NET developer in gaming industry

Specialties
C#, XML, WebServices, WCF

Comments and Discussions

 
  • Question: Queue issues I see in potential (jbrentonprivate, 26-May-15 7:24)
  • Question: Lock, ConncurentQueue & C. (Paolo Senigagliesi, 26-May-15 4:34)
  • Question: Using ConcurrentQueue<T> instead of List<T> (Max Vagner, 25-May-15 23:11)
  • Answer: Re: Using ConcurrentQueue<T> instead of List<T> (Damijan Vodopivec, 26-May-15 0:29)
  • General: Re: Using ConcurrentQueue<T> instead of List<T> (ArchAngel123, 26-May-15 10:31)
    TryPeek - read the documentation

    Also, should note that "enough" is rarely sufficient when it comes to multi threading and parallel processing - hence the suggestions to use hardened and tested frameworks rather than rolling your own unless you have extensive experience in the field - good luck
  • General: My vote of 5 (Sachin Mahandule, 25-May-15 22:14)
  • Question: Is this like a producer/consumer app? (Dewey, 25-May-15 16:10)
  • Answer: Re: Is this like a producer/consumer app? (Damijan Vodopivec, 25-May-15 20:54)
