Click here to Skip to main content
15,885,875 members
Articles / Programming Languages / C#

Old Thread Tricks with the New Concurrent Library

Rate me:
Please Sign up or sign in to vote.
4.67/5 (2 votes)
10 Mar 2015CPOL8 min read 15.4K   235   18   2
Using the ConcurrentDictionary to track threaded job progress and status

Simple Thread Tracker

Introduction

I needed a threaded application to do the processing that multiple instances of the same application were currently doing on multiple machines. This uses the ConcurrentDictionary to keep track of status and state of the Threads/Jobs.

The obvious answer seemed to be using threads. At the end, I got more than about an 8 or 10 fold improvement on throughput. The application ended up CPU bound, which was what I wanted, machine resources used to the maximum. I thought the pattern was pretty cool though, so I figured I would make a template of it and put it here. The new .NET Concurrent Library made this simple to do, particularly the ConcurrentDictionary. (Let's thank Steven Toub right here.)

It is mostly as simple as I could make it, but I wanted to make it complete. It's not that simple... Yah, and I did add a few bells, but it's all good code. It is meant to be a template for something quite useful in production. I am sure there are other ways to do it... which makes me curious.

Background

The whole idea of this is that you have a class (c_Job) that does a process off in its own thread and a class (c_JobTracker) saved in a ConcurrentDictionary that mirrors that c_Job class and gets told the job status by the c_Job class. Actually, the cJob class could be in the ConcurrentDictionary, but there would be a chance of significant overhead as the ConcurrentDictionary made thread safe updates to the classes it contained. On the other hand, the c_JobTracker class is lightweight and only rarely needs to be updated by the cJob instance, so you are not going to lose much when a c_Job instance is talking back to the ConcurrentDictionary. In the production application, the c_Job instance only updates the c_Jobtracker in the ConcurrentDictionary upon completion (no error or error) and when it has enough information about the job it is doing to set the overtime value of the job to a more realistic value. That way, very little time is lost from the ConcurrentDictionary, which is pretty efficient anyway. Note that you can update the c_JobTracker instances from within the c_Job if needed. Just try to minimize it. Really, the cJob instances can update the c_JobTracker instances in the ConcurrentDictionary during processing as needed and the overhead still should not be bad.

Parts is Parts

  1. Windows application that this all works in... could be a Service.
  2. The c_Main class is what Windows calls that is where all the processing occurs. It could be in a Windows Service if you were in Production. It is instantiated on startup, but not started until user starts it.
    c_Main is started in is own thread so that the Windows Form/Service is still responsive. It can add jobs or stop processing in an orderly fashion. You can request it to stop, but it will wait until the already started Jobs are completed or times out.
    c_Main starts a while loop that, in order:
    1. Looks in the ConncurrentDictionary for completed jobs, with or without errors and reports them and removes them from the ConcurrentDictionary.
    2. Looks in the ConncurrentDictionary for jobs that have gone overtime and reports them. It never attempts to kill a thread, a problematic action.
    3. Looks for new jobs to do and queues them, if not enough jobs are running (or could be if not enough jobs in queue).
    4. Starts jobs, if not enough jobs are running and there are jobs in the queue.

      The job is just calculating a Fibonacci value to simulate a significant processing job that would do Db and File IO for a few seconds or more. Seed values come from the Windows application instead of jobs from a database as would be expected in a production application. A production version of this would presumably have many more steps, that would be reported back to show where an error occurred if one did.

  3. The c_Job class is what does the work. It is started in a thread and runs until it stops. When started, a c_JobTracker instance is also made and added to the c_Main.ConcurrentDictionary. The c_Job instance reports its state and status to its corresponding c_JobTracker instance in the c_Main.ConncurrentDictionary. It must report back at completion, error or not. It can report back its state at any time, do an update or even return a result, but those would mostly be an unexpected usage.
  4. The c_JobTracker class is for keeping track of JobId and the known state and status of the c_Job it is tracking. It has the start time of the c_Job it is tracking and so can be used to check if the c_Job is overtime.
  5. The c_Utility class contains the static Fibonacci calculation method to simulate work as well as a file logging method and a method to write to the Event Viewer. Note that you may have to run this once as Administrator (start Visual Studio in Administrator Mode) to make the Event Viewer code work.
  6. The c_Settings class is a singleton to hold settings for the c_Main class. Updating values in this class can Pause or Stop the c_Main instance and can adjust its behavior while running.

Using the Code

The attached Windows project is a bit robust, but it is carefully tested. The idea is that you "Start" the processing loop. Then you "Add" seed values to be calculated to Fibonacci values. A maximum number of threads to use can be set and is updated when the Add button is clicked. You can keep adding seed values. It ignores invalid entries.

On start up nothing is happening, but the c_Main class is instantiated. Select the Start button to start the c_Main class that contains the Polling while loop. It is looking for seed values now. You can change the number of threads/c_Jobs in the text box. Select the Add button to make the seed values available to the processing loop. c_Main checks for validity and displays the values it is going to start c_Jobs to process. Watch your Task Manager... Feed seeds to the program at any time. The Stop button tells it to quit Polling for new seeds/jobs. The Close button will check to see that all jobs are complete before closing.

How about some code. I'll trim it, but the whole intent is that the project code should be fairly readable. It's as short as I could make it, but it is meant to be more functional than short. This is just the main class that:

  1. Looks for completed Jobs
  2. Looks for overtime Jobs
  3. Looks for jobs to start
  4. Looks for new jobs ready to be started
C#
public class c_Main
{
    public static int _iInterlockedActiveJobCounter;
    public static ConcurrentDictionary<int, c_JobTracker> _concurrentDictJobs = 
					new ConcurrentDictionary<int, c_JobTracker>();
    
    private Form1 ParentForm;
    private List<c_Job> lstcJob;
	// First stop for a job after "Db"
    private Queue<c_Job> queueWaitingJobs = null; 
    private List<c_Job> lstActiveJobs = null;
    private c_Settings cSettings = null;
    public string strSeeds { get; set; }
    private int iJobId { get; set; }
    public int iJobCounter { get; set; }
    public int iSecondsToOverTime { get; set; }

    public c_Main ()
    {
        cSettings = c_Settings.Instance;
        cSettings.initializeSettings();
        lstcJob = new List<c_Job>();
        queueWaitingJobs = new Queue<c_Job>();
        lstActiveJobs = new List<c_Job>();
        strSeeds = String.Empty;
        iJobCounter = 0;
    }

    public void init(Form1 ParentForm, int iSecondsToOverTime)
    {
        this.ParentForm = ParentForm;
        this.iSecondsToOverTime = iSecondsToOverTime;
    }

    public void stopProcessing()
    {
        cSettings.bStop = true;
    }

    public void setMaximumJobs(int iMinJobsThatShouldBeActive)
    {
        cSettings.MinJobsThatShouldBeActive = iMinJobsThatShouldBeActive;
    }

    public void start()
    {
        ParentForm.SetTextMessage("\n\r\n\rStarting Main Class.");
        int iWhileCounter = 0;
        int iWretch = 1;
        while(cSettings.bRunProcessingLoop == true)
        {
            ParentForm.SetTextJobsWorking(c_Main._iInterlockedActiveJobCounter.ToString());
            iWhileCounter++;
            ParentForm.SetTextWhileCounter(iWhileCounter.ToString());
            
            // 1. Look for completed Jobs
            List<int> lstIDsToRemove = new List<int>(); // could be list of class
            List<int> lstIDsTocheckForOverTime = new List<int>(); // could be list of class
            StringBuilder sb1 = new StringBuilder();

            if (this.cSettings.bStop == true) // If told to Stop
            { 
                if(c_Main._concurrentDictJobs.Count == 0) and all started c_Jobs are complete
                {
                    cSettings.bRunProcessingLoop = false;
                    ParentForm.SetTextMessage("\n\r\n\rTh Th That's All Folks.");
                    continue;
                }
            }

            foreach (var pair in c_Main._concurrentDictJobs.ToArray())
            {
                if (pair.Value.State == "completed")
                {
                    long lSpanTicks = pair.Value.lTicksEnd - pair.Value.lTicksStart;
                    TimeSpan ts = new TimeSpan(lSpanTicks);
                    int iSeconds = (ts.Minutes * 60) + ts.Seconds;
                    string sMillaSecondsSinceStart = ts.Milliseconds.ToString();
                    if (sMillaSecondsSinceStart.Length > 3) // pretty it up
                        sMillaSecondsSinceStart = sMillaSecondsSinceStart.Substring(0, 3);
                    string sSecondsSinceStart = iSeconds.ToString() + "." + sMillaSecondsSinceStart;

                    //c_Utility.writeToFile(pair.Value.strJobTicksToString(), "..\\..\\TheLog.txt");

                    lstIDsToRemove.Add(pair.Value.JobId);
                    ParentForm.SetTextResult("\r\n" + pair.Value.JobId.ToString() + 
			". Fibonacci value of "
                        + pair.Value.iSeed.ToString() + " is :" + 
			pair.Value.lResult.ToString() + " in :"
                        + sSecondsSinceStart.ToString() + " seconds.");
                }
                else // if job is not completed, plan to check it for over time
                {
                    lstIDsTocheckForOverTime.Add(pair.Value.JobId);
                }
            }
            foreach (int iJobId in lstIDsToRemove)
            {
                c_JobTracker cJobTracker;
                c_Main._concurrentDictJobs.TryRemove(iJobId, out cJobTracker);
            }
            if (c_Main._concurrentDictJobs.Count == 0)
                iWretch++;

            // 2. Look for over time Jobs
            foreach (int iJobId in lstIDsTocheckForOverTime)
            {
                c_JobTracker cJobTracker = null;
                if (c_Main._concurrentDictJobs.TryGetValue(iJobId, out cJobTracker) == true)
                {
                    int iSecondsSinceStart = new TimeSpan(DateTime.Now.Ticks - 
                    	cJobTracker.lTicksStart).Seconds; //.. OK, it isn't used
                    double lSecondsSinceStart = new TimeSpan
			(DateTime.Now.Ticks - cJobTracker.lTicksStart).TotalSeconds;
                    iSecondsSinceStart = (int)lSecondsSinceStart;					
                    if (iSecondsSinceStart < cJobTracker.iSecondsToOverTime) // not late, do nothing
                        continue;
                }

                cJobTracker.iOverTimeCounter++;

                if (cJobTracker.iOverTimeCounter == 1) // log informational warning
                {
                    //c_Utility.writeEvent("Time Out Warning for JobId:" + 
                    //cJobTracker.JobId.ToString() + ".");
                    ParentForm.SetTextMessage("\n\r\n\rTime Out Warning for JobId:" + 
                    	cJobTracker.JobId.ToString() + ".");
                }
                else if (cJobTracker.iOverTimeCounter == 10) // might be time for email
                {
                    //c_Utility.writeEvent("Time Out Error for JobId:" + 
                    //cJobTracker.JobId.ToString() + ".", 5);
                    ParentForm.SetTextMessage("\n\r\n\rTime Out Error for JobId:" + 
                    	cJobTracker.JobId.ToString() + ".");
                }
            }

            if (this.cSettings.bStop == false)
            {
                // 3. Start Jobs if needed
                if (c_Main._concurrentDictJobs.Count < cSettings.MinJobsThatShouldBeActive)
                {
                    while ((queueWaitingJobs.Count > 0) && 
                           (c_Main._concurrentDictJobs.Count < cSettings.MinJobsThatShouldBeActive))
                    {
                        c_Job cJob = queueWaitingJobs.Dequeue();
                        cJob.Status = "unstarted";
                        cJob.lTicksStart = DateTime.Now.Ticks;
                       
                        // cJob has correct lTicksStart so cJobTracker will
                        c_JobTracker cJobTracker = new c_JobTracker(cJob);
                        if (c_Main._concurrentDictJobs.TryAdd(cJob.JobId, cJobTracker) == false)
                        {
                            c_Utility.writeEvent("Failed to process JobId:" + 
					cJob.ToString(), 5); // hosed
                            continue;
                        }
                        Interlocked.Increment
			(ref c_Main._iInterlockedActiveJobCounter); // Because it is cool

                        Task task = new Task(new Action(cJob.ProcessJob), 
					TaskCreationOptions.LongRunning);
                        //Task task = new Task(delegate 
			{ this.lstActiveJobs[ii].ProcessInfo(); }, TaskCreationOptions.LongRunning);
                        task.Start();
                        iJobCounter++; // Hey, someone might want to know
                        iWretch = 0;
                    }
                }

                // 4. Poll for and Enque new jobs (from Db)
                StringBuilder sb = new StringBuilder();
				// this simulates a database call looking for jobs
                if (strSeeds.Trim().Length > 0) 
                {
                    int iSeed = 0;
                    string[] strarr = strSeeds.Split(' ');
                    strSeeds = "";
                    foreach (string str in strarr)
                    {
                        if (Int32.TryParse(str, out iSeed) == true)
                        {
                            if(iSeed > 55)
                            {
                                ParentForm.SetTextMessage("\n\r\n\rA Seed like " + 
                                iSeed.ToString() + " would take too long, so I'm going to skip it.");
                                continue;
                            }

                            int iSecondsAddedForExtraLongJob = 0;
                            if (iSeed > 35) // anything can be adjusted during processing
                                iSecondsAddedForExtraLongJob = (iSeed - 30) * 2;

                            iJobId++;
                            // Do not start a job. You want to be able to limit the number of threads 
		            // running, so enqueue the job into a buffer rather than starting it. 
                            // This allows throttling.
                            queueWaitingJobs.Enqueue(new c_Job(iJobId, iSeed, 
                            	this.iSecondsToOverTime + iSecondsAddedForExtraLongJob));
                            sb.Append(iSeed.ToString() + ", ");
                        }
                    }
                }
                if (sb.ToString() != String.Empty)
                    ParentForm.SetTextMessage("\n\r\n\rNew Seed values received-" + sb.ToString());
            }
            Thread.Sleep(cSettings.iSleepBetweenPolls);
            if ((c_Main._concurrentDictJobs.Count == 0) && (iWretch == 1))
            {
                ParentForm.SetTextMessage("\n\r\n\rCome on. I'm bored. Add some more seed values.");
                iWretch++;
            }
        } // end while
        ParentForm.SetTextMessage("\n\r\n\rClose it already. It's over...");
    }
} // end class

