Hey guys,
In our office we are working on a large data processing project. The purpose of the project is to receive a large number of text files via FTP, run a set of validations on each file, and at the end of the process insert all the data into SQL Server.

Each file can contain millions of records.

Each record in a file contains sections of information located at predetermined fixed positions, and each segment is validated against a business model.
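
For illustration only, fixed-position parsing of this kind typically looks like the sketch below; the offsets and field names here are invented, not the real layout:

C#
using System;

public static class FixedWidthDemo
{
	public static void Main()
	{
		// Offsets and fields are invented; the real layout comes from the business model.
		string line = "001A12345JOHN SMITH          20190410";

		string eventCode = line.Substring(0, 3);   // positions 0-2: event code
		string custId    = line.Substring(4, 5);   // positions 4-8: customer id
		string custName  = line.Substring(9, 20);  // positions 9-28: customer name

		// Each segment is then validated against the business rules,
		// e.g. the customer id must be numeric:
		bool idOk = int.TryParse(custId, out int _);
		Console.WriteLine($"{eventCode} {custId} {custName.Trim()} valid={idOk}");
	}
}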

If a particular section fails validation, we add the error to a DataTable, and at the end of the run the accumulated data is saved to SQL Server in a single bulk operation.

Here is the structure of the DataTable:
C#
dtErrors = new DataTable();
dtErrors.Columns.Add(TblErrors_Consts.IdRow, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.OperatorId, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.FileTypeId, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.FileName, typeof(string));
dtErrors.Columns.Add(TblErrors_Consts.ErrorRow, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.ErrorCodeId, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.AddDate, typeof(DateTime));


And this is the method responsible for adding an error record to the DataTable:
C#
public void AddError(FileDetailsBE fd, int ErrorCode, int lineCounter = 0)
{
	MethodBase m = MethodBase.GetCurrentMethod();
	try
	{
		DataRow dr = dtErrors.NewRow();
		dr[TblErrors_Consts.OperatorId] = fd.ParentSafe.OperatorId;
		dr[TblErrors_Consts.FileTypeId] = fd.FileTypeId;
		dr[TblErrors_Consts.FileName] = fd.FileName;
		dr[TblErrors_Consts.ErrorRow] = lineCounter;
		dr[TblErrors_Consts.ErrorCodeId] = ErrorCode;
		dr[TblErrors_Consts.AddDate] = CurrentDateTime;
		dtErrors.Rows.Add(dr);
	}
	catch (Exception ex)
	{
		GP.JobBE.Errors.Add(new JobErrorBE(ex, m));
	}
	finally
	{
		Console.ForegroundColor = ConsoleColor.Green;
	}
}


During our QA tests, we found that when all the records are invalid, performance drops drastically just at the point of inserting the rows into the temporary DataTable (even before the data ever reaches the database).

For this reason, we thought to parse all the records in a file using Parallel.ForEach, and of course we then hit a problem inserting new rows into the DataTable, because DataTable is not thread safe.

Even guarding the insert with a try/lock block did not solve this.

My question is this: how can all the rows be processed in parallel while the invalid entries still make it into the DataTable?

This is the code section responsible for processing all the records (DATA is another DataTable that holds all the records read from the text file):

C#
Parallel.ForEach(DATA.AsEnumerable(), row =>
{
	// NOTE: LineCounter is shared by every worker thread and is incremented
	// without synchronization, so counts (and the divider check) can be lost.
	LineCounter++;
	if (LineCounter % divider == 0)
	{
		SaveDataTablesToDB(fd);
	}

	try
	{
		// NOTE: 'line' and 'eventCode' are outer-scope variables, so parallel
		// iterations overwrite each other's values.
		line = row[0].ToString();
		if (line.Trim().Length == 0)
		{
			return;
		}
		eventCode = line.Substring(0, 3);
		if (eventCode != ImportManager_Consts.Event999)
		{
			EventBE eventTableBE = GetRelevantTable(fd, eventCode);
			if (eventTableBE != null)
			{
				if (line.Trim().Length != eventTableBE.CharactersQty)
				{
					fd.Errors.AddError(fd, (int)ErrorCodes.IllegalEventLength, LineCounter);
					return;
				}

				ProcessLine(fd, line, eventTableBE, LineCounter);
			}
		}
	}
	catch (Exception ex)
	{
		ConsoleUtils.WriteErrorLine_NoObject(string.Format(ImportManager_Consts.LineCounter, LineCounter.ToString()), GP.Info);
		ConsoleUtils.WriteErrorLine_NoObject(string.Format(ImportManager_Consts.ProcessLine, line), GP.Info);
		GP.JobBE.Errors.Add(new JobErrorBE(ex, m));
		fd.Errors.AddError(fd, (int)ErrorCodes.GeneralError, LineCounter);
	}
});


What I have tried:

I replaced the DataTable with a List that holds a business entity, and added the errors to the List.

At the end of the run, all the items in the List are copied into the DataTable and then bulk-inserted into SQL.

I discovered that a lot of entries leak away and the data is not stored correctly at all.
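
That outcome is consistent with List<T> not being thread safe either: Add calls made concurrently from Parallel.ForEach can silently drop items. A minimal sketch of the difference, using ConcurrentBag<T> (the entity here is illustrative, not the real project class):

C#
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ErrorCollectionDemo
{
	// Hypothetical error entity standing in for the real business entity.
	public class ErrorEntry
	{
		public int ErrorRow;
		public int ErrorCodeId;
	}

	public static void Main()
	{
		// ConcurrentBag<T> supports concurrent Add calls; List<T> does not,
		// which is the likely cause of the "leaks" described above.
		var errors = new ConcurrentBag<ErrorEntry>();

		Parallel.For(0, 1000000, i =>
		{
			// Simulate every row failing validation.
			errors.Add(new ErrorEntry { ErrorRow = i, ErrorCodeId = 1 });
		});

		// Prints 1000000 every time; a plain List<T> would typically
		// come up short or throw under the same load.
		Console.WriteLine(errors.Count);
	}
}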

You're going to be "I/O bound"; multiple threads won't help. Since your data is so simple, you could have ONE process, one CSV file, and one "bulk" loader that eliminates the overhead of building data rows.
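
One way to read that suggestion in code (a sketch only; the table name, file path, and connection string are invented, and BULK INSERT requires the file to be readable by the SQL Server service account):

C#
using System;
using System.Data.SqlClient;
using System.IO;

public static class CsvBulkLoadDemo
{
	public static void Main()
	{
		// 1) Stream every error straight to one CSV file as it is found;
		//    no DataRow objects are ever built.
		using (var writer = new StreamWriter(@"C:\Temp\errors.csv"))
		{
			// One line per error: OperatorId,FileTypeId,FileName,ErrorRow,ErrorCodeId,AddDate
			writer.WriteLine("1,2,somefile.txt,42,7,2019-04-10T12:00:00");
		}

		// 2) Hand the whole file to SQL Server in a single statement.
		using (var conn = new SqlConnection(@"Server=.;Database=MyDb;Integrated Security=true"))
		using (var cmd = conn.CreateCommand())
		{
			conn.Open();
			cmd.CommandText = @"BULK INSERT dbo.TblErrors
				FROM 'C:\Temp\errors.csv'
				WITH (FIELDTERMINATOR = ',', ROWTERMINATOR = '\n')";
			cmd.ExecuteNonQuery();
		}
	}
}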
 
0) Create a model class that represents your error info (all the properties you want to save).

1) Create a class derived from List<ErrorObject> to hold the objects as they are created.

2) As the files are processed and errors are encountered, create a new instance of your ErrorObject model and add it to the list of error objects.

3) In your list class, add a method that

  a) Creates a DataTable object
  b) Uses reflection (on your error object class) to determine the columns you need, and the data types stored in those columns.
  c) Cycles through the contents of the list and adds a new row populated with each item's properties
  d) Saves the DataTable contents to the database.

4) Call the method described in step 3 when a file is done being processed.

It's all quite simple, really; a rough sketch is shown below.
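
A minimal sketch of steps 0 through 3, assuming an ErrorObject shaped like the DataTable in the question (the connection string and table name are placeholders). Note that if the list is filled from Parallel.ForEach, the Add calls themselves still need synchronization, e.g. a lock, or a ConcurrentBag<T> that is drained into this list:

C#
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Reflection;

// Step 0: hypothetical model; property names/types are illustrative.
public class ErrorObject
{
	public int OperatorId { get; set; }
	public int FileTypeId { get; set; }
	public string FileName { get; set; }
	public int ErrorRow { get; set; }
	public int ErrorCodeId { get; set; }
	public DateTime AddDate { get; set; }
}

// Step 1: a list type that knows how to persist itself (steps 3a-3d).
public class ErrorObjectList : List<ErrorObject>
{
	public void SaveToDatabase(string connectionString, string tableName)
	{
		// 3a/3b: build the DataTable schema from the model's public properties.
		var table = new DataTable(tableName);
		PropertyInfo[] props = typeof(ErrorObject).GetProperties();
		foreach (PropertyInfo p in props)
		{
			// DataTable columns cannot be Nullable<T>; unwrap if necessary.
			Type colType = Nullable.GetUnderlyingType(p.PropertyType) ?? p.PropertyType;
			table.Columns.Add(p.Name, colType);
		}

		// 3c: one DataRow per list item, populated by reflection.
		foreach (ErrorObject item in this)
		{
			DataRow row = table.NewRow();
			foreach (PropertyInfo p in props)
				row[p.Name] = p.GetValue(item) ?? DBNull.Value;
			table.Rows.Add(row);
		}

		// 3d: bulk-copy the rows into SQL Server in one shot.
		using (var conn = new SqlConnection(connectionString))
		using (var bulk = new SqlBulkCopy(conn) { DestinationTableName = tableName })
		{
			conn.Open();
			bulk.WriteToServer(table);
		}
	}
}

Calling errors.SaveToDatabase(connectionString, "TblErrors") once a file finishes processing implements step 4.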
 
