Customizing the TaskScheduler: Queue Your Task Work Items to Run When You Want Them To

18 Jul 2020 | MIT | 10 min read
Leveraging some less well known areas of the .NET Task framework to schedule tasks to execute on your own conditions.
This article demonstrates the customization of the task scheduling algorithm .NET uses to run Task items. Here, we use it to put a hard limit on the number of parallel jobs being performed, rather than leaving that to the default scheduler, which is very aggressive.

Introduction

The Task framework in .NET and the TAP programming pattern are very powerful tools for adding concurrent execution to your application. Often easier to use than threads, they give you at least as much power, if you know how to use them. Some of the best features, however, aren't very well documented. One such feature is the TaskScheduler, which controls when your tasks get executed.

I got into this mess revisiting this project, this time using the Task framework and the built-in thread pool. To my horror, .NET allocates (on my system) 2047 potential threads for CPU bound** operations. I'm not sure whether they're actually created or whether they're just placeholders, but still, an ideally designed application will have exactly as many CPU bound operations going as there are cores (or usually cores minus one, since one core is usually busy managing everything else, and probably a UI). The bottom line is, if you have operations that tie up your CPU, you don't gain any performance by running more of them than you have cores to execute them. Given that, the .NET value I get back for available threads per process seems obscene. In fact, the more threads you have, the harder the scheduler has to work, meaning you lose performance once you have more threads than your CPU can run concurrently. The moral of this is to create as few threads as you need, but no fewer.

I'd prefer to queue operations once I won't get additional performance out of running more of them. The built-in ThreadPool doesn't really care about that, and calling SetMaxThreads() to try to change that impacts the whole process, among other problems. Using the technique I outlined in the previous article works, but it's a bit complicated, and even if it were abstracted, using the Task framework is the way to go these days in .NET. Therefore, I needed something to allow me to use Tasks, but in such a way that I could control how many ran at once. Eventually, I stumbled on TaskScheduler, and even a bit of example code for subclassing it to change the behavior, but otherwise it's not very well documented. Here, I present this technique to you.

** A CPU bound operation is one that ties up your processor, making it crunch on something. Contrast this with an I/O bound operation, which spends most of its time waiting on external device interactions like a hard drive's disk reads or a network controller's incoming packets. The distinction is important, because there's good reason to have more I/O bound operations than cores, depending on how many devices you are talking to. An I/O bound operation does not tie up CPU cycles (aside from a minuscule bit of overhead to make it work).

Conceptualizing this Mess

The ThreadPool, as I said, allocates a tremendous number of threads (or at least stand-ins for them) up front and basically takes queuing long operations off the table, because you'll usually have a thread available even if you don't have an idle core to run it on, or at least this is the default behavior. This setup on my system gives me roughly 1023 threads per core, per process in the pool. That's ridiculous. Even if my app had that many long running CPU bound tasks for some reason I can't fathom, there's no way I'd want to try to execute them all at once! No, what I'd like to do is allocate a certain number of workers, and then dedicate those workers to fulfilling the tasks as the workers become available, so that tasks wait in line to be completed even as several are being completed at once. One of the main reasons for doing this is performance, but it can also simply be a manageability issue. The more of these tasks you are juggling at once, the more unwieldy and resource hungry your app can become. This isn't what we want.
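
If you want to see the numbers for your own machine, here is a minimal sketch that asks the runtime for them. PoolLimits and its members are hypothetical names I made up for illustration; ThreadPool.GetMaxThreads() and Environment.ProcessorCount are the real calls. The values will differ by machine and runtime version, so no particular output is implied.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the article's download.
public static class PoolLimits
{
    // Returns the pool's upper bounds on worker and I/O completion threads,
    // plus the number of logical processors the runtime reports.
    public static (int MaxWorkers, int MaxIo, int Cores) Query()
    {
        ThreadPool.GetMaxThreads(out int maxWorkers, out int maxIo);
        return (maxWorkers, maxIo, Environment.ProcessorCount);
    }

    // The worker count this article aims for with CPU bound work:
    // every core but one, and never fewer than one.
    public static int SuggestedWorkerCount()
    {
        return Math.Max(1, Environment.ProcessorCount - 1);
    }
}
```

Calling PoolLimits.Query() and comparing MaxWorkers against SuggestedWorkerCount() should make the gap between what the pool permits and what the hardware can actually run at once fairly obvious.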

Luckily, the Task framework gives you a way to customize scheduling if you're willing to use TaskFactory.StartNew() instead of Task.Run(). To facilitate this, we have to pass a custom TaskScheduler as an argument to the TaskFactory's constructor. First, however, we need a task scheduler implementation to pass it. Microsoft gives us two, and neither does what we want. The example code I linked to, however, comes close. With a bit of hammering on it, I was able to faithfully reproduce the behavior of my previous application, but using this entirely different paradigm.
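
To make the plumbing concrete, here is a small sketch of a factory bound to a scheduler. FactoryDemo and RunsOn are hypothetical names; TaskFactory, StartNew() and TaskScheduler.Current are the real framework members. It accepts any scheduler, so you can try it with TaskScheduler.Default before you have a custom one.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class FactoryDemo
{
    // Starts a task through a factory bound to the given scheduler and
    // reports whether the task body really ran under that scheduler.
    public static bool RunsOn(TaskScheduler scheduler)
    {
        var factory = new TaskFactory(scheduler);
        Task<bool> task = factory.StartNew(
            () => ReferenceEquals(TaskScheduler.Current, scheduler));
        return task.Result;
    }
}
```

Task.Run() offers no such hook because it always targets TaskScheduler.Default, which is why the factory route is the one that matters here.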

Basically, a task scheduler has facilities for queuing and dequeuing tasks, as well as executing them normally or on the current thread. It also includes a member used by debuggers to enumerate the tasks. We'll be implementing most of this here.

Once we do, the rest of the app is straightforward. It just spawns a new task whenever a button is clicked, linking the task to a progress control it adds to the display. The scheduler handles when it runs. We use an anonymous delegate for linking the control which is much simpler than my original application, although this is a special use case, and that application's infrastructure supports far more than scheduling whereas this is a one trick pony. Still, it's a useful pony with a neat trick.

Coding this Mess

The User Interface

First let's look at the user interface. The meat of it is in the "Enqueue Work" button's click handler:

C#
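var wpc = new WorkerProgressControl(_nextTaskId++);
ProgressPanel.SuspendLayout();
ProgressPanel.Controls.Add(wpc);
wpc.Dock = DockStyle.Top;
ProgressPanel.ResumeLayout(true);
new TaskFactory(_taskSched).StartNew(() => {
    // TODO: replace with actual work
    for (var i = 0; i < 50; ++i)
    {
        Thread.Sleep(100);
        // Copy the value first: the delegate runs later on the UI thread,
        // by which time i may already have moved on.
        var progress = i * 2;
        BeginInvoke(new Action(() => {
            wpc.Value = progress;
        }));
    }
    // Controls may only be touched from the UI thread, so marshal this too.
    BeginInvoke(new Action(() => { wpc.Value = 100; }));
});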

_taskSched is our custom task scheduler instance. Note how we're passing it to TaskFactory's constructor in the new TaskFactory(_taskSched) call. This allows us to swap out the standard scheduling behavior for ours, which constrains the number of concurrent tasks. In the delegate, there's the "work" that our long running task performs. Note how we're updating the UI periodically within it, using BeginInvoke(). This is to report our progress in a thread safe manner. This is a bit different from how the last article's app worked, but it's easier this time around.

Our WorkerProgressControl is basically just a progress bar and a label, like in the last article. We dock them as we add more tasks to create a list. Unlike the last app, we can let the C# compiler manage the lookup for our control by relying on closures: the compiler hoists any local variable an anonymous method captures into a hidden class. Basically, we just reference wpc from inside the anonymous method and it works like magic, like it always does.
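
One caveat with captured variables is worth a quick sketch. A for loop has a single counter variable shared by every iteration, so a delegate that runs later sees whatever the counter holds at that moment, not what it held when the delegate was created. CaptureDemo and its methods are hypothetical names used only to show the difference.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demo class for illustration only.
public static class CaptureDemo
{
    // Every delegate captures the same variable i, so once the loop has
    // finished they all read its final value.
    public static int[] CaptureLoopVariable()
    {
        var readers = new List<Func<int>>();
        for (var i = 0; i < 3; ++i) readers.Add(() => i);
        return readers.ConvertAll(r => r()).ToArray(); // 3, 3, 3
    }

    // Copying into a local declared inside the loop body gives each
    // delegate its own variable.
    public static int[] CaptureLocalCopy()
    {
        var readers = new List<Func<int>>();
        for (var i = 0; i < 3; ++i)
        {
            var copy = i;
            readers.Add(() => copy);
        }
        return readers.ConvertAll(r => r()).ToArray(); // 0, 1, 2
    }
}
```

A variable like wpc is declared once per click, so each task gets its own copy for free. It's loop counters read from a deferred callback, such as one handed to BeginInvoke(), that need the explicit local.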

The nice thing is that no work is done here to schedule the task. All of that is handled by _taskSched. Our UI code doesn't have to care, which keeps it dead simple. A user interface should be glue, and not much more. Let's keep the scheduling logic where it belongs!

The Task Scheduler

Speaking of our custom TaskScheduler, let's visit it now. We use it to track the current tasks, get the next task, execute tasks, and gather various statistics on tasks. Much of this is a shameless lift of Microsoft's example code I linked to in the introduction, but I've modified it to gather more statistics and to allow you to change the amount of concurrency allowed for during the lifetime of the object, not just on creation. Let's dive in, starting with our member fields:

C#
// Indicates whether the current thread is processing work items.
[ThreadStatic]
static bool _currentThreadIsProcessingItems;

// The list of tasks to be executed.
readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)

// The maximum concurrency level allowed by this scheduler.
int _maximumTaskCount = Math.Max(1,Environment.ProcessorCount-1);

// The number of worker delegates currently queued or running.
int _pendingTaskCount = 0;

// Indicates the waiting task count.
int _waitingTaskCount = 0;

First, we have a flag that indicates whether the current thread is processing work items. This is set on a thread by thread basis and checked when we try to "inline" a task to run on the same thread. Otherwise, it's not much use to us.

Second, we have a linked list that stores our tasks. We guard all access to it with lock. I thought this was terrible at first, and I was ready to replace the whole thing with a slimmer synchronization facility, but when I did some reading, I found out that, given the thread access patterns here, lock performs as well as or better than most of the other options, and better than anything non-trivial. At one point, I probably would have vowed to only use it at gunpoint, but in the end, here we are. The linked list is appropriate because tasks are added as often as they are removed, and removal happens at the first node.

Third, we have our maximum task count, which defaults to the ideal number for your machine's hardware: all of your cores minus one, with a minimum of one.

Finally, we have our _pendingTaskCount and _waitingTaskCount which track the number of items running** and the number of items waiting.

** Sort of. We actually get this decremented some time before the task is finished running. There's a way to track the end of the task and get it on the nose, but it's far less efficient. Consider this value to be an estimate.

Now let's put these fields to work. The first non-trivial member we encounter is QueueTask(), which is implemented thus:

C#
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
    _tasks.AddLast(task);
    ++_waitingTaskCount;
    if (_pendingTaskCount < _maximumTaskCount)
    {
        ++_pendingTaskCount;
        _NotifyThreadPoolOfPendingWork();
    }
}

All it's doing is taking a lock on the list, adding a task, and updating our statistics. If the number of pending workers is less than the maximum we allow, we increment the pending count and notify the thread pool, which brings us to _NotifyThreadPoolOfPendingWork():

C#
ThreadPool.UnsafeQueueUserWorkItem((object state) =>
{
    // Note that the current thread is now processing work items.
    // This is necessary to enable inlining of tasks into this thread.
    _currentThreadIsProcessingItems = true;
    try
    {
        // Process all available items in the queue.
        while (true)
        {
            Task item;
            lock (_tasks)
            {
                // When there are no more items to be processed,
                // note that we're done processing, and get out.
                if (0 == _tasks.Count)
                {
                    --_pendingTaskCount;
                    break;
                }

                // Get the next item from the queue
                item = _tasks.First.Value;
                _tasks.RemoveFirst();
                --_waitingTaskCount;
            }

            // Execute the task we pulled out of the queue
            base.TryExecuteTask(item);
                  
        }
    }
    // We're done processing items on the current thread
    finally { _currentThreadIsProcessingItems = false; }
}, null);

This is a bit more substantial. First, we call UnsafeQueueUserWorkItem(). You might be wondering why we're not calling QueueUserWorkItem(). I wondered that too, until I found the explanation: the unsafe version doesn't call ExecutionContext.Capture(), which is an expensive call. The downside is that the caller's execution context doesn't flow to the worker thread, which matters when your code runs in a restricted environment because it can elevate privileges. Performance is the only reason I could find for this call appearing in Microsoft's example code. Looking at the rest of their code, it was implemented better than I expected for an example, so I'm sticking with this call. Presumably the performance payoff is worth it, though with long running tasks I wouldn't think so, or perhaps more importantly, there might be gotchas I'm unaware of with the more traditional method in this scenario. I can't be sure, so ironically, I'm playing it safe by sticking to UnsafeQueueUserWorkItem()!
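
One observable consequence of skipping the context capture can be sketched with AsyncLocal<T>, whose value travels with the ExecutionContext. ContextFlowDemo and its members are hypothetical names; the thread pool calls are the real ones. On the runtimes I'm aware of, I'd expect the safe call to see the caller's value and the unsafe call to see none, but treat that as an assumption to verify on yours.

```csharp
using System;
using System.Threading;

// Hypothetical demo class for illustration only.
public static class ContextFlowDemo
{
    // An ambient value that rides along with the ExecutionContext.
    static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // Queues one work item each way and returns what each one saw.
    public static (string Flowed, string NotFlowed) Run()
    {
        string flowed = "unset", notFlowed = "unset";
        using (var done = new CountdownEvent(2))
        {
            Ambient.Value = "caller";
            // Captures the caller's context, so Ambient is visible.
            ThreadPool.QueueUserWorkItem(_ =>
            {
                flowed = Ambient.Value;
                done.Signal();
            });
            // Skips the capture, so the work item runs without it.
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                notFlowed = Ambient.Value;
                done.Signal();
            }, null);
            done.Wait();
        }
        return (flowed, notFlowed);
    }
}
```

If any of your tasks depend on ambient state like this, that's a reason to weigh QueueUserWorkItem() despite the extra cost.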

Inside the anonymous method, we have to do our work, so we set the per-thread flag indicating we're busy processing, and then we loop through the tasks. Note that we're locking inside the loop. We do that so we can keep the base.TryExecuteTask(item) call outside of the lock, which is important for performance and to prevent potential deadlocks. Inside the lock, we get the next task, remove it from the task list, and update our statistics. Finally, we exit the lock and execute the task. Once the queue is empty and we're done, we set our per-thread flag back to idle.

We didn't have to use the system thread pool here. You can schedule your tasks to execute on raw threads or even the same thread if you want it to always block. It all depends on how you implement TaskScheduler.
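
As a sketch of the "same thread" option, here is about the smallest scheduler I can think of: it runs each task immediately on whichever thread queues it. The class name is hypothetical, and this is an illustration of the idea rather than anything from the article's download.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical name. Runs every task on the thread that queues it, so
// StartNew() does not return until the task has finished.
public sealed class CurrentThreadTaskScheduler : TaskScheduler
{
    // "Queuing" here just means running the task right away.
    protected override void QueueTask(Task task)
    {
        TryExecuteTask(task);
    }

    // Inlining is always fine because everything runs inline anyway.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }

    // Nothing is ever left waiting, so there is nothing to report.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return Enumerable.Empty<Task>();
    }
}
```

Pass an instance to a TaskFactory exactly as you would any other scheduler. Because the call blocks, this is mostly useful for tests or for forcing deterministic ordering.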

Next, we have TryExecuteTaskInline(), which attempts to execute the task on the current thread:

C#
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems) return false;

// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued)
    // Try to run the task.
    if (TryDequeue(task))
        return base.TryExecuteTask(task);
    else
        return false;
else
    return base.TryExecuteTask(task);

First, we check whether the current thread is already processing work items, because if it isn't, we don't support inlining on it and return false. If it is, we see whether the task was previously queued. If so, we try to remove it from the queue: if that succeeds, we run it immediately, and if the task is no longer in the queue, we return false. Finally, if it wasn't previously queued, we simply run the task. The removal is so we don't run the task twice.

TryDequeue() simply tries to remove an item:

C#
lock (_tasks)
{
    if (_tasks.Remove(task))
    {
        --_waitingTaskCount;
        return true;
    }
    return false;
}

The only other thing we're doing aside from locking the list and removal is updating the waiting statistic.

Finally, we have GetScheduledTasks():

C#
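bool lockTaken = false;
try
{
    // Never block here: this is called from the debugger.
    Monitor.TryEnter(_tasks, ref lockTaken);
    // Return a snapshot so the caller doesn't enumerate the live list
    // after the lock has been released.
    if (lockTaken) return new List<Task>(_tasks);
    else throw new NotSupportedException();
}
finally
{
    if (lockTaken) Monitor.Exit(_tasks);
}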

The thing about this method is that it's intended for debuggers, and we can't block on the debugger thread so we simply try to enter the lock, and if we can't, then we throw. Oddly enough, the debugger can handle an exception better than a blocking call, and the documentation for the method says as much, in so many words.

And that's it. There's really not a lot to it. It's just a matter of knowing what to look for.
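
Since the snippets above show only method bodies, here is a sketch of how they might fit together in one class. The class name, the constructor parameter and the WaitingTaskCount property are assumptions of mine; the overridden members are the real TaskScheduler ones. It fixes the concurrency limit at construction to stay short, and it returns a snapshot from GetScheduledTasks().

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical class name; the bodies mirror the snippets in this article.
public sealed class BoundedTaskScheduler : TaskScheduler
{
    [ThreadStatic]
    static bool _currentThreadIsProcessingItems;

    readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
    readonly int _maximumTaskCount;
    int _pendingTaskCount;
    int _waitingTaskCount;

    // The constructor parameter is an assumption made for this sketch.
    public BoundedTaskScheduler(int maximumTaskCount)
    {
        if (maximumTaskCount < 1) throw new ArgumentOutOfRangeException(nameof(maximumTaskCount));
        _maximumTaskCount = maximumTaskCount;
    }

    public override int MaximumConcurrencyLevel { get { return _maximumTaskCount; } }

    // Assumed accessor for the waiting statistic.
    public int WaitingTaskCount { get { lock (_tasks) return _waitingTaskCount; } }

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            ++_waitingTaskCount;
            if (_pendingTaskCount < _maximumTaskCount)
            {
                ++_pendingTaskCount;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }

    void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            _currentThreadIsProcessingItems = true;
            try
            {
                while (true)
                {
                    Task item;
                    lock (_tasks)
                    {
                        if (_tasks.Count == 0) { --_pendingTaskCount; break; }
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                        --_waitingTaskCount;
                    }
                    TryExecuteTask(item); // run outside the lock
                }
            }
            finally { _currentThreadIsProcessingItems = false; }
        }, null);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        if (!_currentThreadIsProcessingItems) return false;
        if (taskWasPreviouslyQueued && !TryDequeue(task)) return false;
        return TryExecuteTask(task);
    }

    protected override bool TryDequeue(Task task)
    {
        lock (_tasks)
        {
            if (!_tasks.Remove(task)) return false;
            --_waitingTaskCount;
            return true;
        }
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(_tasks, ref lockTaken);
            if (!lockTaken) throw new NotSupportedException();
            return new List<Task>(_tasks); // snapshot taken while the lock is held
        }
        finally
        {
            if (lockTaken) Monitor.Exit(_tasks);
        }
    }
}
```

To use it, create one instance, hand it to new TaskFactory(scheduler), and call StartNew() as often as you like. No more than the number of tasks you asked for should be running at any moment, with the rest waiting in the list.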

History

  • 18th July, 2020 - Initial submission

License

This article, along with any associated source code and files, is licensed under The MIT License


Written By
United States
Just a shiny lil monster. Casts spells in C++. Mostly harmless.