Points of Interest

Now this application does very little that is real useful, but it should be a good framework for building a larger application that does do useful stuff in threads.

The polling while loop and everything else possible is put in the c_Main class which runs in its own processing thread. That allows the Form or Service to be responsive. Each c_Job is isolated in its own thread and it's only communication back to the main processing thread is through the c_JobTracker ConcurrentDictionary. Oh, that and the Interlocked variable, but that's just there for ... not sure, but it is there.

You can adjust the number of threads/c_Jobs to be started in the settings. You can even adjust it dynamically from outside the c_Main class, because it is just a property of that class. That also allows you to tell the main thread to Pause looking for new jobs or to Stop the polling loop.

Looking at the Task Manager, you can get a good look at when you are maxing out performance. If you are Disk or Network limited though, you will likely need to look at the Resource Monitor (in the Task Manager) or the Performance Monitor. You will be able to see when something maxes out. Just move up the number of jobs that can run at a time up until you see things slow down. (A good wild guess is that when doing real work with file and Db IO, an i7 is likely to handle maybe 30 jobs/threads at a time.)

Note that there was actually pretty much no use for the _iInterlockedActiveJobCounter, because the ConcurrentDictionary.Count value could have provided that value, but what the heck. The Concurrent Library is amazing and does some previously annoying stuff with ease.

