A few Implementations of Message Bus/Broker

4 Sep 2019 | CPOL | 3 min read
Revisiting Event Aggregator/Message Bus/Broker in C#.NET

Introduction

As a project grows, a way for components to communicate with each other becomes increasingly important. An Event Aggregator (or Message Bus/Broker) serves this purpose.

I have been using .NET events and Prism's Event Aggregator, as shown in my article from a few years ago. The EventHandler<TEventArgs> delegate is best suited to small projects, because it couples the two sides tightly: the publisher keeps a reference to each subscriber's handler. Prism's Event Aggregator is great, but I wanted to implement my own lightweight one tailored to my needs.

There are many implementations with different approaches, concerns, and levels of complexity. Here I present a few minimalist ones that ignore threading for now.

Implementations

Suppose we have three components: the Publisher, Subscriber1, and Subscriber2, all working with Person objects. The Publisher publishes a message whenever a person is created, modified, or deleted, and the two Subscribers subscribe to those messages so they can react accordingly.

We introduce a 'messaging' service that is injected into the publishers and subscribers for them to use. Things work like this:

C#
// On the Publisher side:
this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
{
    // Property setters
});

// Subscribers receive and react:
messageBus.Subscribe<PersonCreatedMessage>(OnPersonCreated);

private void OnPersonCreated(PersonCreatedMessage message)
{
    // Access to message's properties
}

Implementation #1: Messaging via Action<TMessage> Delegate

Subscribers do not need to receive every kind of message, so it is better to state explicitly which kinds they subscribe to. Every message implements IMessage:

C#
public interface IMessage
{
    string Description { get; } // Can remove or add more
}

Each subscription gives the subscriber a 'receipt', which could take various forms, such as a GUID, a unique increasing integer like a database ID, or an object as below. The receipt is what lets the subscriber un-subscribe later.

C#
public interface ISubscription<T> where T : IMessage
{
    Action<T> ActionHandler { get; }
}

public class Subscription<T> : ISubscription<T> where T : IMessage
{
    public Action<T> ActionHandler { get; private set; }

    public Subscription(Action<T> action)
    {
        ActionHandler = action;
    }
}

Our Message Bus provides the following functionality:

C#
public interface IMessageBus
{
    void Publish<T>(T message) where T : IMessage;

    ISubscription<T> Subscribe<T>(Action<T> actionCallback) where T : IMessage;

    bool UnSubscribe<T>(ISubscription<T> subscription) where T : IMessage;

    void ClearAllSubscriptions();
}

Its implementation is explained below:

C#
public class MessageBus : IMessageBus
{
    private readonly Dictionary<Type, List<object>> _observers
        = new Dictionary<Type, List<object>>();
}

_observers is keyed by the message type (a Type that implements IMessage); the associated value is the list of subscriptions whose action handlers respond to messages of that type.

The Subscribe method is implemented as follows:

C#
public ISubscription<T> Subscribe<T>(Action<T> callback) where T : IMessage
{
    ISubscription<T> subscription = null;

    Type messageType = typeof(T);
    var subscriptions = _observers.ContainsKey(messageType) ?
        _observers[messageType] : new List<object>();

    if (!subscriptions
        .Select(s => s as ISubscription<T>)
        .Any(s => s.ActionHandler == callback))
    {
        subscription = new Subscription<T>(callback);
        subscriptions.Add(subscription);
    }

    _observers[messageType] = subscriptions;

    return subscription;
}
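
One consequence of the duplicate check is worth spelling out: subscribing the same handler a second time adds nothing and returns null, so callers should be prepared for a null receipt. The sketch below is a trimmed, self-contained copy of the Subscribe logic; MiniBus, PingMessage, and DuplicateSubscribeDemo are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IMessage { string Description { get; } }

public interface ISubscription<T> where T : IMessage
{
    Action<T> ActionHandler { get; }
}

public class Subscription<T> : ISubscription<T> where T : IMessage
{
    public Action<T> ActionHandler { get; private set; }
    public Subscription(Action<T> action) { ActionHandler = action; }
}

// Hypothetical message type, for illustration only.
public class PingMessage : IMessage
{
    public string Description { get; set; }
}

// Hypothetical, trimmed-down bus that only knows how to subscribe.
public class MiniBus
{
    private readonly Dictionary<Type, List<object>> _observers
        = new Dictionary<Type, List<object>>();

    // Same rule as Subscribe above: a handler that is already
    // registered is not added again, and null is returned.
    public ISubscription<T> Subscribe<T>(Action<T> callback) where T : IMessage
    {
        List<object> subscriptions;
        if (!_observers.TryGetValue(typeof(T), out subscriptions))
        {
            subscriptions = new List<object>();
            _observers[typeof(T)] = subscriptions;
        }

        // Delegate == compares target and method, not object identity.
        if (subscriptions.Cast<ISubscription<T>>().Any(s => s.ActionHandler == callback))
            return null;

        var subscription = new Subscription<T>(callback);
        subscriptions.Add(subscription);
        return subscription;
    }
}

public static class DuplicateSubscribeDemo
{
    private static void OnPing(PingMessage message) { }

    // True when the first call yields a receipt and the second yields null.
    public static bool SecondSubscribeReturnsNull()
    {
        var bus = new MiniBus();
        var first = bus.Subscribe<PingMessage>(OnPing);
        var second = bus.Subscribe<PingMessage>(OnPing);
        return first != null && second == null;
    }
}
```

If a null receipt is awkward for your callers, returning the existing subscription instead would be a reasonable alternative; that is a design choice, not something the implementation above does.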

Then the Publish method:

C#
public void Publish<T>(T message) where T : IMessage
{
    if (message == null) throw new ArgumentNullException(nameof(message));

    Type messageType = typeof(T);
    if (_observers.ContainsKey(messageType))
    {
        var subscriptions = _observers[messageType];
        if (subscriptions == null || subscriptions.Count == 0) return;
        foreach (var handler in subscriptions
            .Select(s => s as ISubscription<T>)
            .Select(s => s.ActionHandler))
        {
            handler?.Invoke(message);
        }
    }
}
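
Note that Publish looks subscribers up by typeof(T), the compile-time type argument, and not by message.GetType(). The small sketch below illustrates the difference; AlertMessage and the KeyOf helper are hypothetical and exist only for this example.

```csharp
using System;

public interface IMessage { string Description { get; } }

// Hypothetical message type, for illustration only.
public class AlertMessage : IMessage
{
    public string Description { get; set; }
}

public static class PublishKeyDemo
{
    // Mirrors the first lines of Publish<T>: the dictionary key is typeof(T),
    // which depends on the static type of the argument at the call site.
    public static Type KeyOf<T>(T message) where T : IMessage
    {
        return typeof(T);
    }
}
```

With `var alert = new AlertMessage();`, `PublishKeyDemo.KeyOf(alert)` is `typeof(AlertMessage)`. After `IMessage asInterface = alert;`, `PublishKeyDemo.KeyOf(asInterface)` is `typeof(IMessage)`, so publishing through an IMessage-typed variable would not reach handlers subscribed to AlertMessage. Passing the concrete type, or spelling out the type argument as the examples in this article do, avoids the surprise.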

UnSubscribe is implemented as:

C#
public bool UnSubscribe<T>(ISubscription<T> subscription) where T : IMessage
{
    bool removed = false;

    if (subscription == null) return false;

    Type messageType = typeof(T);
    if (_observers.ContainsKey(messageType))
    {
        removed = _observers[messageType].Remove(subscription);

        if (_observers[messageType].Count == 0)
            _observers.Remove(messageType);
    }
    return removed;
}

Finally, we can remove all subscribers by clearing _observers: _observers.Clear();.

How to Use

First define the message types, for example:

C#
public class PersonCreatedMessage : IMessage
{
    public string Description { get; set; }

    public Person Person { get; set; }
}

public class PersonDeletedMessage : IMessage
{
    public string Description { get; set; }
}

Here our model (Person) has a GivenName property.

The subscriber is basically set up as follows:

C#
public class Subscriber1
{
    private MessageBus messageBus;
    private ISubscription<PersonCreatedMessage> personCreatedSubscription;

    public Subscriber1(MessageBus messageBus)
    {
        this.messageBus = messageBus;

        personCreatedSubscription = 
              this.messageBus.Subscribe<PersonCreatedMessage>(OnPersonCreated);
    }

    private void OnPersonCreated(PersonCreatedMessage message)
    {
        // Access message.Description, message.Person.GivenName,... here
    }

    private void Unsubscribe()
    {
        this.messageBus.UnSubscribe(personCreatedSubscription);
    }
}

And the publisher:

C#
public class Publisher
{
    private MessageBus messageBus;

    public Publisher(MessageBus messageBus)
    {
        this.messageBus = messageBus;
    }

    public void CreatePerson()
    {
        this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
        {
            Person = new Person { GivenName = "John" },
            Description = "[Demo 1] A new person has been created."
        });
    }
}

We can get started by:

C#
MessageBus messageBus = new MessageBus();

Publisher publisher = new Publisher(messageBus);
Subscriber1 subscriber1 = new Subscriber1(messageBus);
Subscriber2 subscriber2 = new Subscriber2(messageBus);

publisher.CreatePerson();

Implementation #2: Messaging via Weak Reference

When the system grows and the messaging becomes complex, with many publishers and subscribers, it is inconvenient to have to remember to un-subscribe every subscription. Personally, I would still prefer to do it explicitly. Nevertheless, here is a weak-reference version of the action-delegate implementation above. The purpose is to make a Subscriber eligible for garbage collection when there are no strong references to it.

In this implementation, we will wrap the action handler in a WeakSubscription:

C#
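public class WeakSubscription<T> where T : IMessage
{
    // Holds the handler weakly so that the bus does not keep it alive.
    // Caveat: if nothing else references this delegate instance, the GC may
    // collect it even while the subscribing object itself is still alive.
    private readonly WeakReference _weakAction;

    public WeakSubscription(Action<T> action)
    {
        _weakAction = new WeakReference(action);
    }

    public bool IsAlive
    {
        get { return _weakAction.IsAlive; }
    }

    public object Target
    {
        get { return _weakAction.Target; }
    }

    public void OnMessageReceived(T message)
    {
        // Target is null once the delegate has been collected
        var action = _weakAction.Target as Action<T>;
        action?.Invoke(message);
    }
}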
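
A delegate passed to Subscribe is usually created at the call site and referenced by nothing else, so a WeakReference to the delegate can go dead while the subscriber is still alive. A common way around this is to hold the handler's target object weakly and keep its MethodInfo. The following is a minimal sketch of that idea, not a drop-in replacement: WeakTargetSubscription, TryInvoke, NoteMessage, and Listener are hypothetical names, and it assumes a single-cast delegate that points at an instance or static method.

```csharp
using System;
using System.Reflection;

public interface IMessage { string Description { get; } }

// Sketch: weakly references the handler's target object, not the delegate.
public class WeakTargetSubscription<T> where T : IMessage
{
    private readonly WeakReference _target;   // null for static handlers
    private readonly MethodInfo _method;

    public WeakTargetSubscription(Action<T> action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        _method = action.Method;
        _target = action.Target == null ? null : new WeakReference(action.Target);
    }

    // Static handlers have no target object and therefore never die.
    public bool IsAlive
    {
        get { return _target == null || _target.IsAlive; }
    }

    // Invokes the handler; returns false if the target was already collected.
    public bool TryInvoke(T message)
    {
        object target = null;
        if (_target != null)
        {
            target = _target.Target;   // strong reference for the duration of the call
            if (target == null) return false;
        }
        _method.Invoke(target, new object[] { message });
        return true;
    }
}

// Hypothetical message and subscriber types, for illustration only.
public class NoteMessage : IMessage
{
    public string Description { get; set; }
}

public class Listener
{
    public int Count;
    public void Handle(NoteMessage message) { Count++; }
}
```

Two things to keep in mind with this sketch: exceptions thrown by the handler surface wrapped in a TargetInvocationException because of the reflective call, and a lambda that captures local variables has a compiler-generated closure object as its target, which may itself be collected early.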

Since weak references are used, we don't implement the UnSubscribe method here. Our MessageBus therefore has Publish and Subscribe as its two main functions:

C#
public class MessageBus
{
    private readonly Dictionary<Type, List<object>> _observers
            = new Dictionary<Type, List<object>>();
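}

Subscribe wraps the callback in a WeakSubscription and prunes dead entries:

C#
public void Subscribe<T>(Action<T> callback) where T : IMessage
{
    if (callback == null) throw new ArgumentNullException(nameof(callback));

    Type messageType = typeof(T);

    var subscriptions = _observers.ContainsKey(messageType) ?
        _observers[messageType] : new List<object>();

    // Compare delegates by value (same target and method); comparing the
    // two object references with == would never detect a duplicate.
    // A dead subscription has a null Target and never matches.
    if (!subscriptions
        .Select(s => s as WeakSubscription<T>)
        .Any(s => callback.Equals(s.Target)))
        subscriptions.Add(new WeakSubscription<T>(callback));

    _observers[messageType] = subscriptions;

    // Drop subscriptions whose handlers have been garbage collected
    CleanupSubscriptions<T>();
}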

Publish invokes the live handlers and removes the dead ones:

C#
public void Publish<T>(T message) where T : IMessage
{
    if (message == null) throw new ArgumentNullException(nameof(message));

    Type messageType = typeof(T);
    if (_observers.ContainsKey(messageType))
    {
        var subscriptions = _observers[messageType];
        List<WeakSubscription<T>> deadSubscriptions = new List<WeakSubscription<T>>();

        foreach (var subscription in subscriptions
            .Select(s => s as WeakSubscription<T>))
        {
            if (subscription.IsAlive)
                subscription.OnMessageReceived(message);
            else
                deadSubscriptions.Add(subscription);
        }

        // Remove dead subscriptions only after the loop has finished:
        // changing the list while enumerating it invalidates the enumerator.
        subscriptions.RemoveAll(s => deadSubscriptions.Contains(s));
        if (subscriptions.Count == 0)
            _observers.Remove(messageType);
    }
}

And finally, dead subscriptions are cleaned up like this:

C#
private int CleanupSubscriptions<T>() where T : IMessage
{
    return _observers[typeof(T)].RemoveAll((s) =>
    {
        WeakSubscription<T> subRef = s as WeakSubscription<T>;
        if (subRef != null)
            return !subRef.IsAlive;
        return true;
    });
}

How to Use

Usage is the same as in Implementation #1, except that it's simpler: there is no need to keep track of subscriptions in order to un-subscribe.

Implementation #3: Messaging via Interface

This is a modification of Glenn Block's implementation of the Event Aggregator pattern on Pluralsight. Instead of delegates, it uses interfaces as the way to respond to a message. A subscriber implements one ISubscribe<T> interface for each kind of message it wants to receive. The interface declares a single method with which the subscriber responds once the message is received:

C#
public interface ISubscribe<T> where T : IMessage
{
    void OnMessageReceived(T message);
}

The MessageBus is designed to have the following capabilities:

C#
public interface IMessageBus
{
    void Publish<T>(T message) where T : IMessage;

    // Returns number of messages that this subscriber wants to listen to.
    int Subscribe(object subscriber, bool reSubscribable = false);

    // Returns number of messages that this subscriber has stopped listening to.
    int UnSubscribe(object subscriber);

    // Returns whether all subscribers were successfully un-subscribed from message type T.
    bool UnSubscribeTo<T>() where T:IMessage;
}

And its implementation is as follows:

C#
public class MessageBus : IMessageBus 
{
    private readonly Dictionary<Type, 
        List<WeakReference>> _observers = new Dictionary<Type, List<WeakReference>>();

    ...

    private IEnumerable<Type> GetSubscriberTypes(object subscriber)
    {
        // Returns: ISubscribe<T1>, ISubscribe<T2>, etc. 
        // where Ti is message type to be sent/received.
        // In other words, the kinds of message that THIS subscriber is subscribed to.
        // That is the interface(s) that this subscriber class implements.
        // For example: Subscriber: ISubscribe<T1>, ISubscribe<T2>, etc.
        return subscriber
            .GetType()
            .GetInterfaces()
            .Where(i => i.IsGenericType && 
                   i.GetGenericTypeDefinition() == typeof(ISubscribe<>));
    }
}
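
To make the reflection step concrete, here is a small self-contained sketch of what the filter in GetSubscriberTypes returns, and of the key that Publish<T> later builds with MakeGenericType. CreatedMessage, DeletedMessage, SampleSubscriber, and SubscriberTypesDemo are hypothetical names used only for this illustration.

```csharp
using System;
using System.Linq;

public interface IMessage { string Description { get; } }

public interface ISubscribe<T> where T : IMessage
{
    void OnMessageReceived(T message);
}

// Hypothetical message and subscriber types, for illustration only.
public class CreatedMessage : IMessage { public string Description { get; set; } }
public class DeletedMessage : IMessage { public string Description { get; set; } }

public class SampleSubscriber :
    ISubscribe<CreatedMessage>, ISubscribe<DeletedMessage>, IDisposable
{
    public void OnMessageReceived(CreatedMessage message) { }
    public void OnMessageReceived(DeletedMessage message) { }
    public void Dispose() { }
}

public static class SubscriberTypesDemo
{
    // Same filter as GetSubscriberTypes: keep only closed ISubscribe<T> interfaces.
    public static Type[] GetSubscriberTypes(object subscriber)
    {
        return subscriber
            .GetType()
            .GetInterfaces()
            .Where(i => i.IsGenericType &&
                        i.GetGenericTypeDefinition() == typeof(ISubscribe<>))
            .ToArray();
    }

    // The dictionary key used for message type T; equal to typeof(ISubscribe<T>).
    public static Type KeyFor<T>() where T : IMessage
    {
        return typeof(ISubscribe<>).MakeGenericType(typeof(T));
    }
}
```

For a SampleSubscriber instance, GetSubscriberTypes yields ISubscribe<CreatedMessage> and ISubscribe<DeletedMessage> and skips IDisposable. Because KeyFor<DeletedMessage>() equals typeof(ISubscribe<DeletedMessage>), the types collected at subscription time line up with the keys looked up at publish time.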

The Subscribe method registers the subscriber under each ISubscribe<T> interface it implements:

C#
public int Subscribe(object subscriber, bool reSubscribable = false)
{
    if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));

    WeakReference subscriberRef = new WeakReference(subscriber);
    var subscriberTypes = GetSubscriberTypes(subscriber);

    foreach (var subscriberType in subscriberTypes)
    {
        if (_observers.ContainsKey(subscriberType))
        {
            _observers[subscriberType].RemoveAll(s => !s.IsAlive);

            if (!_observers[subscriberType].Any
               (s => s.Target == subscriberRef.Target) || reSubscribable)
                _observers[subscriberType].Add(subscriberRef);
        }
        else
            _observers.Add(subscriberType, new List<WeakReference> { subscriberRef });
    }

    return subscriberTypes.Count();
}

Then the Publish method:

C#
public void Publish<T>(T message) where T: IMessage
{
    if (message == null) throw new ArgumentNullException(nameof(message));

    var subscriberType = typeof(ISubscribe<>).MakeGenericType(typeof(T)); // --> ISubscribe<T>

    if (_observers.ContainsKey(subscriberType))
    {
        List<WeakReference> subscriberRefs = _observers[subscriberType];
        List<WeakReference> deadSubscriberRefs=new List<WeakReference>();

        foreach (var subscriberRef in subscriberRefs)
        {
            if (subscriberRef.IsAlive)
            {
                var subscriber = subscriberRef.Target as ISubscribe<T>;
                subscriber?.OnMessageReceived(message);
            }
            else
                deadSubscriberRefs.Add(subscriberRef); // Remove this reference
        }

        subscriberRefs.RemoveAll(s => deadSubscriberRefs.Contains(s));
        if (subscriberRefs.Count == 0)
            _observers.Remove(subscriberType);
    }
}

We can un-subscribe either a specific subscriber object or all subscribers of a kind of message:

C#
public int UnSubscribe(object subscriber)
{
    if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));

    var subscriberRef = new WeakReference(subscriber);
    var subscriberTypes = GetSubscriberTypes(subscriber);
    var emptyKeys = new List<Type>();

    int unSubscribedTypeCount = 0;

    foreach (var subscriberType in subscriberTypes)
    {
        if (_observers.ContainsKey(subscriberType))
        {
            List<WeakReference> subscriberRefs = _observers[subscriberType];
            unSubscribedTypeCount += subscriberRefs.RemoveAll(s => s.Target == subscriber);

            if (subscriberRefs.Count == 0)
                emptyKeys.Add(subscriberType);
        }
    }

    foreach (var key in emptyKeys)
        _observers.Remove(key);

    return unSubscribedTypeCount;
}

UnSubscribeTo drops every subscriber of a given kind of message:

C#
public bool UnSubscribeTo<T>() where T: IMessage
{
    var subscriberType = typeof(ISubscribe<>).MakeGenericType(typeof(T)); // --> ISubscribe<T>
    if (_observers.ContainsKey(subscriberType))
        return _observers.Remove(subscriberType);
    else
        throw new KeyNotFoundException(subscriberType.ToString());
}

How to Use

Subscribers declare the kinds of messages they want to receive by implementing the corresponding interfaces, whose methods are executed when those messages arrive:

C#
public class Subscriber1 : ISubscribe<PersonCreatedMessage>, ISubscribe<PersonDeletedMessage>
{
    private MessageBus messageBus;

    public Subscriber1(MessageBus messageBus)
    {
        this.messageBus = messageBus;

        var subscription = this.messageBus.Subscribe(this);
    }

    public void OnMessageReceived(PersonCreatedMessage message)
    {
        // Access message.Description, message.Person.GivenName,... here
    }

    public void OnMessageReceived(PersonDeletedMessage message)
    {
        // Access message properties here
    }

    public int UnSubscribe()
    {
        return this.messageBus.UnSubscribe(this);
    }

    ...
    // Un-subscribe from a certain kind of message as follows:
    messageBus.UnSubscribeTo<PersonCreatedMessage>();
}

The publisher is very similar to the one in Implementations #1 and #2:

C#
public class Publisher
{
    private MessageBus messageBus;

    public Publisher(MessageBus messageBus)
    {
        this.messageBus = messageBus;
    }

    public void CreatePerson()
    {
        this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
        {
            Person = new Person { GivenName = "Mike" },
            Description = "[Demo 3] A new person has been created."
        });
    }

    public void DeletePerson()
    {
        this.messageBus.Publish<PersonDeletedMessage>(new PersonDeletedMessage
        {
            Description = "Person has been deleted."
        });
    }
}

To get started:

C#
MessageBus messageBus = new MessageBus();

Publisher publisher = new Publisher(messageBus);
Subscriber1 subscriber1 = new Subscriber1(messageBus);
Subscriber2 subscriber2 = new Subscriber2(messageBus);

publisher.CreatePerson();
publisher.DeletePerson();

History

  • 16th September, 2019: Applied IMessage for all applicable message types
  • 4th September, 2019: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


