Click here to Skip to main content
15,881,882 members
Articles / Programming Languages / C#
Article

Using Reactive Extensions - cold observables

Rate me:
Please Sign up or sign in to vote.
5.00/5 (7 votes)
27 Mar 2016CPOL8 min read 20.7K   142   18   4
Create cold observables, intercept observables and split observable into async tasks.

Introduction

This is a series of articles that describe how to implement Reactive Extensions in applications, the previous article dealt with hot observables:

In this article, I will focus my attention on cold observables, observables that you created and started in your program by subscribing to the event they produced. They did not run prior to your involvement, like the hookup of MouseMove, KeyDown etc. This might seem like a needless distinction at first, but let me assure you that it's not, and I'll show you why.

The hot observables were already started on a given thread like for instance the mouse-related events, they were already running on the UI thread. Since it is a hot observable, you had no say where the event should be created or when it would start, all you could do was to decide whether or not you'd like to subscribe to it.

As in the previous article, the code here is run in a WPF application, using the NuGet packages rx-main and rx-xaml. These two are both released from Microsoft.

 

The UI thread

As you might already know the UI thread is the only thread that can update any of the controls in either a WinForm or WPF application. This makes it desirable to take things off the UI thread as soon as possible to ensure that the UI doesn't become unresponsive.

You have no say in this, as there is only one thread responsible for updating the UI, and if you try and implement changes on a WPF or WinForm control directly from a different thread, regardless of the priority of it, it will throw an error saying that you attempted to change a control from a different thread than the designated UI thread. 

The problem was typically solved by starting a background worker (or background thread) for each of the time-consuming tasks that you wanted to do with the button click event. You would then update the UI thread with callback events from the BackgrondWorker. If you instead use a normal thread, there is a new class called Progress that you could use together with the interface IProgress<T> to make callbacks to the UI thread. 

As you might already have suspected this is quite a lot of work that needs to be done to even make the simplest background tasks work properly, with error handling , cancellation and updating a progress bar etc. Rx offers a unified way to do this in business applications that will ensure that the UI is responsive all of the time, with just a few lines of code.

We will start off by using a simple hooking the MouseMove event to create a hot Observable stream:

C#
var RxMouseMoveEvent = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
    .Select(arg => arg.EventArgs.GetPosition(this));

I'm going to do some heavy calculations based on the mouse position so I'd like to take it off the UI thread as soon as possible. You might have noticed that I put some code inside the Select to get the mouse position relative to a UI element, in this case, the Window. This has to be done on the UI thread as the Window is a UI control. The select statement also uses this to change the stream from a Observable<EventPattern<MouseEventArgs>> to an Observable<Point>

I would now like to take the Observable stream off the UI thread, to a background thread. This is actually an extremely easy task to do with Rx:

RxMouseMoveEvent
    //Take it off the UI thread
    .ObserveOn(Scheduler.Default)

I now want to the calculations to be done when the MouseMove event hasn't fired for 0.5 seconds, i.a the mouse is idle. And then I would like to make a function call that takes some time to complete, and as you might have guessed this function call is most conveniently done inside a Select call. Since I'm going to post the result on a UI control I have chosen to route the stream back to the UI thread, with the ObserveOnDispatcher() call:

C#
RxMouseMoveEvent
    //Take it off the UI thread
    .ObserveOn(Scheduler.Default)
    // Wait for a 0.5 second pause
    .Throttle(new TimeSpan(0, 0, 0, 0, 500))
    //Do calcualtions
    .Select(xy => CalculatePointRelateThing(xy))
    //Change it back to the UI thread
    .ObserveOnDispatcher()
    //Subscribe to the change
    .Subscribe(evt =>
    {
        txtText.Text = evt.ToString();
    });

The evt is simply a single point value from the stream Observable<Point>. Admittedly I could have updated the TextBox directly from the background thread like this:

C#
RxMouseMoveEvent
    //Take it off the UI thread
    .ObserveOn(Scheduler.Default)
    // Wait for a 0.5 second pause
    .Throttle(new TimeSpan(0, 0, 0, 0, 500))
    //Do calcualtions
    .Select(xy => CalculatePointRelateThing(xy))
    .Subscribe(evt =>
    {
        this.txtText.Dispatcher.Invoke(DispatcherPriority.Normal,
            new Action(() => { this.txtText.Text = evt.ToString(); })); ;
    }
    );

