The Clifton Method - Part IV

25 Aug 2016 | CPOL | 18 min read
The Semantic Publisher/Subscriber
This is Part 4 of a series of articles about a Clifton Method Core Component in which you will learn about the Semantic Publisher/Subscriber.

Introduction

In the previous articles on the Module Manager and Service Manager, I described dependency inversion through the use of interfaces and dynamic application configuration through the use of modules that implement services. This results in the following high-level application architecture:

Image 1

The problem with this architecture is the middle bubble, "Interfaces." In my opinion, the coupling between the application and the services is still too close because both depend on common interfaces.

Services typically fall into three categories:

  1. A request for information that either returns immediately, or if it takes some time, the application probably implements a Task wrapper around the request.
  2. The initiation of a "computation" (I'll use that term instead of "process") in which the application doesn't care when it returns or the service "fires an event" when the computation is complete.
  3. Monitoring some device, port, or other asynchronous activity and when a condition is met, it "fires an event" to initiate "computation" on that data to either the application or other services.

While there are many simple services that satisfy the first category, it is really the last two categories that I want to talk about here. For more complex services, the architecture that I found most flexible replaces the "Interfaces" bubble with a publisher/subscriber component:

Image 2

Here, using the Service Manager to access services via dependency inversion (interfaces) is of course still possible, but in many cases, the primary mechanism becomes the publisher/subscriber. Instead of having a common interface specification, a shared semantic "dictionary" is used.

Image 3

Using a publisher/subscriber decouples the application and services from needing to know the exact interface specification of another service. Furthermore, different versions of a service can implement different semantic messages, which increases an application's resilience to change.

The publisher/subscriber implementation I present here has the following features:

  • Is semantic -- routing to the subscriber is accomplished through the use of typed message "envelopes"
  • Implements automatic logging of messages handled by, you guessed it, a logger service of your choice
  • Subscriber exceptions are handled by the pub-sub and can be routed to an exception handling service.
  • Implements subscriber calls as either a synchronous or asynchronous call
  • Utilizes a thread pool for asynchronous processing, queuing the call onto a worker thread with the least number of pending subscriber calls
  • Is completely type safe
  • Unhandled messages (no subscriber) are thrown away
  • Subscribers are (usually) instantiated for each message, facilitating thread safety
  • Isolates message publishers and subscribers in "membranes"
  • Is itself implemented as a service, so it's accessible to all other services
  • Multiple subscribers can receive the same message
  • Automatically calls Dispose on completion for stateless subscribers implementing IDisposable

Use Cases

I've used this architecture very successfully for:

  • Implementing web servers (in fact, it's the backbone of my de facto web server implementation nowadays.)
  • Handling RabbitMQ messages.
  • Processing asynchronous events from hardware such as credit card readers, pin pads, ID scanners, iButton readers, etc.
  • Implementing ATM transaction processing.
  • Implementing CefSharp and .NET's browser control as exchangeable services to create WinForm/WPF hosted web applications.

A short list of what the combination of the Module Manager, Service Manager, and Publisher/Subscriber allows me to do:

  • Mock communication interfaces, hardware, database I/O, etc.
  • Simulate inputs from protocols and hardware, which greatly facilitates workflow testing.
  • Quickly configure an application for a variety of hardware and computation configurations.

The semantic publisher/subscriber is also (albeit an earlier incarnation) at the heart of the Higher Order Programming project which I've written about previously.

About Membrane Computing

A key concept in the publisher/subscriber is that of a "membrane." You can think of a membrane as a container, channel, vesicle, whatever. But it keeps the message contained in that membrane "space" unless permeability rules (which I won't discuss in this article) are set up. The term "membrane" comes from the concept of membrane computing:

Membrane computing deals with distributed and parallel computing models, processing multisets of symbol objects in a localized manner. Thus, evolution rules allow for evolving objects to be encapsulated into compartments defined by membranes. The communications between compartments and with the environment play an essential role in the processes.

The intuition behind the notion of a membrane is a three-dimensional vesicle from biology. However the concept itself is more general, and a membrane is seen as a separator of two regions. The membrane provides for selective communication between the two regions. As per Gheorghe Păun, the separation is of the Euclidean space into a finite “inside” and an infinite “outside”. The selective communication is where the computing comes in.

The variety of suggestions from biology and the range of possibilities to define the architecture and the functioning of a membrane-based multiset processing device are practically endless. Indeed the membrane computing literature contains a very large number of models. Thus, MC is not merely a theory related to a specific model, it is a framework for devising compartmentalized models.

If the term is weird, deal with it. Image 4

A Simple Hello World Example

It's probably best to start with a semantic "hello world" example. This is the code (not including the Bootstrap, which is identical to what was used in the previous article):

C#
using System;

using Clifton.Core.Semantics;

namespace SemanticPublisherSubscriberDemo
{
  static partial class Program
  {
    static void Main(string[] args)
    {
      InitializeBootstrap();
      Bootstrap((e) => Console.WriteLine(e.Message));

      ISemanticProcessor semProc = serviceManager.Get<ISemanticProcessor>();
      semProc.Register<SurfaceMembrane, Subscriber>();
      semProc.ProcessInstance<SurfaceMembrane, ST_Message>(m => m.Text = "Hello World", true);
    }
  }

  public class ST_Message : ISemanticType
  {
    public string Text { get; set; }

    public ST_Message()
    {
      Console.WriteLine("Message Instantiated.");
    }
  }

  public class Subscriber : IReceptor
  {
    public Subscriber()
    {
      Console.WriteLine("Subscriber Instantiated.");
    }

    public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message msg)
    {
      Console.WriteLine(msg.Text);
    }
  }
}

And the modules.xml file (see previous article) looks like this:

XML
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
</Modules>

The output is:

Image 5

Dissecting the Hello World Example

Here's what's going on.

Acquire the Publisher/Subscriber

After the bootstrap, the first thing we do is acquire the Semantic Publisher/Subscriber singleton:

C#
ISemanticProcessor semProc = serviceManager.Get<ISemanticProcessor>();

Register a Subscriber

Subscribers are called "Receptors." Remember that.

C#
semProc.Register<SurfaceMembrane, Subscriber>();

The SurfaceMembrane is a built-in membrane that is used for convenience for messages sent to the subscriber.

Publishing a Message

The final line:

C#
semProc.ProcessInstance<SurfaceMembrane, ST_Message>(m => m.Text = "Hello World", true);

publishes a message. In this example, the message parameters are initialized by an Action<T> that the publisher/subscriber calls after instantiating the message.

Image 6 Here, we supply the optional value true to indicate that the message should be processed on the caller's thread. If we didn't do this, our simple console app would end without actually giving the thread pool time to process the message!

The Message Class

All messages must implement ISemanticType:

C#
public class ST_Message : ISemanticType

This is merely a placeholder that provides compile-time type checking of the generic parameter used in registration and publishing:

C#
public interface ISemanticType { }

Image 7 All receivers receive the same message instance, so the message should be treated as immutable.

The Subscriber

All subscribers must implement IReceptor (remember that subscribers are also known as "receptors"), which is again used for compile-time type checking of the generic parameter used in registration:

C#
public class Subscriber : IReceptor

I tend to prefix my semantic types with "ST_" to distinguish them from other types.

As with ISemanticType, the interface IReceptor is actually just a placeholder for compile-time type checking:

C#
public interface IReceptor { }

Image 8

Because the subscriber is (usually) instantiated for every message, any non-static fields that the subscriber uses are specific to the instance that processes the message. This really helps with thread safety when the subscriber is performing a complex task requiring internal state management.

Subscribing to Messages

Each message is received in an overloaded Process method:

C#
public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message msg)

Note that the publisher/subscriber (the semantic processor) and the membrane on which the message is being sent are provided as well. This gives the subscriber the necessary information to publish a message on the same membrane, if it so chooses.

The class that implements the Process methods will only receive messages published in the membrane declared when the class was registered. While messages can permeate membranes, we won't be discussing that feature in this article.

Under the Hood

Let's look next at how the publisher/subscriber works.

Registering a Subscriber

There are a variety of ways of registering a subscriber, but the two common ways are with or without an initializer:

C#
public void Register<M, T>()
  where M : IMembrane, new()
  where T : IReceptor
{
  Register<T>();
  IMembrane membrane = RegisterMembrane<M>();
  membraneReceptorTypes[membrane].Add(typeof(T));
}

public void Register<M, T>(Action<IReceptor> receptorInitializer)
  where M : IMembrane, new()
  where T : IReceptor
{
  Register<T>();
  Type receptorType = typeof(T);
  IMembrane membrane = RegisterMembrane<M>();
  membraneReceptorTypes[membrane].Add(receptorType);
  receptorInitializers[new MembraneReceptor() 
     { Membrane = membrane, ReceptorType = receptorType }] = 
     new ReceptorInitializer() { Initializer = receptorInitializer };
}

If you provide an initializer, it will be called when the receptor (the subscriber class) is instantiated, where IReceptor is an instance of the class being instantiated. Initializers are specific to the membrane containing the receptor (the subscriber), providing the ability to initialize the same receptor with different parameters depending on the membrane (channel) to which it is associated.
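
To make the per-membrane behavior concrete, here is a minimal stand-alone sketch, not the library's code. The names InitializerRegistry, Greeter, and Create are invented for illustration, and the membrane classes are empty stand-ins. It also keys the dictionary on a value tuple rather than on the MembraneReceptor type used above, which is an assumption made to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins; these are not the Clifton types.
public interface IReceptor { }
public class SurfaceMembrane { }
public class LoggerMembrane { }

public class Greeter : IReceptor
{
    public string Prefix { get; set; } = "";
}

public class InitializerRegistry
{
    // A value tuple compares by value, so two lookups with the same
    // (membrane, receptor) pair find the same entry.
    private readonly Dictionary<(Type, Type), Action<IReceptor>> initializers =
        new Dictionary<(Type, Type), Action<IReceptor>>();

    public void Register<M, T>(Action<IReceptor> initializer) where T : IReceptor
    {
        initializers[(typeof(M), typeof(T))] = initializer;
    }

    // Create a receptor for membrane M, running the initializer registered
    // for that membrane/receptor pair, if there is one.
    public T Create<M, T>() where T : IReceptor, new()
    {
        T receptor = new T();

        if (initializers.TryGetValue((typeof(M), typeof(T)), out Action<IReceptor> init))
        {
            init(receptor);
        }

        return receptor;
    }
}
```

With this sketch, registering `r => ((Greeter)r).Prefix = "surface"` for SurfaceMembrane affects only receptors created for that membrane; a Greeter created for LoggerMembrane keeps its default Prefix.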

Message Publishing

When a message is published using ProcessInstance, an initializer for the message can be provided, as we saw in the example above:

C#
/// <summary>
/// Process a semantic type, allowing the caller to specify an initializer 
/// before processing the instance.
/// </summary>
public void ProcessInstance<M, T>(Action<T> initializer, bool processOnCallerThread = false)
  where M : IMembrane, new()
  where T : ISemanticType, new()
{
  T inst = new T();
  initializer(inst);
  ProcessInstance<M, T>(inst, processOnCallerThread);
}

public void ProcessInstance<M, T>(bool processOnCallerThread = false)
  where M : IMembrane, new()
  where T : ISemanticType, new()
{
  T inst = new T();
  ProcessInstance<M, T>(inst, processOnCallerThread);
}

Also note the option to invoke the subscriber on the caller thread, which defaults to false.

Message Processing

The core message processor identifies all receptors (subscribers) in the membrane, instantiates them, and either queues the call or processes it immediately:

C#
protected void ProcessInstance<T>
(IMembrane membrane, IMembrane caller, T obj, bool processOnCallerThread)
  where T : ISemanticType
{
  // We get the source object type.
  Type tsource = obj.GetType();

  // Then, for each target type that is interested in this source type, 
  // we construct the target type, then invoke the correct target's Process method.
  // Constructing the target type provides us with some really interesting abilities.
  // The target type can be treated as an immutable object. We can, for instance, execute
  // the Process call on a separate thread. Constructing the target type ensures that the
  // target is stateless -- state must be managed external of any type!

  // Stateless receptors:

  List<Type> receptors = GetReceptors(membrane, tsource);
  Log(membrane, obj);

  foreach (Type ttarget in receptors)
  {
    // We can use dynamic here because we have a <T> generic to resolve the call parameter.
    // If we instead only have the interface ISemanticType, 
    // dynamic does not downcast to the concrete type --
    // therefore it can't locate the call point because it implements the concrete type.
    dynamic target = Activator.CreateInstance(ttarget);

    ReceptorInitializer receptorInitializer;

    if (receptorInitializers.TryGetValue(new MembraneReceptor() 
       { Membrane = membrane, ReceptorType = ttarget }, out receptorInitializer))
    {
      receptorInitializer.Initializer(target);
    }

    // Call immediately?
    if (processOnCallerThread)
    {
      Call(new DynamicCall() { SemanticInstance = obj, Receptor = target, 
                               Proc = () => target.Process(this, membrane, obj) });
    }
    else
    {
      // Pick a thread that has the least work to do.
      threadPool.MinBy(tp => tp.Count).Enqueue(
        new DynamicCall() { SemanticInstance = obj, Receptor = target, 
                            Proc = () => target.Process(this, membrane, obj) });
    }
  }

  // Also check stateful receptors
  List<IReceptor> sreceptors = GetStatefulReceptors(membrane, tsource);

  foreach (IReceptor receptor in sreceptors)
  {
    dynamic target = receptor;
    // Call immediately?
    if (processOnCallerThread)
    {
      Call(new DynamicCall() { SemanticInstance = obj, Receptor = target, 
        Proc = () => target.Process(this, membrane, obj), AutoDispose = false });
    }
    else
    {
      threadPool.MinBy(tp => tp.Count).Enqueue(new DynamicCall() 
                      { SemanticInstance = obj, Receptor = target, 
        Proc = () => target.Process(this, membrane, obj), AutoDispose = false });
    }
  }

  ProcessInnerTypes(membrane, caller, obj, processOnCallerThread);
  PermeateOut(membrane, caller, obj, processOnCallerThread);
}

This looks fairly complicated at first glance, but it's really quite straightforward.

  1. First, all the receptors (subscribers) in the specified membrane that handle the message are acquired.
  2. The message is logged, which can be handled by whatever logging service you've implemented.
  3. Each receptor (subscriber class) is instantiated, optionally initialized, and the callback is executed or queued.
  4. Stateful receptors (discussed later) are also invoked.
  5. Two concluding steps are performed:
    1. Messages that implement inner semantic types are also published. This is a bizarre but interesting feature that enables the application to subscribe to semantic messages within the body of a wrapper message. For example, an address message may include semantic types such as Zip Code. A subscriber may be interested in processing Zip Code for its own purposes. This mechanism facilitates creating automatic processing of inner semantic types.
    2. As in Membrane Computing, a message can permeate other membranes if so configured. This feature enables the application to bridge membranes (channels / containers) with specific messages, so that additional computations on those messages can be performed by subscribers in other membranes.
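
The first three steps can be sketched in a few dozen lines. This is a simplified, stand-alone illustration and not the Semantic Processor itself: MiniProcessor, EchoReceptor, and ST_Text are made-up names, membranes are reduced to plain strings, the Process signature is shortened to a single parameter, and everything runs on the caller's thread. Casting the message to dynamic at the call site is a choice made for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical miniatures of the marker interfaces; not the Clifton types.
public interface ISemanticType { }
public interface IReceptor { }

public class ST_Text : ISemanticType
{
    public string Text { get; set; }
}

public class EchoReceptor : IReceptor
{
    public void Process(ST_Text msg)
    {
        Console.WriteLine(msg.Text);
    }
}

public class MiniProcessor
{
    // Receptor types grouped by membrane name.
    private readonly Dictionary<string, List<Type>> receptorTypes =
        new Dictionary<string, List<Type>>();

    public void Register<T>(string membrane) where T : IReceptor, new()
    {
        if (!receptorTypes.TryGetValue(membrane, out List<Type> types))
        {
            types = new List<Type>();
            receptorTypes[membrane] = types;
        }

        types.Add(typeof(T));
    }

    // Returns the number of receptors that were invoked.
    public int Publish<T>(string membrane, Action<T> initializer)
        where T : ISemanticType, new()
    {
        T msg = new T();
        initializer(msg);

        // 1. Acquire the receptors in this membrane that declare Process(T).
        List<Type> targets = new List<Type>();

        if (receptorTypes.TryGetValue(membrane, out List<Type> types))
        {
            targets = types
                .Where(t => t.GetMethod("Process", new[] { typeof(T) }) != null)
                .ToList();
        }

        // 2. Log the message type.
        Console.WriteLine("Publishing type: " + typeof(T).Name);

        // 3. Instantiate a fresh receptor per message and invoke it.
        foreach (Type t in targets)
        {
            dynamic receptor = Activator.CreateInstance(t);
            receptor.Process((dynamic)msg);
        }

        return targets.Count;
    }
}
```

Publishing an ST_Text on a membrane with no registered receptor returns 0 in this sketch, which mirrors the "unhandled messages are thrown away" behavior listed in the feature summary.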

Processing Inner Types

Handled by:

C#
protected void ProcessInnerTypes(IMembrane membrane, IMembrane caller, 
                                 ISemanticType obj, bool processOnCallerThread)
{
  var properties = obj.GetType().GetProperties(BindingFlags.Instance | 
                   BindingFlags.Public).Where(
      pi => pi.PropertyType.GetInterfaces().Contains(typeof(ISemanticType)));

  properties.ForEach(pi =>
  {
    ISemanticType prop = (ISemanticType)pi.GetValue(obj);
    prop.IfNotNull((p) => ProcessInstance(membrane, caller, p, processOnCallerThread));
  });
}
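
As a stand-alone illustration of the same reflection idea, the sketch below collects the non-null inner semantic values of a message instead of publishing them. ST_Address, ST_ZipCode, and InnerTypes.Find are hypothetical names. Note one deliberate difference: it uses IsAssignableFrom, which also matches a property declared as ISemanticType itself, whereas the GetInterfaces().Contains test above does not.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for the marker interface.
public interface ISemanticType { }

public class ST_ZipCode : ISemanticType
{
    public string Value { get; set; }
}

public class ST_Address : ISemanticType
{
    public string Street { get; set; }   // plain property, ignored
    public ST_ZipCode Zip { get; set; }  // inner semantic type
}

public static class InnerTypes
{
    // Returns the non-null values of all public instance properties
    // whose declared type is a semantic type.
    public static List<ISemanticType> Find(ISemanticType obj)
    {
        return obj.GetType()
            .GetProperties(BindingFlags.Instance | BindingFlags.Public)
            .Where(pi => typeof(ISemanticType).IsAssignableFrom(pi.PropertyType))
            .Select(pi => (ISemanticType)pi.GetValue(obj))
            .Where(value => value != null)
            .ToList();
    }
}
```

For an ST_Address whose Zip is set, Find returns just the ST_ZipCode instance; a subscriber to ST_ZipCode could then be handed that value.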

A Brief Look at Permeating Membranes

Image 9

From Wikipedia Membrane Computing

In Membrane Computing, data (or in our case, the message) can permeate a membrane both inwards, to inner membranes, and outwards, to the containing membrane. In the illustration above, the "environment" is represented by the SurfaceMembrane type.

Permeating Membranes

Handled by:

C#
protected void PermeateOut<T>(IMembrane membrane, IMembrane caller, 
                              T obj, bool processOnCallerThread)
  where T : ISemanticType
{
  List<IMembrane> pmembranes = ((Membrane)membrane).PermeateTo(obj);
  pmembranes.Where(m=>m != caller).ForEach((m) => ProcessInstance
                  (m, membrane, obj, processOnCallerThread));
}

Infinite recursion is prevented by ignoring the membrane on the original call.

Inbound and Outbound Permeability

A membrane (channel / container) is permeable to both an outer membrane and to any inner membranes.

C#
/// <summary>
/// Given this membrane's outbound list, what membranes are inbound permeable to the ST as well?
/// </summary>
public List<IMembrane> PermeateTo(ISemanticType st)
{
  List<IMembrane> ret = new List<IMembrane>();
  Type sttype = st.GetType();

  if (outboundPermeableTo.Contains(sttype))
  {
    // Can we traverse to the parent?
    if ((parent != null) && (parent.inboundPermeableTo.Contains(sttype)))
    {
      ret.Add(parent);
    }

    // Can we traverse to children?
    foreach (Membrane child in childMembranes)
    {
      if (child.inboundPermeableTo.Contains(sttype))
      {
        ret.Add(child);
      }
    }
  }

  return ret;
}
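
Since this article doesn't cover how permeability is configured, the following is only a self-contained sketch of the rule that PermeateTo applies: a message crosses when the source membrane is outbound permeable to its type and the neighbor is inbound permeable to it. MiniMembrane, AllowOut, AllowIn, and AddChild are invented names, not the library's API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins; not the Clifton types.
public interface ISemanticType { }
public class ST_Alert : ISemanticType { }
public class ST_Private : ISemanticType { }

public class MiniMembrane
{
    private readonly List<MiniMembrane> children = new List<MiniMembrane>();
    private readonly HashSet<Type> outbound = new HashSet<Type>();
    private readonly HashSet<Type> inbound = new HashSet<Type>();

    public string Name { get; }
    public MiniMembrane Parent { get; private set; }

    public MiniMembrane(string name)
    {
        Name = name;
    }

    public void AddChild(MiniMembrane child)
    {
        child.Parent = this;
        children.Add(child);
    }

    public void AllowOut<T>() where T : ISemanticType { outbound.Add(typeof(T)); }
    public void AllowIn<T>() where T : ISemanticType { inbound.Add(typeof(T)); }

    // Neighbors (parent and children) that will accept this message.
    public List<MiniMembrane> PermeateTo(ISemanticType st)
    {
        var ret = new List<MiniMembrane>();
        Type type = st.GetType();

        if (!outbound.Contains(type))
        {
            return ret;
        }

        if (Parent != null && Parent.inbound.Contains(type))
        {
            ret.Add(Parent);
        }

        foreach (MiniMembrane child in children)
        {
            if (child.inbound.Contains(type))
            {
                ret.Add(child);
            }
        }

        return ret;
    }
}
```

In this sketch, if an inner membrane allows ST_Alert out and its parent allows ST_Alert in, the alert reaches the parent, while ST_Private stays where it was published.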

Membrane Types

A custom membrane (channel) type never implements anything; it's simply used so that the "channel" can be specified as a generic type. It must, however, derive from Membrane, for example:

C#
public class LoggerMembrane : Membrane { }

The base class handles the permeability functions.

Setting up the Subscriber Call

In the code illustrated earlier, the call to the subscriber is made either immediately:

C#
Call(new DynamicCall() { SemanticInstance = obj, Receptor = target, 
  Proc = () => target.Process(this, membrane, obj) });

or is queued on a worker thread:

C#
// Pick a thread that has the least work to do.
threadPool.MinBy(tp => tp.Count).Enqueue(
  new DynamicCall() { SemanticInstance = obj, Receptor = target, 
                      Proc = () => target.Process(this, membrane, obj) });

The call is implemented as an Action wrapped by the DynamicCall class:

C#
public class DynamicCall : ProcessCall
{
  public Action Proc { get; set; }

  public DynamicCall()
  {
    AutoDispose = true;
  }

  public override void MakeCall()
  {
    Proc();
  }
}

The target is of dynamic type. We use this type so that the call is routed to the correct overloaded Process method in the subscriber.
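
A small stand-alone sketch of that routing follows. The types ST_Ping, ST_Pong, PingPongReceptor, and Dispatcher are made up for the example. Unlike the code above, which relies on the generic parameter to carry the concrete message type, this sketch casts the argument to dynamic so that overload resolution uses the runtime type even though the parameter is declared as the interface.

```csharp
using System;

// Hypothetical stand-ins; not the Clifton types.
public interface ISemanticType { }
public class ST_Ping : ISemanticType { }
public class ST_Pong : ISemanticType { }

public class PingPongReceptor
{
    public string Process(ST_Ping msg) { return "ping handled"; }
    public string Process(ST_Pong msg) { return "pong handled"; }
}

public static class Dispatcher
{
    public static string Dispatch(object receptor, ISemanticType msg)
    {
        // The receiver is dynamic, so the overload is chosen at runtime.
        dynamic target = receptor;

        // Casting the argument to dynamic makes the runtime binder use the
        // message's concrete type rather than the declared interface type.
        return target.Process((dynamic)msg);
    }
}
```

If the receptor has no Process overload that accepts the message type, the dynamic call throws a RuntimeBinderException, which is one reason to check the receptor's handled types before making the call.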

Dequeuing Work

Dequeuing the message is handled by a thread in the thread pool:

C#
protected void ProcessPoolItem(object state)
{
  ThreadSemaphore<ProcessCall> ts = (ThreadSemaphore<ProcessCall>)state;

  while (true)
  {
    ts.WaitOne();
    ProcessCall rc;

    if (ts.TryDequeue(out rc))
    {
      Call(rc);
    }
  }
}

Note that we are not using .NET's thread pooling or Task mechanism, as these introduce delays in processing the work and are meant for short-lived processes. Because the application's subscriber might be a long-running process, the thread pool is actually a collection of Thread instances:

C#
/// <summary>
/// Set up the thread pool for calling receptors to process semantic types.
/// Why do we use our own thread pool? Because .NET's implementation (and
/// particularly Task) is crippled and non-functional for long running threads.
/// </summary>
protected void InitializePoolThreads()
{
  for (int i = 0; i < MAX_WORKER_THREADS; i++)
  {
    Thread thread = new Thread(new ParameterizedThreadStart(ProcessPoolItem));
    thread.IsBackground = true;
    ThreadSemaphore<ProcessCall> ts = new ThreadSemaphore<ProcessCall>();
    threadPool.Add(ts);
    thread.Start(ts);
  }
}
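
The ThreadSemaphore class isn't shown in this article, so here is one way such a per-thread work queue could look, purely as an assumption. WorkQueue and MiniPool are hypothetical names. The sketch pairs a ConcurrentQueue with a SemaphoreSlim and picks the queue with the fewest pending items using OrderBy(...).First() in place of MinBy.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical queue-plus-signal pair, one per worker thread.
public class WorkQueue
{
    private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
    private readonly SemaphoreSlim signal = new SemaphoreSlim(0);

    public int Count { get { return queue.Count; } }

    public void Enqueue(Action work)
    {
        queue.Enqueue(work);
        signal.Release();   // wake the worker that owns this queue
    }

    public void WaitOne() { signal.Wait(); }

    public bool TryDequeue(out Action work) { return queue.TryDequeue(out work); }
}

public class MiniPool
{
    private readonly List<WorkQueue> queues = new List<WorkQueue>();

    public MiniPool(int workers)
    {
        for (int i = 0; i < workers; i++)
        {
            WorkQueue q = new WorkQueue();
            queues.Add(q);
            Thread thread = new Thread(() => Run(q));
            thread.IsBackground = true;   // does not keep the process alive
            thread.Start();
        }
    }

    private static void Run(WorkQueue q)
    {
        while (true)
        {
            q.WaitOne();

            if (q.TryDequeue(out Action work))
            {
                try { work(); }
                catch (Exception ex) { Console.WriteLine(ex.Message); }
            }
        }
    }

    // Queue the work on the worker with the fewest pending items.
    public void Enqueue(Action work)
    {
        queues.OrderBy(q => q.Count).First().Enqueue(work);
    }
}
```

Because the workers are background threads, a console application that enqueues work and returns from Main right away can exit before anything runs, which is why the hello-world example processes its message on the caller's thread.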

Making the Call

The call itself is wrapped in an exception handler:

C#
protected void Call(ProcessCall rc)
{
  try
  {
    rc.MakeCall();
  }
  catch (Exception ex)
  {
    Exception ex2 = ex;
    // Prevent recursion if the exception process itself throws an exception.
    if (!(rc.SemanticInstance is ST_Exception))
    {
      ProcessInstance(Logger, new ST_Exception(ex), true);
    }

    while (ex2.InnerException != null)
    {
      ex2 = ex2.InnerException;
      // Prevent recursion if the exception process itself throws an exception.
      if (!(rc.SemanticInstance is ST_Exception))
      {
        ProcessInstance(Logger, new ST_Exception(ex2), true);
      }
    }
  }
  finally
  {
    if ( (rc.Receptor is IDisposable) && (rc.AutoDispose) )
    {
      ((IDisposable)rc.Receptor).Dispose();
    }
  }
}

Exceptions (including inner exceptions) are published on the Logger membrane which the Semantic Processor creates for us.

C#
public IMembrane Logger { get; protected set; }
...
Logger = RegisterMembrane<LoggerMembrane>();

Note that in the finally block, a receptor (subscriber) has its Dispose method called if it implements IDisposable and is a stateless subscriber, that is, one that was instantiated by the Semantic Processor (the Publisher/Subscriber).
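
The shape of that try/catch/finally can be shown in isolation. In this stand-alone sketch, SafeCall and TrackedReceptor are invented names, and the exception messages are collected into a list instead of being published as ST_Exception messages on the Logger membrane.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical receptor that records whether it was disposed.
public class TrackedReceptor : IDisposable
{
    public bool Disposed { get; private set; }

    public void Dispose()
    {
        Disposed = true;
    }
}

public static class SafeCall
{
    // Runs the call, returns the messages of the exception and all of its
    // inner exceptions (outermost first), and disposes the receptor if asked.
    public static List<string> Invoke(Action call, object receptor, bool autoDispose)
    {
        var errors = new List<string>();

        try
        {
            call();
        }
        catch (Exception ex)
        {
            for (Exception e = ex; e != null; e = e.InnerException)
            {
                errors.Add(e.Message);
            }
        }
        finally
        {
            // Only receptors created per message should be auto-disposed.
            if (autoDispose && receptor is IDisposable disposable)
            {
                disposable.Dispose();
            }
        }

        return errors;
    }
}
```

Passing autoDispose as false corresponds to the stateful case, where the caller owns the receptor's lifetime and the receptor must survive the call.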

Stateful Subscribers

A stateful subscriber can be useful when, not to be redundant, the overall state of the subscriber needs to be maintained. Some use cases include a subscriber that manages a connection, in which we want to keep the connection open, rather than opening and closing it every time a message involving the connection is processed.

An example of a stateful receptor initialization looks like this:

C#
semProc.Register<SurfaceMembrane>(new StatefulSubscriber());

Note that the subscriber type generic parameter is omitted and instead an instance of the subscriber is passed into the Register method.

A simple implementation of stateful subscriber looks like this:

C#
public class StatefulSubscriber : IReceptor
{
  protected int counter = 0;

  public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message2 msg)
  {
    Console.WriteLine(counter + ": " + msg.Text);
    ++counter;
  }
}

Note that it is processing messages of type ST_Message2, which is simply, for demo purposes, a way of keeping the two examples separate. Publishing messages is exactly the same:

C#
semProc.ProcessInstance<SurfaceMembrane, ST_Message2>(m => m.Text = "Hello World", true);
semProc.ProcessInstance<SurfaceMembrane, ST_Message2>(m => m.Text = "Goodbye World", true);

Here, we publish two messages, with the result being:

Image 10

We observe that the subscriber instance is preserved.

The Internal Magic

The publisher/subscriber extracts the messages that a stateful subscriber handles through reflection:

C#
/// <summary>
/// Register a stateful receptor contained within the specified membrane.
/// </summary>
public void Register(IMembrane membrane, IReceptor receptor)
{
  statefulReceptors.Add(receptor);
  Type ttarget = receptor.GetType();

  MethodInfo[] methods = ttarget.GetMethods();

  foreach (MethodInfo method in methods)
  {
    // TODO: Use attribute, not specific function name.
    if (method.Name == "Process")
    {
      ParameterInfo[] parameters = method.GetParameters();
      InstanceNotify(receptor, parameters[2].ParameterType);
    }
  }

  membranes[membrane.GetType()] = membrane;
  membraneReceptorInstances[membrane].Add(receptor);
}

This inspects each Process method in the class and assumes that these methods will have the expected signature (this code could be improved). The message types are extracted so that later it can be determined whether the stateful subscriber handles the published message.
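
One possible improvement, sketched here as an assumption rather than as the library's code, is to check the shape of each Process method before reading its third parameter. As written above, a public method named Process with fewer than three parameters would make parameters[2] throw an IndexOutOfRangeException. ReceptorInspector and the empty interfaces below are stand-ins invented for the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-ins; not the Clifton types.
public interface ISemanticType { }
public interface IReceptor { }
public interface ISemanticProcessor { }
public interface IMembrane { }

public class ST_Message2 : ISemanticType
{
    public string Text { get; set; }
}

public class StatefulSubscriber : IReceptor
{
    public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message2 msg) { }

    // Same name, different shape: must not be treated as a message handler.
    public void Process(string notAMessage) { }
}

public static class ReceptorInspector
{
    // Returns the message types handled by well-formed Process methods.
    public static List<Type> HandledTypes(IReceptor receptor)
    {
        return receptor.GetType()
            .GetMethods(BindingFlags.Instance | BindingFlags.Public)
            .Where(m => m.Name == "Process")
            .Select(m => m.GetParameters())
            .Where(p => p.Length == 3
                && p[0].ParameterType == typeof(ISemanticProcessor)
                && p[1].ParameterType == typeof(IMembrane)
                && typeof(ISemanticType).IsAssignableFrom(p[2].ParameterType))
            .Select(p => p[2].ParameterType)
            .ToList();
    }
}
```

For the StatefulSubscriber in this sketch, HandledTypes returns only ST_Message2; the Process(string) overload is skipped instead of causing an exception.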

Performance

From the above code, you've probably noticed that in the typical scenario, a subscriber is being instantiated (and optionally disposed) for every message, and an optional initializer is being executed. Also, the use of a dynamic subscriber instance hides internal reflection. There are also nested levels of calls and the overhead of creating the call, either to be executed immediately or queued on a worker thread. A complex configuration, involving membrane permeation and inner message publishing, adds to the performance overhead.

Conversely, the implementation of this publisher/subscriber is very flexible, thread safe, and the application is resilient to exceptions that the subscriber may throw. This is always the tradeoff when implementing a general purpose component -- useful functionality at the cost of raw performance. Usually, the messages that the publisher/subscriber processes come from low bandwidth events, whether they are user inputs from hardware or even page or web service requests. If you really need high performance, there are simpler publisher/subscriber implementations, or you may not want to use this pattern at all.

Putting It All Together - An Example

Let's write a web server using the Module Manager, Service Manager, and Publisher/Subscriber!

First, A Logger Service

We start with a couple of modules defined in the modules.xml file:

XML
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
</Modules>

Being able to log things (especially exceptions!) is really the first thing any development process should start with. The logger service should be able to handle both calls made to it as a service and exception messages published by the Publisher/Subscriber. Here's the logger service we'll use:

C#
using System;

using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using ServiceInterfaces;

namespace ConsoleLoggerService
{
  public class LoggerModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<IConsoleLoggerService, LoggerService>();
    }
  }

  public class LoggerService : ServiceBase, IConsoleLoggerService, IReceptor
  {
    public override void FinishedInitialization()
    {
      ISemanticProcessor semProc = ServiceManager.Get<ISemanticProcessor>();
      semProc.Register<LoggerMembrane, GenericTypeLogger>();
      semProc.Register<LoggerMembrane, LoggerService>();
    }

    public void Log(string msg)
    {
      Console.WriteLine(msg);
    }

    public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Exception msg)
    {
      Log(msg.Exception.Message);
      Log(msg.Exception.StackTrace);
    }

    public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Log msg)
    {
      Log(msg.Message);
    }
  }

  public class GenericTypeLogger : IReceptor
  {  
    public void Process(ISemanticProcessor semProc, IMembrane membrane, ISemanticType t)
    {
      if ( (!(t is ST_Log)) && (!(t is ST_Exception)) )
      {
        Console.WriteLine("Publishing type: " + t.GetType().Name);
      }
    }
  }
}

There are three things that are interesting about this logger:

  1. It is a service, so we can treat it as such.
  2. However, the service also implements IReceptor, enabling it to process both ST_Log messages that the application posts as well as ST_Exception messages issued by the Publisher/Subscriber on its internal Logger "channel".
  3. A generic type logger is instantiated as a separate receptor, which always logs (courtesy of the Publisher/Subscriber's ability to issue messages for base types/interface as well) the message type. We ignore log and exception message types, as logging the type, then the actual log or exception message, seems silly.

Image 11 The above is an interesting implementation because a singleton service is registered, but the Publisher/Subscriber creates an instance for each log message!

A test application shows this all working:

C#
static partial class Program
{
  static void Main(string[] args)
  {
    InitializeBootstrap();
    Bootstrap((e) => Console.WriteLine(e.Message));

    TestLogging();
    Console.WriteLine("Press ENTER to exit the server.");
    Console.ReadLine();
  }

  static void TestLogging()
  {
    serviceManager.Get<IConsoleLoggerService>().Log("Foobar");
    serviceManager.Get<ISemanticProcessor>().ProcessInstance<LoggerMembrane, 
                   ST_Log>(l => l.Message = "Hi there!", true);
    serviceManager.Get<ISemanticProcessor>().Register<SurfaceMembrane, ExceptionProcess>();
    serviceManager.Get<ISemanticProcessor>().
                   ProcessInstance<SurfaceMembrane, ST_TestException>();
  }
}

public class ST_TestException : ISemanticType { }

public class ExceptionProcess : IReceptor
{
  public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_TestException msg)
  {
    throw new ApplicationException("I Broke!");
  }
}

And here's the output:

Image 12

The output illustrates:

  1. using the logger as a service
  2. publishing a log message
  3. logging generic type messages
  4. the Publisher/Subscriber handling an exception and sending it to our logger

So now, we have the logger module done.

Next, a Web Server

I prefer using my own technology rather than IIS, ASP.NET, Razor, MVC, whatever. So we'll write a simple web server, built from several services.

An HTTP Listener Service

Here's an implementation of a very simple HTTP listener. It receives requests and posts the request as a message to the Publisher/Subscriber:

C#
using System.IO;
using System.Net;
using System.Threading.Tasks;

using Clifton.Core.ExtensionMethods;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using Semantics;
using ServiceInterfaces;

namespace WebServerService
{
  public class WebServerModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<IWebServerService, WebServer>();
    }
  }

  public class WebServer : ServiceBase, IWebServerService
  {
    protected HttpListener listener;
    protected ILoggerService logger;
    protected ISemanticProcessor semProc;
    protected bool httpOnly;

    public virtual void Start(string ip, int port)
    {
      logger = ServiceManager.Get<ILoggerService>();
      semProc = ServiceManager.Get<ISemanticProcessor>();
      listener = new HttpListener();
      string url = IpWithPort(ip, port);
      logger.Log("Listening on " + ip + ":" + port);
      listener.Prefixes.Add(url);

      listener.Start();
      // Yes, this is a long running task. One of them isn't a problem.
      Task.Run(() => WaitForConnection(listener));
    }

    protected virtual void WaitForConnection(object objListener)
    {
      HttpListener listener = (HttpListener)objListener;

      while (true)
      {
        // Block this worker thread until a connection arrives.
        HttpListenerContext context = listener.GetContext();
        string verb = context.Request.HttpMethod;
        string path = context.Request.RawUrl.LeftOf("?").RightOf("/");
        string parms = context.Request.RawUrl.RightOf("?");
        logger.Log(verb + ": " + path);

        string data = new StreamReader(context.Request.InputStream, 
                                       context.Request.ContentEncoding).ReadToEnd();
        ServiceManager.Get<ISemanticProcessor>().ProcessInstance<WebServerMembrane, 
                           ST_HttpRequest>(r =>
        {
          r.Context = context;
          r.Verb = verb;
          r.Path = path;
          r.Parameters = parms;
          r.Data = data;
        });
      }
    }

    /// <summary>
    /// Returns the url appended with a / for port 80, otherwise, 
    /// the [url]:[port]/ if the port is not 80.
    /// </summary>
    protected string IpWithPort(string ip, int port)
    {
      string ret;

      if (port == 80)
      {
        ret = "http://" + ip + "/";
      }
      else
      {
        ret = "http://" + ip + ":" + port.ToString() + "/";
      }

      return ret;
    }
  }
}

The message gets handled on a separate thread, letting the listener loop return immediately to the activity of waiting for another connection.
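
To make the hand-off concrete, here is a minimal sketch that uses only the .NET base class library. HandOffSketch and AcceptAll are hypothetical names, and the gate exists purely to make the demonstration deterministic. The real semantic processor manages its own threads, and this is not a reproduction of its implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the listener loop; not the Clifton implementation.
public static class HandOffSketch
{
    // Accepts each request on the calling thread and hands the slow work to
    // the thread pool, so accepting never waits for a handler to finish.
    // Returns how many requests were accepted while every handler was still busy.
    public static int AcceptAll(IEnumerable<string> requests, Action<string> handler)
    {
        var gate = new ManualResetEventSlim(false);   // holds the handlers back
        var tasks = new List<Task>();
        int accepted = 0;

        foreach (string request in requests)
        {
            string captured = request;

            tasks.Add(Task.Run(() =>
            {
                gate.Wait();          // simulate a handler that is still busy
                handler(captured);
            }));

            accepted++;               // the loop moves on immediately
        }

        int acceptedWhileBusy = accepted;
        gate.Set();                   // now let the handlers finish
        Task.WaitAll(tasks.ToArray());

        return acceptedWhileBusy;
    }
}
```

Calling HandOffSketch.AcceptAll(new[] { "GET /a", "GET /b" }, Console.WriteLine) accepts both requests before either handler runs, which is the behavior we want from the listener loop. Note that the handler is invoked on pool threads, so it must be thread safe.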

We add this module to our modules.xml file:

XML
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
  <Module AssemblyName='WebServerService.dll'/>
</Modules>

And we can test the server by adding a single line to our application:

C#
static void Main(string[] args)
{
  InitializeBootstrap();
  Bootstrap((e) => Console.WriteLine(e.Message));

  serviceManager.Get<IWebServerService>().Start("127.0.0.1", 80);

  Console.WriteLine("Press ENTER to exit the server.");
  Console.ReadLine();
}

Even though there is no subscriber, our logger will tell us that it has received the web request message:

Image 13

Of course, the browser waits patiently for a response, which we're not giving it!

A Semantic Router Service

Let's add a semantic router. This will have the responsibility of mapping the request verb and path to a semantic type, rather than a method that handles the path. The semantic type is dynamically instantiated and either populated with query values on the URL itself or JSON data in the data stream. I'll put together a rather simple one:

C#
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Reflection;

using Newtonsoft.Json;

using Clifton.Core.Utils;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using Semantics;
using ServiceInterfaces;

namespace SemanticWebRouterService
{
  // Struct, so it's key-able.
  public struct Route
  {
    public string Verb { get; set; }
    public string Path { get; set; }
  }

  public class SemanticWebRouterModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<ISemanticWebRouterService, WebRouterService>();
    }
  }

  public class WebRouterService : ServiceBase, ISemanticWebRouterService
  {
    protected Dictionary<Route, Type> semanticRoutes;

    public WebRouterService()
    {
      semanticRoutes = new Dictionary<Route, Type>();
    }

    public override void FinishedInitialization()
    {
      base.FinishedInitialization();
      ServiceManager.Get<ISemanticProcessor>().Register<WebServerMembrane, RouteProcessor>();
    }

    public void Register<T>(string verb, string path) where T : ISemanticRoute
    {
      semanticRoutes[new Route() { Verb = verb.ToLower(), Path = path.ToLower() }] = typeof(T);
    }

    public void RouteRequest(ST_HttpRequest req)
    {
      Route route = new Route() { Verb = req.Verb.ToLower(), Path = req.Path.ToLower() };
      Type routeHandler;
      bool found = semanticRoutes.TryGetValue(route, out routeHandler);
      ISemanticRoute semanticRoute = null;

      if (found)
      {
        semanticRoute = InstantiateRouteHandler(routeHandler, req);
        semanticRoute.Context = req.Context;
        ServiceManager.Get<ISemanticProcessor>().
                       ProcessInstance<WebServerMembrane>(semanticRoute);
      }
      else
      {
        ServiceManager.Get<ILoggerService>().Log("Route not found.");
      }
    }

    protected ISemanticRoute InstantiateRouteHandler(Type routeHandler, ST_HttpRequest req)
    {
      ISemanticRoute semanticRoute = (ISemanticRoute)Activator.CreateInstance(routeHandler);

      if (!string.IsNullOrEmpty(req.Data))
      {
        // We assume data will be in JSON format.
        JsonConvert.PopulateObject(req.Data, semanticRoute);
      }
      else if (req.Verb.ToLower() == "get")
      {
        PopulateFromQueryString(req, semanticRoute);
      }

      return semanticRoute;
    }

    protected void PopulateFromQueryString(ST_HttpRequest req, ISemanticRoute semanticRoute)
    {
      NameValueCollection nvc = req.Context.Request.QueryString;

      foreach (string key in nvc.AllKeys)
      {
        PropertyInfo pi = semanticRoute.GetType().GetProperty(key, 
          BindingFlags.Public | BindingFlags.Instance | BindingFlags.IgnoreCase);

        if (pi != null)
        {
          object valOfType = Converter.Convert
          (Uri.UnescapeDataString(nvc[key].Replace('+', ' ')), pi.PropertyType);
          pi.SetValue(semanticRoute, valOfType);
        }
      }
    }
  }

  public class RouteProcessor : IReceptor
  {
    public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_HttpRequest req)
    {
      semProc.ServiceManager.Get<ISemanticWebRouterService>().RouteRequest(req);
    }
  }
}

The pattern here should be emerging:

  1. Create a class that implements IModule
  2. Register the service
  3. In this case, the service registers a subscriber to the ST_HttpRequest message
  4. The message subscriber simply calls back into the service for processing
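
The four steps above can be sketched without the Clifton libraries. Everything below (IMiniReceptor, MiniProcessor, DemoRequest, DemoRouterService, DemoRouteReceptor) is a hypothetical stand-in, and the dispatch is synchronous to keep the example short. The static Instance field takes the place of looking the service up through the service manager.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// All names below are hypothetical stand-ins, not the Clifton.Core types.
public interface IMiniReceptor { }

public class MiniProcessor
{
    private readonly List<Type> receptorTypes = new List<Type>();

    // Step 3 of the pattern: a service registers a subscriber type.
    public void Register<TReceptor>() where TReceptor : IMiniReceptor, new()
    {
        receptorTypes.Add(typeof(TReceptor));
    }

    // Publishes a message: every registered receptor exposing a public
    // Process(MiniProcessor, <message type>) method gets a fresh instance
    // and a call. Returns the number of receptors that handled the message.
    public int ProcessInstance(object message)
    {
        int handled = 0;

        foreach (Type receptorType in receptorTypes)
        {
            MethodInfo process = receptorType.GetMethod(
                "Process", new[] { typeof(MiniProcessor), message.GetType() });

            if (process != null)
            {
                object receptor = Activator.CreateInstance(receptorType);
                process.Invoke(receptor, new object[] { this, message });
                handled++;
            }
        }

        return handled;
    }
}

public class DemoRequest
{
    public string Path { get; set; }
}

public class DemoRouterService
{
    // Stands in for resolving the service through a service manager.
    public static readonly DemoRouterService Instance = new DemoRouterService();

    public readonly List<string> Routed = new List<string>();

    public void RouteRequest(DemoRequest req)
    {
        Routed.Add(req.Path);
    }
}

public class DemoRouteReceptor : IMiniReceptor
{
    // Step 4: the subscriber simply calls back into the service.
    public void Process(MiniProcessor proc, DemoRequest req)
    {
        DemoRouterService.Instance.RouteRequest(req);
    }
}
```

After proc.Register<DemoRouteReceptor>(), publishing a DemoRequest reaches the router service, while publishing a type nobody subscribes to (a plain string, say) is simply ignored.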

Testing

Let's add it to our modules.xml file:

XML
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
  <Module AssemblyName='WebServerService.dll'/>
  <Module AssemblyName='SemanticWebRouterService.dll'/>
</Modules>

Next, we'll register a route whose semantic type logs when a property is set, so we can at least test the query parameter initialization process. Here's how we register the route:

C#
ISemanticWebRouterService router = serviceManager.Get<ISemanticWebRouterService>();
router.Register<ST_Foobar>("get", "foobar");

Image 14

Note that we're not registering a method to handle the route, we're registering a semantic type that gets instantiated for a particular route.
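
As a rough illustration of that idea, here is a cut-down route table that maps a verb and path to a type and creates a new instance on every lookup. RouteKey, RouteTable and FoobarRoute are hypothetical names used only for this sketch, and it assumes the registered types have a parameterless constructor.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names; a cut-down illustration of a type-valued route table.
public struct RouteKey
{
    // A struct compares by value, so two keys with the same verb and path
    // find the same dictionary entry.
    public string Verb { get; set; }
    public string Path { get; set; }
}

public class FoobarRoute { }

public class RouteTable
{
    private readonly Dictionary<RouteKey, Type> routes = new Dictionary<RouteKey, Type>();

    // Associates a type, not a handler method, with the route.
    public void Register<T>(string verb, string path) where T : new()
    {
        routes[MakeKey(verb, path)] = typeof(T);
    }

    // Returns a new instance of the type registered for the route,
    // or null when no type is registered.
    public object Resolve(string verb, string path)
    {
        Type type;

        return routes.TryGetValue(MakeKey(verb, path), out type)
            ? Activator.CreateInstance(type)
            : null;
    }

    private static RouteKey MakeKey(string verb, string path)
    {
        return new RouteKey
        {
            Verb = verb.ToLowerInvariant(),
            Path = path.ToLowerInvariant()
        };
    }
}
```

With table.Register<FoobarRoute>("get", "foobar") in place, table.Resolve("GET", "Foobar") yields a fresh FoobarRoute, and an unregistered route yields null.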

Here's a simple test type:

C#
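public class ST_Foobar : SemanticRoute
{
  // Backing field; returning Test from its own getter would recurse forever.
  private string test;

  public string Test
  {
    get { return test; }
    set
    {
      test = value;
      Program.serviceManager.Get<ILoggerService>().Log
              ("test parameter set to: " + value.Quote());
    }
  }
}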

Now we can try it:

Image 15

Of course, this is just a bare bones example. There is no authentication, authorization, session management, and so forth. And we still aren't responding to the browser request!

A few interesting points:

  • The router thread will exit after finding (or not) the route, and the route handler message is published in a manner in which a new thread will receive the call.
  • Because we're using the publisher/subscriber, the route can be processed by multiple subscribers. Why you'd want to do that, I'm not sure.
  • Because we're associating a route with a semantic type (ok, fancy name for a class), we can deserialize the parameters into that class and, as it is actually a semantic message, it fits perfectly into the publisher/subscriber concept.
  • Ideally, we would implement the route handlers as modules themselves, keeping the application free of doing the route registration.
    • This has the advantage of being able to add new behaviors to our web server by adding modules that define the semantic types for a given route and implementing the subscribers for those messages.
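
Deserializing parameters into the semantic type can be sketched with nothing but reflection. QueryBinder and SearchRoute are hypothetical names, and Convert.ChangeType is a simple stand-in for the Converter.Convert helper used in the router, so only basic property types such as string and int are covered here.

```csharp
using System;
using System.Collections.Specialized;
using System.Globalization;
using System.Reflection;

// Hypothetical semantic type used only for this sketch.
public class SearchRoute
{
    public string Term { get; set; }
    public int Page { get; set; }
}

public static class QueryBinder
{
    // Copies each query value onto the public instance property with the
    // same name (ignoring case). Keys without a matching writable property
    // are skipped.
    public static void Populate(object target, NameValueCollection query)
    {
        foreach (string key in query.AllKeys)
        {
            if (key == null)
            {
                continue;
            }

            PropertyInfo pi = target.GetType().GetProperty(key,
                BindingFlags.Public | BindingFlags.Instance | BindingFlags.IgnoreCase);

            if (pi != null && pi.CanWrite)
            {
                // Convert.ChangeType stands in for the article's Converter.Convert.
                object value = Convert.ChangeType(
                    query[key], pi.PropertyType, CultureInfo.InvariantCulture);
                pi.SetValue(target, value, null);
            }
        }
    }
}
```

Given a collection holding term=hello world and PAGE=3, Populate sets Term to "hello world" and Page to 3 on a SearchRoute instance, and any key that has no matching property is ignored.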

Responders

Lastly, our web server needs a few simple responders, again implemented as a module. Note in this case, the service interface is a placeholder, as there are no public methods that are directly callable:

C#
using System.Text;

using Clifton.Core.ExtensionMethods;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using Semantics;

namespace WebResponseService
{
  // Here, we create a placeholder, because this service is not actually exposed.
  // All activities are handled as a subscriber.
  public interface IWebResponseService : IService { }

  public class WebResponseModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<IWebResponseService, WebResponseService>();
    }
  }

  public class WebResponseService : ServiceBase, IWebResponseService
  {
    public override void FinishedInitialization()
    {
      base.FinishedInitialization();
      ServiceManager.Get<ISemanticProcessor>().Register<WebServerMembrane, WebResponder>();
    }
  }

  public class WebResponder : IReceptor
  {
    public void Process(ISemanticProcessor proc, IMembrane membrane, ST_JsonResponse resp)
    {
      resp.Context.Response.StatusCode = resp.StatusCode;
      resp.Context.Response.ContentType = "application/json";
      resp.Context.Response.ContentEncoding = Encoding.UTF8;
      byte[] byteData = resp.Json.to_Utf8();
      resp.Context.Response.ContentLength64 = byteData.Length;
      resp.Context.Response.OutputStream.Write(byteData, 0, byteData.Length);
      resp.Context.Response.Close();
    }

    public void Process(ISemanticProcessor proc, IMembrane membrane, ST_HtmlResponse resp)
    {
      byte[] utf8data = resp.Html.to_Utf8();
      resp.Context.Response.ContentType = "text/html";
      resp.Context.Response.ContentEncoding = Encoding.UTF8;
      resp.Context.Response.ContentLength64 = utf8data.Length;
      resp.Context.Response.OutputStream.Write(utf8data, 0, utf8data.Length);
      resp.Context.Response.Close();
    }

    public void Process(ISemanticProcessor proc, IMembrane membrane, ST_CssResponse resp)
    {
      byte[] utf8data = resp.Css.to_Utf8();
      resp.Context.Response.ContentType = "text/css";
      resp.Context.Response.ContentEncoding = Encoding.UTF8;
      resp.Context.Response.ContentLength64 = utf8data.Length;
      resp.Context.Response.OutputStream.Write(utf8data, 0, utf8data.Length);
      resp.Context.Response.Close();
    }

    public void Process
    (ISemanticProcessor proc, IMembrane membrane, ST_JavascriptResponse resp)
    {
      byte[] utf8data = resp.Javascript.to_Utf8();
      resp.Context.Response.ContentType = "text/javascript";
      resp.Context.Response.ContentEncoding = Encoding.UTF8;
      resp.Context.Response.ContentLength64 = utf8data.Length;
      resp.Context.Response.OutputStream.Write(utf8data, 0, utf8data.Length);
      resp.Context.Response.Close();
    }

    public void Process(ISemanticProcessor proc, IMembrane membrane, ST_RouteNotFound resp)
    {
      resp.Context.Response.StatusCode = 404;   // Respond with a 404 (page not found) status code.
      resp.Context.Response.Close();
    }
  }
}

Now let's go back to the semantic router and publish the ST_RouteNotFound message for undefined routes:

C#
ServiceManager.Get<ILoggerService>().Log("Route not found.");
ServiceManager.Get<ISemanticProcessor>().ProcessInstance<WebServerMembrane, 
               ST_RouteNotFound>(r=>r.Context=req.Context);

We add this module to our modules.xml file:

XML
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
  <Module AssemblyName='WebServerService.dll'/>
  <Module AssemblyName='SemanticWebRouterService.dll'/>
  <Module AssemblyName='WebResponseService.dll'/>
</Modules>

We now have our first successful response to the browser!

Image 16

No more spinny!

Responding with Data from Files (HTML, CSS, etc.)

We'll add one more service for routes that respond with data from HTML/CSS files. Who knows, you may want to replace or extend this with data returned from a server instead, so it makes sense to make this a YAM - Yet Another Module. Again, the service interface is merely a placeholder, as there are no exposed service methods. We'll only handle three file types at the moment: HTML, CSS, and JavaScript:

C#
using System;
using System.IO;
using System.Net;

using Clifton.Core.ExtensionMethods;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using Semantics;
using ServiceInterfaces;

namespace WebFileResponseService
{
  // Here, we create a placeholder, because this service is not actually exposed.
  // All activities are handled as a subscriber.
  public interface IFileResponseService : IService { }

  public class FileResponseModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<IFileResponseService, FileResponseService>();
    }
  }

  public class FileResponseService : ServiceBase, IFileResponseService
  {
    public override void FinishedInitialization()
    {
      base.FinishedInitialization();
      ServiceManager.Get<ISemanticProcessor>().Register<WebServerMembrane, FileResponder>();
    }
  }

  public class FileResponder : IReceptor
  {
    public void Process(ISemanticProcessor proc, IMembrane membrane, ST_FileResponse resp)
    {
      ProcessFileRequest(proc, resp.Context);
    }

    protected void ProcessFileRequest(ISemanticProcessor semProc, HttpListenerContext context)
    {
      bool handled = false;
      string path = context.Request.RawUrl.LeftOf("?").RightOf("/").LeftOfRightmostOf('.');
      string ext = context.Request.RawUrl.RightOfRightmostOf('.');

      // Default to index.html when the URL omits the page or the extension.
      if (String.IsNullOrEmpty(path))
      {
        path = "index";
      }

      if (String.IsNullOrEmpty(ext))
      {
        ext = "html";
      }

      path = path + "." + ext;
      // Hardcoded folder path for the website!
      path = Path.Combine("Website", path);

      if (File.Exists(path))
      {
        // Assume success; the default case resets this for unsupported file types.
        handled = true;

        switch (ext)
        {
          case "html":
            semProc.ProcessInstance<WebServerMembrane, ST_HtmlResponse>(r =>
            {
              r.Context = context;
              r.Html = ReadTextFile(path);
            });
            break;

          case "js":
            semProc.ProcessInstance<WebServerMembrane, ST_JavascriptResponse>(r =>
            {
              r.Context = context;
              r.Javascript = ReadTextFile(path);
            });
            break;

          case "css":
            semProc.ProcessInstance<WebServerMembrane, ST_CssResponse>(r =>
            {
              r.Context = context;
              r.Css = ReadTextFile(path);
            });
            break;

          default:
            // Unsupported extension: fall through to the 404 below
            // rather than leaving the browser waiting.
            handled = false;
            break;
        }
      }

      if (!handled)
      {
        semProc.ServiceManager.Get<ILoggerService>().Log("Route not found.");
        semProc.ProcessInstance<WebServerMembrane, ST_RouteNotFound>(r => r.Context = context);
      }
    }

    protected string ReadTextFile(string fn)
    {
      string text = File.ReadAllText(fn);

      return text;
    }

    protected byte[] ReadBinaryFile(string fn)
    {
      FileStream fStream = new FileStream(fn, FileMode.Open, FileAccess.Read);
      BinaryReader br = new BinaryReader(fStream);
      byte[] data = br.ReadBytes((int)fStream.Length);
      br.Close();
      fStream.Close();

      return data;
    }
  }
}
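
If the list of file types grows, the switch on the extension could become a lookup. The following is only a sketch of that design choice, not part of the demo code: ContentTypes and TryGet are hypothetical names, and the three entries simply mirror the file types handled above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; not part of the article's demo code.
public static class ContentTypes
{
    // Extension (without the dot) to content type, matched case-insensitively.
    private static readonly Dictionary<string, string> map =
        new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
        {
            { "html", "text/html" },
            { "css", "text/css" },
            { "js", "text/javascript" },
        };

    // Returns true and the content type when the extension is supported;
    // returns false for anything else, which the caller can treat as a 404.
    public static bool TryGet(string extension, out string contentType)
    {
        return map.TryGetValue(extension, out contentType);
    }
}
```

Supporting another text file type then means adding one dictionary entry, and an unknown extension naturally falls into the "route not found" path.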

Testing

Again, we add this module to the modules.xml file:

XML
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
  <Module AssemblyName='WebServerService.dll'/>
  <Module AssemblyName='SemanticWebRouterService.dll'/>
  <Module AssemblyName='WebResponseService.dll'/>
  <Module AssemblyName='WebFileResponseService.dll'/>
</Modules>

Now let's make a simple change to our application. We'll map the ST_FileResponse semantic message to the route "foobar" so that a specific page will be loaded. Here's the entire program:

C#
static partial class Program
{
  static void Main(string[] args)
  {
    InitializeBootstrap();
    Bootstrap((e) => Console.WriteLine(e.Message));

    serviceManager.Get<IWebServerService>().Start("127.0.0.1", 80);

    ISemanticWebRouterService router = serviceManager.Get<ISemanticWebRouterService>();
    router.Register<ST_FileResponse>("get", "foobar");

    Console.WriteLine("Press ENTER to exit the server.");
    Console.ReadLine();
  }
}

This line:

C#
router.Register<ST_FileResponse>("get", "foobar");

does the mapping of the route.

Now let's add an HTML page to our Website folder, which lives in bin\Debug. (Yes, I hard-coded the website path in the above code. Typically, I retrieve it from the app.config file using, you guessed it, the AppConfigService I described in a previous article in this series.)

Image 17

The file is simply:

HTML
<h1>Foobar!</h1>

And here's the result:

Image 18

Conclusion

OK, that's enough for now. I had originally intended to use all the tech for writing a simple game, but I think demonstrating the Publisher/Subscriber with just the web server components is enough for this article! In the demo code, you'll notice the web server demo is actually in the project HuntTheWumpus. That was the game I was going to use to demonstrate the Publisher/Subscriber, and that will probably be the next article anyways.

To review what has been achieved:

  • Further decoupling of services. Except for core services, like the logger, and direct access to the router service for registering routes, the intercommunication is handled entirely by the Publisher/Subscriber. In fact, several of the services don't even implement service methods; their interfaces are simply placeholders.
  • Messages are processed on independent threads, which is perfect for this kind of application.
  • Subscribers are instantiated for each message, a desirable feature where thread safety is paramount.
  • The Publisher/Subscriber implements logging and exception handling.

So, these four articles demonstrate the four components that are the foundation stones of The Clifton Method. Whether I'm writing a web or client app, I almost always start with these components and the growing library of services that I can just plug into an application. You can peruse the whole kit and caboodle on GitHub. There are minor variances between the implementation there and what's presented here. Through the process of writing these four articles, I've updated the code in the GitHub repo with bug fixes and minor improvements.

History

  • 25th August, 2016: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect Interacx
United States United States
Blog: https://marcclifton.wordpress.com/
Home Page: http://www.marcclifton.com
Research: http://www.higherorderprogramming.com/
GitHub: https://github.com/cliftonm

All my life I have been passionate about architecture / software design, as this is the cornerstone to a maintainable and extensible application. As such, I have enjoyed exploring some crazy ideas and discovering that they are not so crazy after all. I also love writing about my ideas and seeing the community response. As a consultant, I've enjoyed working in a wide range of industries such as aerospace, boatyard management, remote sensing, emergency services / data management, and casino operations. I've done a variety of pro-bono work for non-profit organizations related to nature conservancy, drug recovery and women's health.