Actually, using this Fibonacci method as the "work" is bad, because its performance curve is exponential, not linear like most processing jobs would be. Any set of normal business processing jobs are likely to be "difficult" or twice as difficult or 5 times as difficult, not exponentially more difficult like the Fibonacci calculation.

There are two ways shown for starting the processing thread as a Task. There are many other ways.

In Production, this would have another setting in c_Main.c_Setting to stop the outer while loop.

Do ignore the code behind the curtain that allows writing to the Form from the threads. It would be unlikely to be used in Production.

Here the ConcurrentDictionary is a static member, but it does not necessarily need to be, because there is only one instance of the class it is in.

The c_Settings class is set up as a singleton... Why not...

Note that you may have to run this once as Administrator (start Visual Studio in Administrator Mode) to make the Event Viewer code work.

Hey, this thing makes an interesting CPU performance test.

Comment: This application states that as a fairly experienced developer, I am a huge fan of application metadata (comments and self documenting code) to promote readability and maintainability. Code should read like a book. A good book.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior) Magellan Health Services
United States United States
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions

 
GeneralNicely done Pin
HuntrCkr11-Mar-15 0:14
HuntrCkr11-Mar-15 0:14 
GeneralRe: Nicely done Pin
Michael Breeden11-Mar-15 0:38
Michael Breeden11-Mar-15 0:38 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.