This is fine as long as you don't need and interaction with multiple controls concerning error reporting and other stuff done on the UI thread (You could now see why the function that switches the stream back to the UI thread is called ObserveOnDispatcher). In short, the Rx version is a much more readable, shorter and more flexible than any of the old (or new) callback methods.

Cold observables

This was a hot observable example, let's look at how to start and switching threads on cold observables is done. There are 3 main ways of starting a cold Observable stream, creating a single instance value or stream (like Observable.Create), creating several instances within a stream (like Observable.Range) and lastly changing a general IEnumerable object into a stream of Observable objects (like Task<T>.ToObservable), you can see Lee Campbells free online book for more examples. There is a fourth way to, and that is to use the Subject<T> directly, but that is not recommended, mainly due to the consistency of how Observable streams are created and handled. 

Creating a cold observable stream of integers are really simple:

C#
var MyIntStream = Observable.Range(0, 5);

The important thing to notice here though is that the stream isn't actually running with this line of code. It only starts to run when you Subscribe to the stream, i.a. this is how a cold observable is defined. Now suppose I would like this stream to be initiated on a background thread from the very beginning. I could change the thread by using the ObserveOn() function call, but that would mean that the stream starts on the UI thread initially. Cold observables actually have a different trick, to change the initialization thread by using SubscribeOn(Scheduler.Default). However, how the SubscribeOn method work deserves a bit more thorough walkthrough. Remember that the cold observables actually get started by the Subscribe call. If you take the example from a StackOverflow you can get a clearer picture of what happens:

C#
Thread.CurrentThread.Name = "Main";

IScheduler thread1 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread1" });
IScheduler thread2 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread2" });

Observable.Create<int>(o =>
{
    Console.WriteLine("Created on " + Thread.CurrentThread.Name);
    o.OnNext(1);
    o.OnCompleted();
    return Disposable.Create(() => { });
})
.SubscribeOn(thread1)
.ObserveOn(thread2)
.Subscribe(
    x => Console.WriteLine("Observing '" + x + "' on " + Thread.CurrentThread.Name)
    , ex => Console.WriteLine(ex.Message)
    , () => { Console.WriteLine("Completed on " + Thread.CurrentThread.Name); }
    );

This will show you that the Observable is created on the thread that SubscribeOn contains. It is interesting to know that if you remove the ObserveOn, all the things inside the Subscribe function will be on the thread submitted in SubscribeOn. You could also throw in a Select to check that SubscribeOn changes the thread at all points:

C#
.SubscribeOn(thread1)
.Select(x =>
{
    Console.WriteLine("Current thread check: " + Thread.CurrentThread.Name);
    return x;
})

Everything will happen on the same thread. But If you change the Observable to a hot one instead, this will work a little different here:

C#
var KeyDown = Observable.FromEventPattern<KeyEventArgs>(this, "PreviewKeyDown")
.Select(x =>{Console.WriteLine("Current thread check: " + Thread.CurrentThread.Name);return x;})
.SubscribeOn(thread1)
.Select(x =>{Console.WriteLine("Current thread check: " + Thread.CurrentThread.Name);return x;})
.Subscribe(
    x => Console.WriteLine("Observing '" + x + "' on " + Thread.CurrentThread.Name)
    , ex => Console.WriteLine(ex.Message)
    , () => { Console.WriteLine("Completed on " + Thread.CurrentThread.Name); }
    );

SubscribeOn will have no effect here, everything will happen on the Main thread in the above example. In fact, I'm really struggling to see if it has any effect on hot observables, and in any case, I can't really see a point of using them on hot observables, as it is already determined what thread they are on and all you can do is to change the thread. If I'm wrong here, please provide me with a code example and point out my mistake.

Create an intermediate layer

In the previous examples, I have made function calls inside the Select function in order to change or do some calculations on the streamed values. There are, however, a different way of doing this, and that is to create an intermediate layer that subscribes to the incoming stream, and returns a new stream as the result. This technique could be used, as Nicolas Dorier has, to create a weak subscription and other things as well. He is not the only one using it though it is actually used quite a bit in the Rx .NET source code, you can download it here and see for yourself.

C#
public static IObservable<TItem> ObserveWeakly<TItem>(this IObservable<TItem> collection)
{
    return Observable.Create<TItem>(obs =>
    {
        var weakSubscription = new WeakSubscription<TItem>(collection, obs);
        return () =>
        {
            weakSubscription.Dispose();
        };
    });
}

