Using System.Threading.Tasks and BlockingCollections to FTP Multiple Files at the Same Time

19 Apr 2011, CPOL, 4 min read
How to use System.Threading.Tasks and BlockingCollections to FTP multiple files at the same time

I recently needed to write an application that would loop through a queue of files and FTP them to our Content Delivery Network for streaming. Users upload files, and our administrators can mark some of them as urgent. Urgent ones need to jump to the front of the queue; everything else should be ordered by broadcast date. My initial code was basically a loop that looked something like this:

C#
while (GetFtpQueue().Count > 0)
{
    // Gather all the info from the db, FTP the file,
    // then clean up (move the source file, update the db, etc.)
}
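The ordering rule described above (urgent items first, then everything by broadcast date) can be sketched with LINQ. This is only an illustration: the `QueuedFile` class and its property names are assumptions, not the types used in the real application, where the ordering may well have been done in the database query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a queued file; the property names are assumptions.
public class QueuedFile
{
    public string Name { get; set; }
    public bool IsUrgent { get; set; }
    public DateTime BroadcastDate { get; set; }
}

public static class FtpQueueOrdering
{
    // Urgent items first, then everything by ascending broadcast date.
    public static List<QueuedFile> Order(IEnumerable<QueuedFile> files)
    {
        return files
            .OrderByDescending(f => f.IsUrgent)   // true sorts before false
            .ThenBy(f => f.BroadcastDate)
            .ToList();
    }

    public static void Main()
    {
        var queue = new List<QueuedFile>
        {
            new QueuedFile { Name = "a.mp3", IsUrgent = false, BroadcastDate = new DateTime(2011, 4, 20) },
            new QueuedFile { Name = "b.mp4", IsUrgent = true,  BroadcastDate = new DateTime(2011, 4, 25) },
            new QueuedFile { Name = "c.mp3", IsUrgent = false, BroadcastDate = new DateTime(2011, 4, 19) },
        };

        // Prints b.mp4, c.mp3, a.mp3 (one per line).
        foreach (var file in Order(queue))
            Console.WriteLine(file.Name);
    }
}
```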

It worked beautifully while we were just uploading small audio files, but as soon as we started adding a lot of video files to the queue, it became so slow that a single video file might wait 2 hours or more to be uploaded. So we experimented with FileZilla to see how many concurrent uploads we could add before the speed of each upload started to drop. We found that at our location, 4 simultaneous FTP uploads seemed to hit the sweet spot: instead of uploading 1 file at 500 kb/s, we could upload four at once and each one would still run at that speed, quadrupling our throughput.

I read up on the new threading classes in .NET 4.0 (System.Threading.Tasks) and began refactoring my FTP application. I decided to use Task.Factory to manage the threads, in conjunction with a BlockingCollection, to create a classic Producer/Consumer pattern. My first attempt looked a lot like this:

C#
int maxThreads = 4;
var filesToFtp = new BlockingCollection<FtpItem>(maxThreads);
var processedFiles = new BlockingCollection<FtpItem>();

// Stage #1: Producer
var initFtp = Task.Factory.StartNew(() =>
{
    try
    {
        while (GetFtpQueue().Count > 0)
        {
            // Gather all the info from the db and use it to create FtpItem objects
            // Add them to list of filesToFtp, which only allows maxThreads in at a time 
            // (this allows us to have an urgent item jump to the top while 
            // current items are still FTPing)
            filesToFtp.Add(new FtpItem { ... });
        }
    }
    finally { filesToFtp.CompleteAdding(); }
});

// Stage #2: Consumer of the initFtp task and Producer for the cleanup task
var process = Task.Factory.StartNew(() =>
{
    try
    {
        foreach (var file in filesToFtp.GetConsumingEnumerable())
        {
            // Ftp the file
            // Add to list of processedFiles
            processedFiles.Add(file);
        }
    }
    finally { processedFiles.CompleteAdding(); }
});

// Stage #3: Consumer of processedFiles
var cleanup = Task.Factory.StartNew(() =>
{
    foreach (var file in processedFiles.GetConsumingEnumerable())
    {
        // Clean up (move the source file, update the db, etc.)
    }
});

Task.WaitAll(initFtp, process, cleanup);

Initially, this looked quite promising. I wrote a bare-bones version of it, like the one above, that just called Thread.Sleep to simulate work and iterated through a list of ints. I was able to verify that each "stage" was running on its own thread, that it never allowed more than 4 items through at a time, that I could add items to the front of the queue and get them processed next, and that it never tried to 'clean up' a file until that file had passed through both stage 1 and stage 2. However, I did notice that the elapsed time was the same as when I ran a similar unit test in a simple while loop. It might be obvious to you why this is, but at the time I put it down to a limitation of the unit test and pushed my new code to production.

The first thing I noticed was that it wasn't any faster. Not even slightly. It took me hours of staring at the code to finally figure out why my multithreaded code was not running any faster, but the answer is simple: I only created one consumer of filesToFtp. I had incorrectly assumed that because I was creating up to 4 FtpItems at a time, and the FTP process was running on its own thread, it would consume as many as it could.

The reality is that in the code above, while each of the three stages runs on its own thread, the whole process still happens in series. Stage 1 doesn't create 4 items at once; it creates them one after the other. Stage 2 does begin working before stage 1 is complete (as soon as there is an item to consume), but then it is busy FTPing that first item until it is fully uploaded. Only then does it grab the next file.
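The bottleneck can be reproduced with a sleep-based stand-in, similar in spirit to the bare-bones test described above. The following is a minimal sketch, not the original unit test: `ConsumerCountDemo`, `Run`, and its parameters are illustrative names, and `Thread.Sleep` stands in for the upload. `TaskCreationOptions.LongRunning` is used here as a hint that each task blocks for a while, so the timings are less affected by thread pool ramp-up.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumerCountDemo
{
    // Pushes itemCount integers through a bounded BlockingCollection and
    // "uploads" each one by sleeping for workMs. Returns the number of items
    // processed and the elapsed wall-clock time.
    public static Tuple<int, TimeSpan> Run(int consumerCount, int itemCount, int workMs)
    {
        var queue = new BlockingCollection<int>(boundedCapacity: 4);
        int processed = 0;
        var timer = Stopwatch.StartNew();

        // Producer: adds items one after the other, then marks the queue complete.
        var producer = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < itemCount; i++)
                    queue.Add(i);
            }
            finally { queue.CompleteAdding(); }
        }, TaskCreationOptions.LongRunning);

        // Consumers: each one handles a single item at a time, so the number
        // of consumers is the number of simulated uploads running at once.
        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    Thread.Sleep(workMs);               // simulated upload
                    Interlocked.Increment(ref processed);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        Task.WaitAll(consumers);
        producer.Wait();
        timer.Stop();
        return Tuple.Create(processed, timer.Elapsed);
    }

    public static void Main()
    {
        var one = Run(consumerCount: 1, itemCount: 8, workMs: 100);
        var four = Run(consumerCount: 4, itemCount: 8, workMs: 100);
        Console.WriteLine("1 consumer:  {0} items in {1:F0} ms", one.Item1, one.Item2.TotalMilliseconds);
        Console.WriteLine("4 consumers: {0} items in {1:F0} ms", four.Item1, four.Item2.TotalMilliseconds);
    }
}
```

With a single consumer, the elapsed time should come out at roughly the item count multiplied by the per-item sleep, which is the same as a plain loop. Exact timings will vary by machine.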

To resolve this problem, I simply wrapped stage 2 in a for loop, and created an IList of Tasks to wait on:

C#
int maxThreads = 4;
var filesToFtp = new BlockingCollection<FtpItem>(maxThreads);
var processedFiles = new BlockingCollection<FtpItem>();
IList<Task> tasks = new List<Task>();

// Stage #1: Producer
tasks.Add(Task.Factory.StartNew(() =>
{
    try
    {
        while (GetFtpQueue().Count > 0)
        {
            // Gather all the info from the db and use it to create FtpItem objects
            // Add them to list of filesToFtp, which only allows maxThreads in at a time 
            // (this allows us to have an urgent item jump to the top while 
            // current items are still FTPing)
            filesToFtp.Add(new FtpItem { ... });
        }
    }
    finally { filesToFtp.CompleteAdding(); }
}));

// Start multiple instances of the ftp process
for (int i = 0; i < maxThreads; i++)
{
    // Stage #2: Consumer of the producer task and Producer for the cleanup task
    tasks.Add(Task.Factory.StartNew(() =>
    {
        try
        {
            foreach (var file in filesToFtp.GetConsumingEnumerable())
            {
                // Ftp the file
                // Add to list of processedFiles
                processedFiles.Add(file);
            }
        }
        // Note: every consumer runs this, which turns out to be a problem (see below)
        finally { processedFiles.CompleteAdding(); }
    }));
}

// Stage #3: Consumer of processedFiles
tasks.Add(Task.Factory.StartNew(() =>
{
    foreach (var file in processedFiles.GetConsumingEnumerable())
    {
        // Clean up (move the source file, update the db, etc.)
    }
}));

Task.WaitAll(tasks.ToArray());

I reran the unit test and it was faster! Very nearly 4 times faster, in fact. Wahoo! I updated the code, published my changes and sat back. Sure enough, the FTP process finally started to make up some ground.

In the meantime, I went back to my unit test and began tweaking. The first thing I noticed was that sometimes I would get a "System.InvalidOperationException: The BlockingCollection<T> has been marked as complete with regards to additions." Luckily, this didn't take a lot of head scratching to figure out: the first thread to reach the 'finally' clause of stage 2 closed the processedFiles collection while the other three threads were still trying to add to it. A final refactoring resolved the issue:

C#
int maxThreads = 4;
var filesToFtp = new BlockingCollection<FtpItem>(maxThreads);
var processedFiles = new BlockingCollection<FtpItem>();
IList<Task> tasks = new List<Task>();

// maintain a separate list of wait handles for the FTP Tasks, 
// since we need to know when they all complete in order to close 
// the processedFiles blocking collection
IList<Task> ftpProcessTasks = new List<Task>();

// Stage #1: Producer
tasks.Add(Task.Factory.StartNew(() =>
{
    try
    {
        while (GetFtpQueue().Count > 0)
        {
            // Gather all the info from the db and use it to create FtpItem objects
            // Add them to list of filesToFtp, which only allows maxThreads in at a time 
            // (this allows us to have an urgent item jump to the top while 
            // current items are still FTPing)
            filesToFtp.Add(new FtpItem { ... });
        }
    }
    finally { filesToFtp.CompleteAdding(); }
}));

// Start multiple instances of the ftp process
for (int i = 0; i < maxThreads; i++)
{
    // Stage #2: Consumer of the producer task and Producer for the cleanup task
    ftpProcessTasks.Add(Task.Factory.StartNew(() =>
    {
        // No try/finally here any more: the collection is closed once, below
        foreach (var file in filesToFtp.GetConsumingEnumerable())
        {
            // Ftp the file
            // Add to list of processedFiles
            processedFiles.Add(file);
        }
    }));
}

// Stage #3: Consumer of processedFiles
tasks.Add(Task.Factory.StartNew(() =>
{
    foreach (var file in processedFiles.GetConsumingEnumerable())
    {
        // Clean up (move the source file, update the db, etc.)
    }
}));

try
{
    // When all the FTP tasks complete
    Task.WaitAll(ftpProcessTasks.ToArray());
}
finally
{
    // Notify the stage #3 cleanup task that there is no need to wait, there will be
    // no more processedFiles. Doing this in a finally block means the cleanup task
    // is also released if one of the FTP tasks throws.
    processedFiles.CompleteAdding();
}

// Make sure all the other tasks are complete too.
Task.WaitAll(tasks.ToArray());
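The behaviour behind that exception, and one alternative way to close a shared collection exactly once, can be sketched as follows. This is an illustration under assumptions rather than the code from the download: `CompleteAddingDemo` and its methods are hypothetical names, the items are plain ints, and `Task.Factory.ContinueWhenAll` is swapped in for the blocking `Task.WaitAll` call used above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompleteAddingDemo
{
    // Returns true if Add throws once the collection has been marked complete.
    public static bool AddAfterCompleteThrows()
    {
        var collection = new BlockingCollection<int>();
        collection.CompleteAdding();
        try
        {
            collection.Add(1);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Several workers feed one output collection; a continuation closes it
    // only after every worker has finished.
    public static List<int> RunWorkers(int workerCount, int itemCount)
    {
        var input = new BlockingCollection<int>();
        var output = new BlockingCollection<int>();

        for (int i = 0; i < itemCount; i++) input.Add(i);
        input.CompleteAdding();

        Task[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                foreach (var item in input.GetConsumingEnumerable())
                    output.Add(item * 2);   // stand-in for "FTP the file"
            }))
            .ToArray();

        // Runs once, after all workers have completed (successfully or not).
        Task closer = Task.Factory.ContinueWhenAll(workers, _ => output.CompleteAdding());

        // Drains the output; the enumeration ends once output is marked
        // complete and empty. Sorted because arrival order is not deterministic.
        var results = output.GetConsumingEnumerable().OrderBy(x => x).ToList();
        closer.Wait();
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(AddAfterCompleteThrows());              // True
        Console.WriteLine(string.Join(",", RunWorkers(4, 5)));    // 0,2,4,6,8
    }
}
```

The continuation keeps the "close once, after the last producer" rule in one place without blocking the calling thread, at the cost of one more task to reason about.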

Download a working example (just enter your FTP Server details prior to running).

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior), Salem Web Network
United States
Robert Williams has been programming web sites since 1996 and has been employed as a .NET developer since its release in 2002.
