
Implementing a finite-state machine using async/await

8 May 2013, CPOL, 5 min read
A new way to implement a finite-state machine using async/await

Introduction     

The goal of this article is to show how to use the async/await feature, which comes with .NET 4.5, to easily implement the finite-state machine pattern.

Background    

According to Wikipedia:   

Finite-state machine is conceived as an abstract machine that can be in one of a finite number of states. The machine is in only one state at a time. 

The pattern could be very useful if you are dealing, for example, with complex interactions between different parts of a system.

The most obvious example of FSM usage is a client-server collaboration, so I’ll stick to this example in the article.

The problem   

Let’s say we have to implement a master-worker computation system. A user submits a job to the master; the master splits the job into tasks and sends them to a group of workers for execution. Each worker executes its task and sends the result back to the master.

 

Provided that both the master and the worker are finite-state machines, we could define their states.  

Master’s states:  

  • Waiting for a job   
  • Planning a job (splitting a job into multiple tasks)
  • Scheduling tasks (sending a portion of tasks to a selected worker)
  • Waiting for results 

Worker’s states: 

  • Waiting for a task 
  • Executing a task
  • Sending a result

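So the workflow for the master would be:

[Diagram: the master's state transitions. Waiting for a job → Planning a job → Scheduling tasks → Waiting for results → back to Waiting for a job.]

The worker’s states and transitions are trivial, so I omit them.

The diagram describes what’s going on with the master.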

Let’s see how we’d implement that in C#. 

Typical implementation   

A finite-state machine consists of states and transitions. Each transition is activated when a certain event occurs (e.g., a worker connects or a task result is received) and the current state is valid (e.g., the number of tasks being executed is equal to 0).
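To make the "event plus valid current state" rule concrete, here is a minimal sketch of a transition table. The state names follow the master's state list above; the `Trigger` enum, its members, and the `MasterTransitions` class are hypothetical names introduced only for illustration and are not part of the article's code.

```csharp
using System;
using System.Collections.Generic;

public enum State { WaitingForJob, PlanningJob, SchedulingTasks, WaitingForResults }

// Hypothetical event names, chosen for this sketch only.
public enum Trigger { JobReceived, JobPlanned, TasksScheduled, AllResultsReceived }

public static class MasterTransitions {
    // (current state, event) -> next state
    private static readonly Dictionary<Tuple<State, Trigger>, State> Table =
        new Dictionary<Tuple<State, Trigger>, State> {
            { Tuple.Create(State.WaitingForJob, Trigger.JobReceived), State.PlanningJob },
            { Tuple.Create(State.PlanningJob, Trigger.JobPlanned), State.SchedulingTasks },
            { Tuple.Create(State.SchedulingTasks, Trigger.TasksScheduled), State.WaitingForResults },
            { Tuple.Create(State.WaitingForResults, Trigger.AllResultsReceived), State.WaitingForJob }
        };

    // Returns the next state, or throws if the event is not allowed in the current state.
    public static State Next(State current, Trigger trigger) {
        State next;

        if (!Table.TryGetValue(Tuple.Create(current, trigger), out next))
            throw new InvalidOperationException("Invalid transition");

        return next;
    }
}
```

For example, `MasterTransitions.Next(State.WaitingForJob, Trigger.JobReceived)` yields `State.PlanningJob`, while an event that has no entry for the current state throws.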

Thus, the goal is to write the described workflow (the diagram) in C#. 

For the sake of simplicity, let us assume that we have two classes: Master and Worker.

The Worker class isn't that interesting at the moment, so I omit its implementation details and focus on Master.

The code must be thread safe because of the event-driven design (each event handler could be called from a different thread). We could use locks, rely on thread-safe collections/primitives, or use a synchronization context. I prefer the third option because it keeps the code clean and simple.

C#
class Master {
    public SynchronizationContext SynchronizationContext { get; set; }
} 

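All event handlers must be invoked in the specified synchronization context. Provided that the context runs its callbacks one at a time, the handlers never execute concurrently, so we no longer have to worry about race conditions.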
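The article does not say which synchronization context is used in production. As an illustration only, the sketch below shows one way to get the "one callback at a time" behavior: a context that queues posted callbacks and runs them on a single dedicated thread. The class name `SingleThreadSynchronizationContext` is hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper, not part of the article's code.
public sealed class SingleThreadSynchronizationContext : SynchronizationContext, IDisposable {
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();
    private readonly Thread thread;

    public SingleThreadSynchronizationContext() {
        this.thread = new Thread(this.Run) { IsBackground = true };
        this.thread.Start();
    }

    // Callbacks posted from any thread are queued and executed one by one on the worker thread.
    public override void Post(SendOrPostCallback d, object state) {
        if (d == null)
            throw new ArgumentNullException("d");

        this.queue.Add(() => d(state));
    }

    // Synchronous sends would bypass the queue, so they are rejected in this sketch.
    public override void Send(SendOrPostCallback d, object state) {
        throw new NotSupportedException("Synchronously sending is not supported.");
    }

    private void Run() {
        SynchronizationContext.SetSynchronizationContext(this);

        foreach (var action in this.queue.GetConsumingEnumerable())
            action();
    }

    // Stops accepting work and waits for the queued callbacks to finish.
    // Must not be called from a posted callback, because it joins the worker thread.
    public void Dispose() {
        this.queue.CompleteAdding();
        this.thread.Join();
    }
}
```

Because every callback runs on the same thread, state touched only from posted callbacks needs no locks, which is the property the Master class relies on.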

In a real world application, it is a good practice to create facade-like event handlers to ensure that the handler was invoked in the current synchronization context: 

C#
public void OnEventOccured(...) {
    this.SynchronizationContext.Post(o => this.OnEventOccuredInternal(...), null);
}    

Besides, we would like to be able to test some key behaviors of the master (e.g., it should wait for a worker to connect, or it should schedule tasks among all connected workers).

Master class has an internal state and public event handlers. 

First, we explicitly define our states:  

C#
public enum MasterState {
    WaitingForJob,
    PlanningJob,
    SchedulingTasks,
    WaitingForResults
} 

Then we define event handlers. Each event handler consists of: 

  1. Validation logic (is it OK that the event handler was invoked with the current Master state?) 
  2. Transition validation (can it change the current state?)  
  3. State change (transition from a state to another)  
C#
public void OnPlanningJob() {
    if (this.State != MasterState.WaitingForJob)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.PlanningJob;
 
    this.Tasks = this.CurrentJob.Lines.Select(line => new JobTask {Text = line}).ToList();
 
    this.OnSchedulingTasks();
}
 
private void OnSchedulingTasks() {
    if (this.State != MasterState.PlanningJob)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.SchedulingTasks;
 
    this.ScheduleTasks();
}
 
private void OnWaitingForResults() {
    if (this.State != MasterState.SchedulingTasks)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.WaitingForResults;
}
 
public void OnTaskResultReceived(TaskResult result) {
    if (this.State != MasterState.WaitingForResults)
        throw new InvalidOperationException("Invalid state");
 
    this.Results.Add(result);
    this.Tasks.RemoveAll(task => task.Id == result.TaskId);
 
    if (this.Tasks.Count > 0)
        return;
 
    this.CurrentJob.Result = this.Results.Sum(x => x.Data);
    this.JobFinished(this.CurrentJob);
    this.OnWaitingForJob();
}
 
private void OnWaitingForJob() {
    this.Results.Clear();
 
    this.State = MasterState.WaitingForJob;
 
    if (this.JobQueue.Any()) {
        this.CurrentJob = this.JobQueue.Dequeue();
        this.OnPlanningJob();
    }
}
 
public void OnWorkerConnected(Guid workerId) {
    this.Workers.Add(workerId);
 
    if (this.State == MasterState.SchedulingTasks)
        this.ScheduleTasks();
}
 
public void OnWorkerDisconnected(Guid workerId) {
    this.Workers.Remove(workerId);
}  

The code works as expected but there is a problem: it is hard to trace the workflow among these event handlers.

The workflow is distributed throughout the class, so it is hard to make changes to the code. To demonstrate this, let's add a timeout to the "waiting for worker" operation.

First, we add a new state called WaitingForWorker:

C#
internal enum MasterState {
    WaitingForJob,
    PlanningJob,
    WaitingForWorker,
    SchedulingTasks,
    WaitingForResults
} 

Then we add a timer: 

C#
Timer workerTimeoutTimer = 
   new Timer(this.CheckWorkerTimeout, null, Timeout.Infinite, Timeout.Infinite); 

A new transition for the state is: 

C#
public void OnWaitingForWorker() {
    if (this.State != MasterState.PlanningJob)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.WaitingForWorker;
 
    if (this.Workers.Count == 0)
        this.workerTimeoutTimer.Change(this.WorkerTimeout, Timeout.InfiniteTimeSpan);
    else
        this.OnSchedulingTasks();
}

Callback for the timer is: 

C#
private void CheckWorkerTimeout(object parameter) {
    this.SynchronizationContext.Post(o => {
        if (this.State != MasterState.WaitingForWorker)
            return;
 
        this.WorkerTimedout();
        this.OnWaitingForJob();
    }, null);
} 

Then we modify other event handlers to alter the workflow: 

C#
public void OnPlanningJob() {
    if (this.State != MasterState.WaitingForJob)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.PlanningJob;
 
    this.Tasks = this.CurrentJob.Lines.Select(line => new JobTask {Text = line}).ToList();
 
    this.OnWaitingForWorker();
}
 
public void OnSchedulingTasks() {
    if (this.State != MasterState.WaitingForWorker)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.SchedulingTasks;
 
    this.ScheduleTasks();
} 

We had to modify two methods and add an extra one to alter the workflow. Each new requirement increases the complexity of the code, and eventually the code becomes hard to maintain and modify.

Let us consider an alternative way to implement a finite-state machine. 

Async/await implementation    

In .NET 4.5 (and C# 5.0) Microsoft introduced a new way to handle asynchronous method chains using the async and await keywords.

Basically, when you create an asynchronous method, the compiler transforms it into a finite-state machine similar to the one it generates for iterator methods that use the yield keyword. For further details, please check out this article.
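As a small illustration of the comparison with yield, here is an iterator that behaves like a three-state machine. The `TrafficLight` class is a made-up example, not part of the article's code.

```csharp
using System.Collections.Generic;

public static class TrafficLight {
    // Each "yield return" is a point where the method pauses. The class that the
    // compiler generates remembers which point was reached, and that position
    // plays the role of the current state.
    public static IEnumerable<string> Cycle() {
        while (true) {
            yield return "Green";
            yield return "Yellow";
            yield return "Red";
        }
    }
}
```

Taking the first four values of `TrafficLight.Cycle()` gives Green, Yellow, Red, Green. An async method is suspended and resumed in a similar way, except that it pauses at each await whose task is not yet complete.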

Let’s take advantage of this shiny new feature and rewrite our previous code: 

C#
private async void ExecuteJobs() {
    SynchronizationContext.SetSynchronizationContext(this.SynchronizationContext);
 
    var cancellationToken = this.cancellationTokenSource.Token;
 
    while (cancellationToken.IsCancellationRequested == false) {
        var job = await this.GetJobAsync(cancellationToken);
        var tasks = job.Lines.Select(line => new JobTask {Text = line}).ToList();
        var results = new List<TaskResult>();
        var workers = await this.GetWorkersAsync(cancellationToken);
 
        var i = 0;
 
        foreach (var jobTask in tasks)
            this.ScheduleTask(jobTask, workers[i++%workers.Count]);
 
        while (cancellationToken.IsCancellationRequested == false) {
            var result = await this.ReceiveTaskResultAsync(cancellationToken);
 
            tasks.RemoveAll(task => task.Id == result.TaskId);
            results.Add(result);
 
            if (tasks.Count == 0)
                break;
        }
 
        job.Result = results.Sum(x => x.Data);
 
        this.JobFinished(job);
    }
} 

That's it. The whole workflow fits into one simple method! All the other methods are plain utility code:

C#
private static Task<TResult> WaitFor<TResult>(
    Action<Action<TResult>> subscribe,
    Action<Action<TResult>> unsubscribe,
    CancellationToken cancellationToken) {
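    // Note: this constructor overload takes an object "state", so the token is only
    // stored as Task.AsyncState; cancellation itself is wired up by Register below.
    var source = new TaskCompletionSource<TResult>(cancellationToken);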
 
    cancellationToken.Register(OnCancelled<TResult>, source);
 
    Action<TResult> handler = null;
 
    handler = result => {
        unsubscribe(handler);
        source.TrySetResult(result);
    };
 
    subscribe(handler);
    return source.Task;
}
 
private Task<Job> ReceiveJobAsync(CancellationToken cancellationToken) {
    return WaitFor<Job>(
        handler => this.JobReceived += handler,
        handler => this.JobReceived -= handler,
        cancellationToken);
}
 
private Task<TaskResult> ReceiveTaskResultAsync(CancellationToken cancellationToken) {
    return WaitFor<TaskResult>(
        handler => this.TaskResultReceived += handler,
        handler => this.TaskResultReceived -= handler,
        cancellationToken);
}
 
private async Task<IReadOnlyList<Guid>> GetWorkersAsync(CancellationToken cancellationToken) {
    if (this.connectedWorkers.Count == 0)
        return new[] {await this.WaitForWorkerToConnect(cancellationToken)};
 
    return this.connectedWorkers;
}
 
private Task<Guid> WaitForWorkerToConnect(CancellationToken cancellationToken) {
    return WaitFor<Guid>(
        handler => this.WorkerConnected += handler,
        handler => this.WorkerConnected -= handler,
        cancellationToken);
}
 
private static void OnCancelled<TResult>(object parameter) {
    var source = (TaskCompletionSource<TResult>) parameter;
 
    source.TrySetCanceled();
}
 
private async Task<Job> GetJobAsync(CancellationToken cancellationToken) {
    while (this.jobQueue.Count == 0)
        await this.ReceiveJobAsync(cancellationToken);
 
    return this.jobQueue.Dequeue();
}
 
public void OnJobReceived(Job job) {
    this.jobQueue.Enqueue(job);
    this.JobReceived(job);
}
 
public void OnTaskResultReceived(TaskResult result) {
    this.TaskResultReceived(result);
}
 
public void OnWorkerConnected(Guid worker) {
    this.connectedWorkers.Add(worker);
    this.WorkerConnected(worker);
}
 
private void OnWorkerDisconnected(Guid worker) {
    this.connectedWorkers.Remove(worker);
    this.WorkerDisconnected(worker);
}
 
private event Action<Job> JobReceived = _ => { };
private event Action<Guid> WorkerConnected = _ => { };
private event Action<Guid> WorkerDisconnected = _ => { };
private event Action<TaskResult> TaskResultReceived = _ => { }; 

All the states are implicit: the current state is simply the await at which ExecuteJobs is suspended. This means that you can modify the workflow by editing a single method.
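The event-to-task bridge that the utility methods rely on can be tried in isolation. The following is a stripped-down, self-contained sketch of the same idea without cancellation; `Door`, `Opened`, `WaitForOpenAsync` and `DoorWorkflow` are hypothetical names used only for this example.

```csharp
using System;
using System.Threading.Tasks;

public class Door {
    public event Action<string> Opened = _ => { };

    public void Open(string who) {
        this.Opened(who);
    }

    // One-shot wait: the returned task completes the next time Opened fires.
    public Task<string> WaitForOpenAsync() {
        var source = new TaskCompletionSource<string>();
        Action<string> handler = null;

        handler = who => {
            this.Opened -= handler;      // detach first, so the handler runs only once
            source.TrySetResult(who);
        };

        this.Opened += handler;
        return source.Task;
    }
}

public static class DoorWorkflow {
    public static async Task<string> GreetAsync(Door door) {
        // The method is suspended here until Opened fires; this await is its "state".
        var who = await door.WaitForOpenAsync();

        return "Hello, " + who;
    }
}
```

Calling `DoorWorkflow.GreetAsync(door)` returns an incomplete task; after `door.Open("Ann")` the task completes with "Hello, Ann".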

Let's see how it handles a change to the workflow by adding the timeout functionality from the first example:

C#
private async void ExecuteJobs() {
    SynchronizationContext.SetSynchronizationContext(this.SynchronizationContext);
 
    var cancellationToken = this.cancellationTokenSource.Token;
 
    while (cancellationToken.IsCancellationRequested == false) {
        var job = await this.GetJobAsync(cancellationToken);
        var tasks = job.Lines.Select(line => new JobTask {Text = line}).ToList();
        var results = new List<TaskResult>();
        var workers = default (IReadOnlyList<Guid>);
 
        var localTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 
        localTokenSource.CancelAfter(this.WorkerTimeout);
 
        try {
            workers = await this.GetWorkersAsync(localTokenSource.Token);
        } catch (OperationCanceledException) {
            this.WorkerTimedout();
            continue;
        }
 
        var i = 0;
        foreach (var jobTask in tasks)
            this.ScheduleTask(jobTask, workers[i++%workers.Count]);
 
        while (cancellationToken.IsCancellationRequested == false) {
            var result = await this.ReceiveTaskResultAsync(cancellationToken);
 
            tasks.RemoveAll(task => task.Id == result.TaskId);
            results.Add(result);
 
            if (tasks.Count == 0)
                break;
        }
 
        job.Result = results.Sum(x => x.Data);
 
        this.JobFinished(job);
    }
}  

All modifications are in one method only, and we can still trace the workflow with the naked eye.
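One detail worth noting about linked tokens: the linked token is cancelled both when the timeout elapses and when the outer token is cancelled, so a bare catch of OperationCanceledException cannot tell the two apart. The sketch below shows one possible way to distinguish them by checking the outer token. `TimeoutSketch`, `WaitWithTimeoutAsync` and the returned strings are hypothetical and only serve the illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch {
    // Returns "completed", "timed out" or "shut down" (illustrative values only).
    public static async Task<string> WaitWithTimeoutAsync(
        Func<CancellationToken, Task> operation,
        TimeSpan timeout,
        CancellationToken shutdownToken) {
        using (var local = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken)) {
            local.CancelAfter(timeout);

            try {
                await operation(local.Token);
                return "completed";
            } catch (OperationCanceledException) {
                // The linked token fires for both reasons; the outer token tells them apart.
                return shutdownToken.IsCancellationRequested ? "shut down" : "timed out";
            }
        }
    }
}
```

The using block also disposes the linked source, which releases its timer and its registration on the outer token once the wait is over.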

Now let's see how we could test this code. 

Tests    

The ExecuteJobs method consists of multiple "parts" which are executed at different times in the synchronization context.

A typical test consists of three parts:

  1. Act (interact with Master using event handlers) 
  2. React (let the gears spin) 
  3. Verify (assert the result) 

The first thing we need is to obtain control over the execution flow of these "parts". To do this, we have to implement another version of the synchronization context: 

C#
public class TestSynchronizationContext : SynchronizationContext {
    private readonly Queue<KeyValuePair<SendOrPostCallback, object>> queue =
        new Queue<KeyValuePair<SendOrPostCallback, object>>();
 
    public override void Post(SendOrPostCallback d, object state) {
        if (d == null)
            throw new ArgumentNullException("d");
 
        this.queue.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));
    }
 
    public override void Send(SendOrPostCallback d, object state) {
        throw new NotSupportedException("Synchronously sending is not supported.");
    }
 
    public bool RunOperations() {
        if (this.queue.Count == 0)
            return false;
 
        while (this.queue.Count > 0) {
            var item = this.queue.Dequeue();
            item.Key(item.Value);
        }
 
        return true;
    }
}

The RunOperations method runs all the pending "parts" of the ExecuteJobs method. So once RunOperations has returned, we know that all pending transitions are completed and Master is ready for further interaction.

Here are some complete tests:
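To see the deferral in isolation, here is a self-contained sketch. `ManualSynchronizationContext` is a compact, hypothetical stand-in for the TestSynchronizationContext class shown above, included only so that the sample compiles on its own; `Demo` is likewise made up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Compact stand-in for the article's TestSynchronizationContext.
public class ManualSynchronizationContext : SynchronizationContext {
    private readonly Queue<Action> queue = new Queue<Action>();

    // Nothing runs here; the callback is only remembered.
    public override void Post(SendOrPostCallback d, object state) {
        this.queue.Enqueue(() => d(state));
    }

    // Runs everything that has been posted so far, including callbacks posted while running.
    public void RunOperations() {
        while (this.queue.Count > 0)
            this.queue.Dequeue()();
    }
}

public static class Demo {
    public static List<string> Run() {
        var log = new List<string>();
        var context = new ManualSynchronizationContext();

        context.Post(_ => log.Add("posted callback"), null);
        log.Add("after Post");

        context.RunOperations();
        log.Add("after RunOperations");

        return log;
    }
}
```

`Demo.Run()` returns the entries in the order "after Post", "posted callback", "after RunOperations": the posted callback does not run until the test explicitly lets the gears spin.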

C#
public class Tests {
    private readonly MasterAsyncAwait master;
    private readonly List<Tuple<JobTask, Guid>> scheduledTasks = 
                   new List<Tuple<JobTask, Guid>>();
    private readonly TestSynchronizationContext syncContext = new TestSynchronizationContext();
 
    public Tests() {
        SynchronizationContext.SetSynchronizationContext(this.syncContext);
 
        this.master = new MasterAsyncAwait {
            SynchronizationContext = this.syncContext,
            ScheduleTask = (task, workerId) => 
                this.scheduledTasks.Add(Tuple.Create(task, workerId))
        };
    }
 
    [Fact]
    public void SchedulesJob() {
        this.master.Start();
 
        var workerId = new Guid("1eb02478-cc31-4759-97b4-5d459d802e73");
 
        this.master.OnJobReceived(new Job {Lines = new[] {"test", "ok"}});
        this.master.OnWorkerConnected(workerId);
 
        this.syncContext.RunOperations();
 
        Assert.NotEmpty(scheduledTasks);
        Assert.True(scheduledTasks.All(task => task.Item2 == workerId));
    }
 
    [Fact]
    public void SplitsTasksBetweenWorkersFairly() {
        this.master.Start();
 
        var workerId1 = new Guid("1eb02478-cc31-4759-97b4-5d459d802e73");
        var workerId2 = new Guid("ffe9c75e-9b41-46fe-b75d-a8bb2167d43c");
 
        this.master.OnJobReceived(new Job {Lines = new[] {"test", "ok"}});
        this.master.OnWorkerConnected(workerId1);
        this.master.OnWorkerConnected(workerId2);
 
        this.syncContext.RunOperations();
 
        Assert.NotEmpty(scheduledTasks);
        Assert.True(scheduledTasks.Any(task => task.Item2 == workerId1));
        Assert.True(scheduledTasks.Any(task => task.Item2 == workerId2));
    }
 
    [Fact]
    public void ComputesJobResult() {
        var result = 0;
 
        this.master.JobFinished = job => result = job.Result;
 
        this.master.Start();
        this.master.OnJobReceived(new Job {Lines = new[] {"test"}});
        this.master.OnWorkerConnected(new Guid("1eb02478-cc31-4759-97b4-5d459d802e73"));
        this.syncContext.RunOperations();
 
        this.master.OnTaskResultReceived(new TaskResult {
            TaskId = this.scheduledTasks.Single().Item1.Id,
            Data = 4
        });
 
        this.syncContext.RunOperations();
 
        Assert.Equal(4, result);
    }
}  

As you can see, we can't explicitly assert the current state, because it is implicit, and the only way to see what's going on is to check the output from the Master. So each test is a scenario (spec) which you play through, asserting the output at the end.

You could also expose some information about what's going on inside:

C#
public MasterStatus Status { get; private set; }
 
private async void ExecuteJobs() {
   ...
   this.Status = MasterStatus.WaitingForWorker;
   ...
}    
 
// Collection of connected workers
public IReadOnlyCollection<Guid> ConnectedWorkers { get; private set; } 

Summary  

Each approach has its own pros and cons.

The typical implementation is hard to maintain, but you can verify each state explicitly and you don't have to replay a complete scenario. Instead, you can set the current state, act, and check that the transition occurs and the new state is valid.

The async/await implementation gives you simplicity and flexibility. However, you can't treat the class as an FSM in unit tests; all you have is a set of interaction methods and an output. Use it wisely.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