The obs is actually an Observer<T> object, and it is from this object that the OnNext, OnCompleted and OnError are passed to the stream by the Observable.Create method. So we create a class that subscribes to the incoming Observable, and passes the appropriate values onwards to the observer, provided that it is still alive. If the value isn't alive the subscription is terminated.

C#
public class WeakSubscription<T> : IDisposable, IObserver<T>
    {
        private readonly WeakReference reference;
        private readonly IDisposable subscription;
        private bool disposed;

        public WeakSubscription(IObservable<T> observable, IObserver<T> observer)
        {
            this.reference = new WeakReference(observer);
            this.subscription = observable.Subscribe(this);
        }

        void IObserver<T>.OnCompleted()
        {
            var observer = (IObserver<T>)this.reference.Target;
            if(observer != null)
                observer.OnCompleted();
            else
                this.Dispose();
        }

        void IObserver<T>.OnError(Exception error)
        {
            var observer = (IObserver<T>)this.reference.Target;
            if(observer != null)
                observer.OnError(error);
            else
                this.Dispose();
        }

        void IObserver<T>.OnNext(T value)
        {
            var observer = (IObserver<T>)this.reference.Target;
            if(observer != null)
                observer.OnNext(value);
            else
                this.Dispose();
        }

        public void Dispose()
        {
            if(!this.disposed)
            {
                this.disposed = true;
                this.subscription.Dispose();
            }
        }
    }

This method is actually very powerful, as I mentioned previously. It is actually an intermediary subscription of the stream, that you can intercept any stream value and perform complex arithmetic inside a class while still leveraging the full power of Rx.

Parallel and async processes

Now assume that I want to start running some calculations in parallel based on the integer stream input. The easiest way to do this is to use the Task.Run method. You should be aware that the Task.Run is mainly a async process and might not run everything in parallel. However, in my case, the Task.Run is a much better fit as it will adjust its threading pool according to the number of tasks you initiates, basically making them easier to use. To initiate and start them in one operation you could just write:

.Select(x=>Task.Run(()=>DoTaskWork(x)))

But this will cause you some headaches because the return value from this function call Task.Run will actually be Task<T> and worse, it will return the Task<T> as soon as it is initiated. You don't really want that, you generally want the task to return the T value, once it is completed. Luckily there is a simple trick you could use, and that is to call <a href="https://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.selectmany(v=vs.103).aspx">SelectMany</a>. It is used to flatten several streams into a single stream, and that is exactly what I want here, to have the task return a value as soon as it is completed. I would have to sort the values after it is all completed if I want the into the original sequence.

The code is really simple an neat, and most importantly, I don't have to make any complicated callbacks in order to update the UI:

C#
var MyIntStream = Observable.Range(0, 5)
    // Take the Observable off the UI thread
    .SubscribeOn(Scheduler.Default)
    //Create parallel tasks and report results when completed
    .SelectMany(x => Task.Run<FreqValues>(() => DoTaskWork(x), ctr.Token))
    // Switch back to the UI thread
    .ObserveOnDispatcher()
    .Subscribe(args => {
        //Update the progressbar when a thread is completed
        pgbProgress.Value += 1;
        //Tell me what thread is completed
        txtText.Text += Environment.NewLine + args.Frequency.ToString();
    },
    () => {
        //All done, here I could do sorting etc.
        txtText.Text += Environment.NewLine + "All done";
          }
    );

Summary

This concludes the basics of cold observables. Admittedly it would take a lot more articles to explain all the variants of cold and hot observables you can create. Hopefully, this might help you to create your own Rx projects.

References

 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Chief Technology Officer
Norway Norway
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions

 
GeneralMy vote of 5 Pin
Florian Rappl20-Apr-16 20:48
professionalFlorian Rappl20-Apr-16 20:48 
Good one! This is really well written and contains some nice examples. Have my 5!
GeneralRe: My vote of 5 Pin
Kenneth Haugland22-Apr-16 0:45
mvaKenneth Haugland22-Apr-16 0:45 
PraiseGreat! Pin
EveryNameIsTakenEvenThisOne27-Mar-16 3:46
professionalEveryNameIsTakenEvenThisOne27-Mar-16 3:46 
GeneralRe: Great! Pin
Kenneth Haugland27-Mar-16 4:02
mvaKenneth Haugland27-Mar-16 4:02 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.