Introduction
Consider the situation: you have a program that must continuously send large numbers of messages somewhere, for example when inserting into a database or sending data to a billing system. And the channel, for some reason, is broken.
The first thing that comes to mind is to store the messages in a collection like List, in other words, in RAM. However, if the channel is broken for a day or more, there might not be enough RAM. That brings us to storage on an HDD (hard disk drive). Disk space greatly improves the chances that your application survives a long period without a connection.
This article presents several classes that can be used to buffer your messages to the HDD and then, when the connection is restored, to read them back and send them to the happy customer.
Whole picture
The following picture shows the example class diagram. The classes HddBufferReader, HddBufferWriter, IMessageConverter, SerializingConverter, and ValueMonitor are "production" classes. OneSideChannel and Network were developed for demonstration purposes only and are useless outside of the example. You should provide your own implementation of these classes, or build the system another way.
HddBufferWriter and HddBufferReader buffer the messages to the HDD and read them back. Both classes aggregate the IMessageConverter interface, which should contain the serializing logic.
In the example program, we send several messages through OneSideChannel while the connection is broken and then restored. The connection is broken and restored by calling the helper functions BreakConnection and RestoreConnection of the Network class.
In a "real world" program, you will have to determine yourself whether the connection is broken or restored.
The Network class aggregates a ValueMonitor<bool>, which raises an event whenever the value of the underlying variable changes.
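The source of ValueMonitor is not listed in this article, so the following is only a minimal sketch of how such a class might look. The class name ValueMonitorSketch, the Value property, and the (oldValue, newValue) event signature are assumptions made for illustration; only the ValueChanged event name and its newValue argument come from the text.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; the real ValueMonitor<T> may be implemented differently.
public class ValueMonitorSketch<T>
{
    private readonly object sync = new object();
    private T current;

    // Assumed signature: (oldValue, newValue).
    public event Action<T, T> ValueChanged;

    public ValueMonitorSketch(T initial)
    {
        current = initial;
    }

    public T Value
    {
        get { lock (sync) { return current; } }
        set
        {
            T old;
            lock (sync)
            {
                // Assigning the same value again is not a change, so no event.
                if (EqualityComparer<T>.Default.Equals(current, value)) return;
                old = current;
                current = value;
            }
            // Raise the event outside the lock so handlers cannot deadlock on it.
            Action<T, T> handler = ValueChanged;
            if (handler != null) handler(old, value);
        }
    }
}
```

With a monitor like this, a Network-style class could set Value to false when it detects a failure and to true when the link is back, and the channel would subscribe to ValueChanged to start reading the buffer.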
Here is the code of the example program:
    static void Main(string[] args)
    {
        string bufferingFolder = @"localBuffer";
        OneSideChannel<int> channel =
            new OneSideChannel<int>(new SerializingConverter<int>(), bufferingFolder);

        // Connection is OK: messages go straight to the network.
        Console.WriteLine("Start sending messages");
        channel.Send(1);
        channel.Send(2);

        // Connection is broken: messages are buffered to the HDD.
        Console.WriteLine("Breaking connection");
        channel.ActualNetworking.BreakConnection();
        channel.Send(3);
        channel.Send(4);
        channel.Send(5);

        // Connection is restored: buffered messages are read back and sent.
        Console.WriteLine("Restoring connection");
        channel.ActualNetworking.RestoreConnection();
        channel.Send(6);
        channel.Send(7);

        Console.ReadKey();
    }
Serializing technique
To write to and read from the HDD, we need to serialize our messages. HddBufferReader and HddBufferWriter use IMessageConverter, which should provide the serialization. Implementing this interface is your task, but if you are going to use BinaryFormatter for serializing, there is already a nice and short implementation: the SerializingConverter class.
However, to speed up writing and reading and to save disk space, it is highly recommended to use manual serializing or alternatives like protobuf-net (the latter has some restrictions on the classes you might wish to serialize). In our example code, to keep things simple, we use the SerializingConverter class for serialization.
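To give an idea of what manual serializing could look like, here is a small sketch. The IMessageConverter<T> declaration below is an assumption inferred from the WriteToStream and ReadFromStream calls made by the buffer classes; the real interface may differ. BillingRecord and BillingRecordConverter are hypothetical names invented for the example.

```csharp
using System;
using System.IO;
using System.Text;

// Assumed shape of the interface, inferred from how the buffer classes call it.
public interface IMessageConverter<T>
{
    void WriteToStream(T message, Stream stream);
    T ReadFromStream(Stream stream);
}

// Hypothetical message type, for illustration only.
public class BillingRecord
{
    public long AccountId;
    public string Description;
    public decimal Amount;
}

// Manual serialization: each field is written in a fixed order
// and read back in exactly the same order.
public class BillingRecordConverter : IMessageConverter<BillingRecord>
{
    public void WriteToStream(BillingRecord message, Stream stream)
    {
        // leaveOpen: true, because the buffer writer owns the stream.
        using (var writer = new BinaryWriter(stream, Encoding.UTF8, true))
        {
            writer.Write(message.AccountId);
            writer.Write(message.Description ?? string.Empty);
            writer.Write(message.Amount);
        }
    }

    public BillingRecord ReadFromStream(Stream stream)
    {
        using (var reader = new BinaryReader(stream, Encoding.UTF8, true))
        {
            var record = new BillingRecord();
            record.AccountId = reader.ReadInt64();
            record.Description = reader.ReadString();
            record.Amount = reader.ReadDecimal();
            return record;
        }
    }
}
```

A converter like this writes only the field data, with no type metadata, which is where the savings in disk space and time would come from. The price is that you have to keep the write and read order in sync by hand.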
Buffering
Buffering to the HDD is handled by the HddBufferWriter class. Its Write method does the actual buffering, and its CloseFile method signals that buffering is finished: the file is closed and renamed so that it can be picked up by the reader. The BytesWritten property reports the current file size. If you have a file size policy ("no more than 2 GB", for example), check whether the limit has been exceeded and, if so, call the CloseFile method. The next Write call will create a new file.
Here's the code:
    using System;
    using System.IO;

    public class HddBufferWriter<T> : IDisposable
    {
        private string workingFolder;
        private FileStream currentFile = null;
        private string finishExtension;
        private string filePrefix;
        private IMessageConverter<T> converter;

        public HddBufferWriter(IMessageConverter<T> aConverter,
            string theFolder, string prefix, string extension)
        {
            converter = aConverter;
            if (Directory.Exists(theFolder) == false)
                Directory.CreateDirectory(theFolder);
            workingFolder = theFolder;
            filePrefix = prefix;
            finishExtension = extension;
        }

        public HddBufferWriter(IMessageConverter<T> aConverter, string theFolder) :
            this(aConverter, theFolder, "Buf", ".buffered") { }

        // Appends one message to the current .tmp file, creating the file if needed.
        public void Write(T toWrite)
        {
            if (currentFile == null)
            {
                // Year, 24-hour clock (HH) and milliseconds keep the names in
                // chronological order and make collisions between files unlikely.
                string newFileName = Path.Combine(workingFolder,
                    string.Format("{0}_{1:yyyy_MM_dd_HH_mm_ss_fff}.tmp", filePrefix, DateTime.Now));
                currentFile = new FileStream(newFileName, FileMode.Create, FileAccess.Write);
            }
            converter.WriteToStream(toWrite, currentFile);
        }

        public void Dispose() { CloseFile(); }

        // Closes the current file and renames it so the reader can pick it up.
        public void CloseFile()
        {
            if (currentFile != null)
            {
                string file = currentFile.Name;
                string dest = Path.ChangeExtension(file, finishExtension);
                currentFile.Close();
                currentFile = null;
                File.Move(file, dest);
            }
        }

        // Size of the file currently being written, or 0 if no file is open.
        public long BytesWritten
        {
            get
            {
                if (currentFile != null)
                    return currentFile.Length;
                else return 0;
            }
        }
    }
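The file size policy described before the listing can be sketched as a thin wrapper around the writer. To keep the sketch self-contained, it works against a hypothetical IBufferFile<T> interface that mirrors the three HddBufferWriter members it needs; HddBufferWriter itself does not implement any such interface in this article. SizeLimitedWriter and the in-memory stand-in FakeBufferFile are also invented names.

```csharp
using System;

// Hypothetical abstraction over the HddBufferWriter<T> members used by the policy.
public interface IBufferFile<T>
{
    void Write(T item);
    void CloseFile();
    long BytesWritten { get; }
}

// Closes the current file as soon as it reaches the configured limit.
public class SizeLimitedWriter<T>
{
    private readonly IBufferFile<T> inner;
    private readonly long maxBytes;

    public SizeLimitedWriter(IBufferFile<T> inner, long maxBytes)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        if (maxBytes <= 0) throw new ArgumentOutOfRangeException("maxBytes");
        this.inner = inner;
        this.maxBytes = maxBytes;
    }

    public void Write(T item)
    {
        inner.Write(item);
        // The check runs after the write, so a file may exceed the limit
        // by at most one message. The next Write starts a new file.
        if (inner.BytesWritten >= maxBytes)
            inner.CloseFile();
    }
}

// In-memory stand-in for demonstration: pretends every int takes 4 bytes.
public class FakeBufferFile : IBufferFile<int>
{
    public int FilesClosed;
    private long bytes;

    public void Write(int item) { bytes += 4; }
    public void CloseFile() { if (bytes > 0) { FilesClosed++; bytes = 0; } }
    public long BytesWritten { get { return bytes; } }
}
```

With a limit of 8 bytes and the stand-in above, five writes would close two files and leave a third one open with a single message in it.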
HddBufferReader offers simple iteration. Its Available property is true if there are buffered messages that can be read. The Current property returns the current deserialized object. If you read the Current property several times, you get the same object, as if an iterator kept pointing to the same place. Only a call to the AcceptCurrent method tries to "move the iterator": it either reads the next message or records that the last message has been read. See the code below:
    using System;
    using System.IO;

    public class HddBufferReader<T> : IDisposable
    {
        private T current = default(T);
        private bool currentPresent = false;
        private string workingFolder;
        private string workingExtension;
        private FileStream currentFile = null;
        private IMessageConverter<T> converter;

        public HddBufferReader(IMessageConverter<T> aConverter,
            string folder, string extension)
        {
            // Validate the extension before touching the file system.
            if (string.IsNullOrEmpty(extension) || extension[0] != '.')
                throw new ArgumentException("File extension must have '.' character first",
                    "extension");
            converter = aConverter;
            workingFolder = folder;
            if (Directory.Exists(workingFolder) == false)
                Directory.CreateDirectory(workingFolder);
            workingExtension = extension;
        }

        public HddBufferReader(IMessageConverter<T> aConverter,
            string folder) : this(aConverter, folder, ".buffered") { }

        public void Dispose()
        {
            if (currentFile != null)
            {
                currentFile.Close();
                currentFile = null;
            }
        }

        // Returns the same message until AcceptCurrent is called.
        public T Current
        {
            get
            {
                if (currentPresent == false && Available)
                {
                    string[] files = Directory.GetFiles(workingFolder, "*" + workingExtension);
                    // GetFiles does not guarantee any order; sort by name so the
                    // files are read in the order of the writer's timestamps.
                    Array.Sort(files, StringComparer.Ordinal);
                    currentFile = new FileStream(files[0], FileMode.Open, FileAccess.Read);
                    current = converter.ReadFromStream(currentFile);
                    currentPresent = true;
                }
                return current;
            }
        }

        // True if at least one finished buffer file is waiting in the folder.
        public bool Available
        {
            get
            {
                string[] files = Directory.GetFiles(workingFolder, "*" + workingExtension);
                return files.Length > 0;
            }
        }

        // Moves to the next message, or deletes the file once it is fully read.
        public void AcceptCurrent()
        {
            if (currentPresent == false) return;
            // End of file is reached when Position equals Length.
            if (currentFile.Position >= currentFile.Length)
            {
                current = default(T);
                currentPresent = false;
                string fileToDelete = currentFile.Name;
                currentFile.Close();
                File.Delete(fileToDelete);
                currentFile = null;
            }
            else
            {
                current = converter.ReadFromStream(currentFile);
                currentPresent = true;
            }
        }
    }
How it works
Now let's see how the objects interact. The picture below shows three cases: when everything is neat and nice, when the connection is broken, and when it is restored.
The first case: the connection is OK.
- 1. An external object (in our case, the Program.Main function) calls the Send method of the OneSideChannel object.
- 1.1 OneSideChannel forwards the send request to the Network object. Because the connection is OK, we don't need any buffering logic.
The second case: the connection is broken.
- 2. An external object calls the Send method of the OneSideChannel object.
- 2.1 Because the channel is broken, OneSideChannel calls the Write method of the HddBufferWriter object.
The third case: the connection has just been restored.
- 3. The ValueMonitor variable in the Network object raises the ValueChanged event with newValue equal to true.
- 3.1 - 3.2 While there are messages available, the OneSideChannel object takes the buffered messages one by one from the reader and, after each successful send, confirms it by calling AcceptCurrent. The next read of Current then returns the next object from the file. By the way, if no more objects are available and you read the Current property again, you get default(T).
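The three cases above can be condensed into a few lines of code. This is a simplified sketch and not the OneSideChannel source: an in-memory Queue<T> stands in for the HddBufferWriter and HddBufferReader pair, a delegate stands in for the Network object, and the names ChannelFlowSketch and OnConnectionChanged are invented for the example.

```csharp
using System;
using System.Collections.Generic;

// Sketch only: Queue<T> replaces the HDD buffer, a delegate replaces Network.
public class ChannelFlowSketch<T>
{
    private readonly Action<T> sendToNetwork;
    private readonly Queue<T> buffer = new Queue<T>();
    private bool connected = true;

    public ChannelFlowSketch(Action<T> sendToNetwork)
    {
        if (sendToNetwork == null) throw new ArgumentNullException("sendToNetwork");
        this.sendToNetwork = sendToNetwork;
    }

    public void Send(T message)
    {
        if (connected)
            sendToNetwork(message);   // case 1: forward directly
        else
            buffer.Enqueue(message);  // case 2: corresponds to HddBufferWriter.Write
    }

    // Case 3: would be wired to the ValueChanged event of the monitor.
    public void OnConnectionChanged(bool newValue)
    {
        connected = newValue;
        if (!newValue) return;

        // With the real classes, the writer's CloseFile would presumably be
        // called first, so that the reader can see the finished file.
        while (buffer.Count > 0)                // corresponds to reader.Available
        {
            sendToNetwork(buffer.Peek());       // corresponds to reader.Current
            buffer.Dequeue();                   // corresponds to reader.AcceptCurrent()
        }
    }
}
```

Note that the message is removed from the buffer only after the send call has returned, which mirrors the Current and AcceptCurrent pair of the reader.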
Points of interest
Determining whether the connection is broken or restored is a hard task in itself, and it is solved differently for each type of channel.
In the example, the buffered content is sent in the same thread that sends the "normal" messages. It is better to send buffered traffic in a separate thread so that other messages do not have to wait.
The example does not handle the situation where the connection fails again while the buffered messages are being sent from the HDD.
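One possible way to address the last two points is sketched below. It is not part of the article's code. IBufferSource<T> is a hypothetical interface that mirrors the Available, Current, and AcceptCurrent members of HddBufferReader; BufferDrainer, QueueSource, and the trySend delegate are invented for the example, and the sketch assumes that a failed send either returns false or throws.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical interface mirroring the reader members used by the loop.
public interface IBufferSource<T>
{
    bool Available { get; }
    T Current { get; }
    void AcceptCurrent();
}

public static class BufferDrainer
{
    // Sends buffered messages until the buffer is empty or a send fails.
    // Returns the number of messages that were sent and accepted.
    public static int Drain<T>(IBufferSource<T> source, Func<T, bool> trySend)
    {
        int sent = 0;
        while (source.Available)
        {
            bool ok;
            try { ok = trySend(source.Current); }
            catch (Exception) { ok = false; }

            // A failed message is not accepted, so it stays in the buffer
            // and Current returns it again on the next attempt.
            if (!ok) break;

            source.AcceptCurrent();
            sent++;
        }
        return sent;
    }

    // Runs the same loop on a thread-pool thread so the caller is not blocked.
    public static Task<int> DrainInBackground<T>(IBufferSource<T> source, Func<T, bool> trySend)
    {
        return Task.Run(() => Drain(source, trySend));
    }
}

// In-memory stand-in for the reader, for demonstration only.
public class QueueSource<T> : IBufferSource<T>
{
    private readonly Queue<T> items;
    public QueueSource(IEnumerable<T> initial) { items = new Queue<T>(initial); }
    public bool Available { get { return items.Count > 0; } }
    public T Current { get { return items.Count > 0 ? items.Peek() : default(T); } }
    public void AcceptCurrent() { if (items.Count > 0) items.Dequeue(); }
}
```

If the background variant is used, new messages sent directly over the restored connection may overtake the buffered ones, and the reader would need to be protected against concurrent use. Whether that matters depends on the receiving system.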
History
- 14.01.2012 - First published.