
Batch.Parallel

Processing batches of data in parallel
Sometimes Parallel.ForEach is not what we want; instead, we need to process pieces of the collection in batches and have control over when all the processing is complete.

Introduction

Parallel.ForEach is great when you want to process each element of a collection in parallel, up to the number of logical processors. However, sometimes we need the ability to process a portion, or batch, of the entire collection, each batch in its own thread. A use case for this requirement is database transactions: say you have a large number of transactions to insert or update, which can be done in parallel. With Parallel.ForEach, you would have to initialize the connection (granted, connections are pooled) and perform other time- and memory-consuming activities, like initializing the LINQ to SQL context, for every item. For example:

C#
Parallel.ForEach(someCollection, item =>
{
  var conn = new SqlConnection(connectionString);
  
  using (var context = new ModelDataContext(conn))
  {
    // Do something with item on the context.
  }
});

This sort of defeats the purpose of processing the collection in parallel. And while you could create a ConcurrentDictionary with the thread ID as a key and the context as the value, that is both silly and not a general solution.
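
With a batch approach, the expensive setup is done once per batch rather than once per item. As a rough sketch (reusing the names from the snippet above and assuming the BatchParallel extension method described below), the database scenario becomes:

C#
// Sketch only: someCollection, connectionString, and ModelDataContext are the
// placeholders from the snippet above -- substitute your own data and context.
var tasks = someCollection.BatchParallel(batch =>
{
  // One connection and one context per batch (i.e., per task), not per item.
  var conn = new SqlConnection(connectionString);

  using (var context = new ModelDataContext(conn))
  {
    foreach (var item in batch)
    {
      // Do something with item on the context.
    }
  }
});

Task.WaitAll(tasks);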

Let's Look at Parallel.ForEach

Here's a simple test that looks at how many threads Parallel.ForEach actually uses to process a collection of integers. Notice the 1 ms delay in each iteration to force the threads into actually doing a little work.

C#
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
  static ConcurrentDictionary<int, int> threadIdCounts;

  static void Main(string[] args)
  {
    threadIdCounts = new ConcurrentDictionary<int, int>();
    var plr = Parallel.ForEach(Enumerable.Range(0, 1000), DoSomething);

    threadIdCounts.ForEach(kvp => Console.WriteLine($"TID: {kvp.Key}, Count = {kvp.Value}"));
  }

  static void DoSomething(int n)
  {
    DoWork();
  }

  static void DoWork()
  {
    int tid = Thread.CurrentThread.ManagedThreadId;

    if (!threadIdCounts.TryGetValue(tid, out int count))
    {
      threadIdCounts[tid] = 0;
    }

    threadIdCounts[tid] = count + 1;
    Thread.Sleep(1);
  }
}

Notice that Parallel.ForEach ended up using five threads (one of them being the calling thread) even though my laptop only has four logical cores:

C#
TID: 1, Count = 189
TID: 3, Count = 189
TID: 4, Count = 189
TID: 5, Count = 189
TID: 6, Count = 244
Press any key to continue . . .

Also, Parallel.ForEach blocks the calling thread until all the work is complete, which may not be what you want.
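
If all you need is to keep the caller responsive, you could of course wrap the whole Parallel.ForEach call in a Task.Run, something like this sketch:

C#
// Run the blocking Parallel.ForEach on a thread-pool thread so the caller isn't blocked.
var task = Task.Run(() => Parallel.ForEach(Enumerable.Range(0, 1000), DoSomething));

// ... do other work on the calling thread here ...

task.Wait();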

Introducing BatchParallel

BatchParallel is an extension method that splits the collection into numProcessors sub-collections of n / numProcessors elements each, invokes the action for each sub-collection on its own task, and adds one more task for any remainder.

C#
// Process a subset of the collection on separate threads.
public static Task[] BatchParallel<T>(this IEnumerable<T> collection, 
       Action<IEnumerable<T>> action, bool singleThread = false)
{
  int processors = singleThread ? 1 : Environment.ProcessorCount;
  int n = collection.Count();
  int nPerProc = n / processors;
  Task[] tasks = new Task[processors + 1];

  // One task per processor, each processing a contiguous slice of nPerProc items.
  processors.ForEach(p => tasks[p] = 
             Task.Run(() => action(collection.Skip(p * nPerProc).Take(nPerProc))));

  // One extra task for whatever is left over (an empty batch if n divides evenly).
  int remainder = n - nPerProc * processors;
  var lastTask = Task.Run(() => 
      action(collection.Skip(nPerProc * processors).Take(remainder)));
  tasks[processors] = lastTask;

  return tasks;
}

Furthermore, it returns the Task collection, so you choose when to wait for the tasks to complete. There is also an option to run everything as a single batch on one thread, which I find makes debugging a lot easier.
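
For example (someCollection and ProcessBatch here are just stand-ins for your own data and batch handler):

C#
// someCollection and ProcessBatch are placeholders -- substitute your own.
var tasks = someCollection.BatchParallel(batch => ProcessBatch(batch));

// ... do other work on the calling thread while the batches are processed ...

// Block now, when it suits you...
Task.WaitAll(tasks);

// ...or, from an async method, await instead:
// await Task.WhenAll(tasks);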

Usage Example

This example shows both Parallel.ForEach and BatchParallel usage:

C#
class Program
{
  static ConcurrentDictionary<int, int> threadIdCounts;

  static void Main(string[] args)
  {
    Console.WriteLine("Parallel.ForeEach example:");
    threadIdCounts = new ConcurrentDictionary<int, int>();
    var plr = Parallel.ForEach(Enumerable.Range(0, 1000), DoSomething);
    threadIdCounts.ForEach(kvp => Console.WriteLine($"TID: {kvp.Key}, Count = {kvp.Value}"));

    Console.WriteLine("\r\nBatchParallel example:");
    threadIdCounts = new ConcurrentDictionary<int, int>();
    var tasks = Enumerable.Range(0, 1000).BatchParallel(batch => DoSomething(batch));
    Task.WaitAll(tasks);
    threadIdCounts.ForEach(kvp => Console.WriteLine($"TID: {kvp.Key}, Count = {kvp.Value}"));
  }

  static void DoSomething(int n)
  {
    DoWork();
  }

  static void DoSomething<T>(IEnumerable<T> batch)
  {
    // Do setup stuff

    // Then process the batch.
    batch.ForEach(n => DoWork());
  }

  static void DoWork()
  {
    int tid = Thread.CurrentThread.ManagedThreadId;
  
    if (!threadIdCounts.TryGetValue(tid, out int count))
    {
      threadIdCounts[tid] = 0;
    }

    threadIdCounts[tid] = count + 1;
    Thread.Sleep(1);
  }
}

Result

C#
Parallel.ForEach example:
TID: 1, Count = 244
TID: 3, Count = 189
TID: 4, Count = 189
TID: 5, Count = 189
TID: 6, Count = 189

BatchParallel example:
TID: 3, Count = 250
TID: 4, Count = 250
TID: 5, Count = 250
TID: 6, Count = 250
Press any key to continue . . .

Things to Note

Notice that BatchParallel used threads only for the number of logical cores I have and split the work evenly. You can also pass the optional singleThread parameter as true if you want the entire collection processed as a single batch on one thread, which makes debugging easier (see the sketch below). Lastly, the collection of Task objects is returned, giving you the choice of when to wait for the tasks to complete.
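
Here's a quick sketch of forcing everything onto one background task while debugging, reusing the example above:

C#
// The entire collection is processed as a single batch on one thread-pool thread.
var tasks = Enumerable.Range(0, 1000).BatchParallel(batch => DoSomething(batch), singleThread: true);
Task.WaitAll(tasks);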

Remainder Edge Case

Here is a simple (non-unit-test) example of a case where there is a remainder. Given:

C#
tasks = Enumerable.Range(0, 1003).BatchParallel(batch => DoSomething(batch));

We now see:

C#
BatchParallel with remainder example:
TID: 3, Count = 250
TID: 4, Count = 250
TID: 5, Count = 253
TID: 6, Count = 250
Press any key to continue . . .

Note that the thread pool reused thread 5 to process both one of the 250-item batches and the remaining 3 items.
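
To make the partitioning concrete, here is the arithmetic BatchParallel performs for 1003 items on four logical processors:

C#
int n = 1003;
int processors = 4;
int nPerProc = n / processors;              // 250 items per batch
int remainder = n - nPerProc * processors;  // 3 items left over

// Four batches of 250 (items 0-999), plus a fifth task for the remaining items 1000-1002.
Console.WriteLine($"{processors} batches of {nPerProc} items, remainder of {remainder}");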

Additional Extension Methods I'm Using

I'm also using these extension methods:

C#
public static void ForEach<T>(this IEnumerable<T> collection, Action<T> action)
{
  foreach (var item in collection)
  {
    action(item);
  }
}

public static void ForEach(this int n, Action<int> action)
{
  for (int i = 0; i < n; i++)
  {
    action(i);
  }
}

Conclusion

Not sure what to say here; the work should speak for itself. Hopefully, you find this useful. And if you prefer, you can always implement it as a plain static method instead of an extension method:

C#
public static class Batch
{
  public static Task[] Parallel<T>(IEnumerable<T> collection, 
         Action<IEnumerable<T>> action, bool singleThread = false)
  {
    int processors = singleThread ? 1 : Environment.ProcessorCount;
    int n = collection.Count();
    int nPerProc = n / processors;
    Task[] tasks = new Task[processors + 1];

    // One task per processor, each processing a contiguous slice of nPerProc items.
    processors.ForEach(p => tasks[p] = 
               Task.Run(() => action(collection.Skip(p * nPerProc).Take(nPerProc))));

    // One extra task for whatever is left over (an empty batch if n divides evenly).
    int remainder = n - nPerProc * processors;
    var lastTask = Task.Run(() => 
        action(collection.Skip(nPerProc * processors).Take(remainder)));
    tasks[processors] = lastTask;

    return tasks;
  }
}

Usage:

C#
var tasks = Batch.Parallel(Enumerable.Range(0, 1003), batch => DoSomething(batch));
Task.WaitAll(tasks);

Have fun!

History

  • 13th December, 2020: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



