Click here to Skip to main content
15,879,095 members
Articles / Hosted Services / Serverless

Make Serverless Music – Orchestrate Your Workflow with Azure [Part 4 – Durable Functions]

Rate me:
Please Sign up or sign in to vote.
5.00/5 (1 vote)
1 Jun 2018CPOL9 min read 5.3K   2  
The last in the series, a walk through on taking our problem statement and implementing it with the new abstraction from the Azure Functions team, Durable Functions...

At this point, you’ve seen 3 relatively straightforward ways to orchestrate operations for a workflow in a serverless manner:

  1. Microsoft Flow: ideally for Business Users but powerful enough to tackle simple orchestration tasks
  2. Logic Apps: Azure’s purpose-built workflow engine with robust retry & failure capabilities, running on the same execution layer as your Azure deployments
  3. Azure Functions + Logic Apps: for when your tasks are more complex than a simple Logic App can handle

And one pretty involved approach – using Azure Functions in conjunction with state storage and messaging for resilient execution of a chained or parallel workflow across many Azure Functions.

Now let’s have a look at the newest kid on the block: Azure Functions’ Durable Functions extension. Just announced at build but in preview for a few months prior, it’s a code-first approach to workflow and orchestration using Azure Functions.

Durable Functions is only GA on Azure Functions v1. Therefore, many of the constructs it uses are from the v1 namespaces (e.g.: HttpRequestMessage, HttpResponseMessage) however you are free to use these in a v2 Function without issue.

Durable Functions sits on top of Azure Functions and are available simply as a nuget package. What you get with this is a set of Function Trigger bindings and attributes that serve to abstract away the state management and queuing you saw in the Functions + Messaging approach from Part 3. Let’s dive in.

Step 1: Augment Our Current Azure Function into a Durable Function

Every Durable Function workflow has a Starter, one or more Orchestrators, and one or more Activity functions. As we saw in the last instalment, our /Validate function serves as our starter here, so let’s change it into a Durable Functions Orchestration Client (aka Starter):

  1. Add an [OrchestrationClient] parameter:
    C#
    public static IActionResult Run([HttpTrigger
        (AuthorizationLevel.Function, "post", Route = null)]Person req,
        [OrchestrationClient]DurableOrchestrationClient client,
        [ServiceBus(@"function-a", Connection = @"ServiceBusOrchestrationConnection")]
        out QueueMessage functionAmsg,
        TraceWriter log)
  2. Add a call to kick off the new orchestration:
    C#
    var orchestrationInstanceId = await client.StartNewAsync(@"Start", req);
  3. Return either an HTTP 202 ACCEPTED "check for status" response:
    C#
    return client.CreateCheckStatusResponse(req, orchestrationInstanceId);

At this point, you’ll need to convert this starter function to an async method which will leave you with a couple of issues:

Error    CS1988  Async methods cannot have ref or out parameters Functions

This is because of that leftover out parameter for the Service Bus queue message. With DF, we don’t need this (it handles the queuing for you) so you’re free to remove this parameter.

The other thing you’ll notice is the call to CreateCheckStatusResponse takes an HttpRequestMessage as its first parameter which we don’t have. You need to forego the "auto-deserialize" of the request body into a Person object now, and field the actual request, then manually deserialize it into a Person to send on to the orchestration function. At the end of the day, our new /Validate should look like this:

C#
[FunctionName("Validate")]
public static async Task<HttpResponseMessage> 
RunAsync([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient client,
    TraceWriter log)
{
    var body = await req.Content.ReadAsStringAsync();
    var personObj = Newtonsoft.Json.JsonConvert.DeserializeObject<Person>(body);
    log.Info($@"Person request received: 
{body}");
    var orchestrationInstanceId = await client.StartNewAsync(@"Start", personObj);

    return client.CreateCheckStatusResponse(req, orchestrationInstanceId);
}

Step 2: Write the New Orchestrator Function

As you saw above, our Starter executes a StartNewAsync with a string parameter for an orchestrator Function name. Well, behind the scenes, DF looks for a Function with a matching name and an OrchestrationTrigger attribute. So we need to create one of these. This function will then be orchestrating the actual work of executing all the necessary activities.

Here’s what just such a function looks like when it’s defined:

C#
[FunctionName(nameof(Start))]
public static void Start([OrchestrationTrigger]DurableOrchestrationContext context)
{

}

There’s a pretty important catch when writing Durable Functions and here’s where it comes in to play. You might think that you could write all your calls to other functions within this orchestrator as...

C#
_client.PostAsync(...

...calls but you’d be wrong. DF will blow up at runtime with a TPL error complaining you’re trying to do an asynchronous operation. Any async method call not present on the Durable Functions API surface must be done in DF Activity functions and nowhere else (you can read more on this and other code constraints in the docs here).

Step 3: Write the Worker "Activity Function"

So let’s transform CheckFirstName into an Activity function:

The first step is to change the trigger type to be an ActivityTrigger which gets a DurableActivityContext instance. You’ll also notice that this activity function can return a value right back to the caller; not having to store it elsewhere; much more idiomatic, don’t you think?

C#
[FunctionName("CheckFirstName")]
public static Error CheckFirstName([ActivityTrigger]DurableActivityContext context, TraceWriter log)
{

Next, get the input passed to our activity (a Person object if you recall) via the context’s GetInput<T> function call:

C#
var person = context.GetInput<Person>();

As an added convenience, you can also use the strong type of the input value instead of a context object (e.g.: Person instead of DurableActivityContext). Downside is you can’t get the InstanceId from that, so I stuck with DurableActivityContext here.

And of course, now do the processing & return a new Error object (or null). The whole activity function ends up looking like this:

C#
[FunctionName("CheckFirstName")]
public static Error CheckFirstName([ActivityTrigger]DurableActivityContext context, TraceWriter log)
{
    var person = context.GetInput<Person>();
    log.Info($@"Message received: {context.InstanceId}");
    if ((person?.Name?.First?.Length > 1) == false)
    {
        var err = new Error { id = 1, message = "First name is null or not longer than 1 character" };
        log.Info($@" - Error found: {err.message}");
        return err;
    }
    else
    {
        log.Info($@" - No error found");
        return null;
    }
}

Notice how, where before we had defined a session id on our own and decorated our incoming request with it, Durable Functions has the instance id for the orchestration as part of the context it passes around; so we can use this in any log messages to correlate activity together.

Step 4: Tie It All Together

Now that we have a worker, our orchestrator needs to call it. Back in Start, fill it out with a call to our activity function passing the request Person object we got in:

C#
[FunctionName(nameof(Start))]
public static async Task<IList<Error>> Start([OrchestrationTrigger]DurableOrchestrationContext context)
{
    var person = context.GetInput<Person>();
    var firstNameError = await context.CallActivityAsync<Error>(nameof(CheckFirstName), person);

    return new[] { firstNameError };
}

A few things happened here:

  1. We had to make the method async because we’re executing CallActivityAsync to kick off the activity function
  2. Our orchestration returns a list of Errors, like our API expects, so the return value becomes Task<IList<Error>>
  3. We get the input to send to our activity from the input that came in to the orchestrator; namely the Person object we deserialized in the starter.

At this point, our DF is fully testable. Let’s give it a run and observe the output:

[5/31/2018 4:09:00 PM] Generating 3 job function(s)
[5/31/2018 4:09:00 PM] Found the following functions:
[5/31/2018 4:09:00 PM] Functions.Function1.RunAsync
[5/31/2018 4:09:00 PM] Functions.Function1.Start
[5/31/2018 4:09:00 PM] Functions.Function1.CheckFirstName

...

Http Functions:

        Validate: http://localhost:7071/api/Validate

Note that the host found 3 functions – good – but only exposed one via HTTP. That’s because the other 2 are used only internally by DF to conduct the orchestration; an outside party has no business invoking them. Awesome! Let’s kick off /Validate and see what happens:

POST /api/Validate HTTP/1.1
Host: localhost:7071
Content-Type: application/json
Cache-Control: no-cache
Postman-Token: 64700bdc-69d4-4faa-abc2-6628f5790715

{
    "Name" : {
        "First" : "J",
        "Last" : "Doe",
        "Title" : "Mr"
    },
    "Address" : {
        "Line1" : "1234 Anywhere St.",
        "Line2" : null,
        "City" : "Somewhere",
        "State" : "OR",
        "Zip" : "12345",
        "Country" : "United States of America"
    }
}

Which kicks off our DF execution and the host starts spitting out log data:

[5/31/2018 4:20:30 PM] Executing 'Validate' (Reason='This function was programmatically called 
via the host APIs.', Id=b2969100-d67e-4725-9545-7c017ff19923)
[5/31/2018 4:20:30 PM] Person request received:
[5/31/2018 4:20:30 PM] {
[5/31/2018 4:20:30 PM]     "Name" : {
[5/31/2018 4:20:30 PM]         "First" : "J",
[5/31/2018 4:20:30 PM]         "Last" : "Doe",
[5/31/2018 4:20:30 PM]         "Title" : "Mr"
[5/31/2018 4:20:30 PM]     },
[5/31/2018 4:20:30 PM]     "Address" : {
[5/31/2018 4:20:30 PM]         "Line1" : "1234 Anywhere St.",
[5/31/2018 4:20:30 PM]         "Line2" : null,
[5/31/2018 4:20:30 PM]         "City" : "Somewhere",
[5/31/2018 4:20:30 PM]         "State" : "OR",
[5/31/2018 4:20:30 PM]         "Zip" : "12345",
[5/31/2018 4:20:30 PM]         "Country" : "United States of America"
[5/31/2018 4:20:30 PM]     }
[5/31/2018 4:20:30 PM] }
[5/31/2018 4:20:30 PM] 35ec223e3c2f4aff91d7d9727a894a83: Function 
'Start (Orchestrator)' scheduled. Reason: NewInstance. IsReplay: False. State: Scheduled. 
HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 0.
[5/31/2018 4:20:34 PM] Executed 'Validate' (Succeeded, Id=b2969100-d67e-4725-9545-7c017ff19923)
[5/31/2018 4:20:37 PM] Executing 'Start' (Reason='', Id=4956f975-380b-4e75-86ec-bd86225512d2)
[5/31/2018 4:20:37 PM] 35ec223e3c2f4aff91d7d9727a894a83: Function 'Start (Orchestrator)' 
started. IsReplay: False. Input: (1188 bytes). State: Started. HubName: DurableFunctionsHub. 
AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 1.
[5/31/2018 4:20:37 PM] 35ec223e3c2f4aff91d7d9727a894a83: Function 'CheckFirstName (Activity)' 
scheduled. Reason: Start. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. 
AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 2.
[5/31/2018 4:20:37 PM] Executed 'Start' (Succeeded, Id=4956f975-380b-4e75-86ec-bd86225512d2)
[5/31/2018 4:20:37 PM] 35ec223e3c2f4aff91d7d9727a894a83: Function 'Start (Orchestrator)' awaited. 
IsReplay: False. State: Awaited. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 3.
[5/31/2018 4:20:39 PM] 35ec223e3c2f4aff91d7d9727a894a83: Function 'CheckFirstName (Activity)' 
started. IsReplay: False. Input: (1196 bytes). State: Started. HubName: DurableFunctionsHub. 
AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 4.
[5/31/2018 4:20:39 PM] Executing 'CheckFirstName' (Reason='', Id=2ddb9093-2588-489e-a39e-a83a266971d7)
[5/31/2018 4:20:39 PM] Message received: 35ec223e3c2f4aff91d7d9727a894a83
[5/31/2018 4:20:39 PM]  - Error found: First name is null or not longer than 1 character
[5/31/2018 4:20:39 PM] Executed 'CheckFirstName' (Succeeded, Id=2ddb9093-2588-489e-a39e-a83a266971d7)
[5/31/2018 4:20:39 PM] 35ec223e3c2f4aff91d7d9727a894a83: Function 'CheckFirstName (Activity)' 
completed. ContinuedAsNew: False. IsReplay: False. Output: (280 bytes). State: Completed. 
HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 5.
[5/31/2018 4:20:40 PM] Executing 'Start' (Reason='', Id=9d968ad5-6d88-4d9b-ba99-c266a0adbac9)
[5/31/2018 4:20:40 PM] 35ec223e3c2f4aff91d7d9727a894a83: Function 'Start (Orchestrator)' 
started. IsReplay: True. Input: (1188 bytes). State: Started. HubName: DurableFunctionsHub. 
AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 6.
[5/31/2018 4:20:40 PM] 35ec223e3c2f4aff91d7d9727a894a83: Function 'CheckFirstName (Activity)' 
scheduled. Reason: Start. IsReplay: True. State: Scheduled. HubName: DurableFunctionsHub. 
AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 7.
[5/31/2018 4:20:40 PM] 35ec223e3c2f4aff91d7d9727a894a83: Function 'Start (Orchestrator)' 
completed. ContinuedAsNew: False. IsReplay: False. Output: (288 bytes). State: Completed. 
HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 8.
[5/31/2018 4:20:40 PM] Executed 'Start' (Succeeded, Id=9d968ad5-6d88-4d9b-ba99-c266a0adbac9)

After all is said and done, Postman gets this back for a response:

Content-Length →788
Content-Type →application/json; charset=utf-8
Date →Thu, 31 May 2018 16:20:34 GMT
Location →http://localhost:7071/runtime/webhooks/DurableTaskExtension/
instances/35ec223e3c2f4aff91d7d9727a894a83?taskHub=DurableFunctionsHub&connection=
Storage&code=oGDC2qyC2H8gWrNUjpHAZxgj8CiF44hLxN5nebJZmAbCDo6nouIxag==
Retry-After →10
Server →Kestrel

{
    "id": "35ec223e3c2f4aff91d7d9727a894a83",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/DurableTaskExtension/
     instances/35ec223e3c2f4aff91d7d9727a894a83?taskHub=DurableFunctionsHub&connection=
     Storage&code=oGDC2qyC2H8gWrNUjpHAZxgj8CiF44hLxN5nebJZmAbCDo6nouIxag==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/DurableTaskExtension/
     instances/35ec223e3c2f4aff91d7d9727a894a83/raiseEvent/{eventName}?
     taskHub=DurableFunctionsHub&connection=Storage&
     code=oGDC2qyC2H8gWrNUjpHAZxgj8CiF44hLxN5nebJZmAbCDo6nouIxag==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/DurableTaskExtension/
     instances/35ec223e3c2f4aff91d7d9727a894a83/terminate?reason={text}&
     taskHub=DurableFunctionsHub&connection=Storage&
     code=oGDC2qyC2H8gWrNUjpHAZxgj8CiF44hLxN5nebJZmAbCDo6nouIxag=="
}

which seems confusing, but if you look at the Location header and punch that into Postman as an HTTP GET request, you receive:

JavaScript
{
    "runtimeStatus": "Completed",
    "input": {
        "$type": "Functions.Person, Functions",
        "Name": {
            "$type": "Functions.Name, Functions",
            "First": "J",
            "Last": "Doe",
            "Title": "Mr"
        },
        "Address": {
            "$type": "Functions.Address, Functions",
            "Line1": "1234 Anywhere St.",
            "Line2": null,
            "City": "Somewhere",
            "State": "OR",
            "Zip": "12345",
            "Country": "United States of America"
        }
    },
    "customStatus": null,
    "output": [
        {
            "id": 1,
            "message": "First name is null or not longer than 1 character"
        }
    ],
    "createdTime": "2018-05-31T16:20:30Z",
    "lastUpdatedTime": "2018-05-31T16:20:40Z"
}

and have a look at that output property – huzzah!

For more information on the URLs given to you by the response from a POST to a Durable Functions starter, you can read the docs here.

Let’s evolve our workflow, though, into something more complex by first "chaining" our functions together, and then running them in parallel (fan out/in).

Function Chaining with Durable Functions

As you might’ve guessed by now, you can chain functions together by simply coordinating their inputs and outputs. If we add in the Last Name validation to our flow, we end up with a new Activity function:

C#
[FunctionName(nameof(CheckLastName))]
public static async Task<InputOutput> CheckLastName
    ([ActivityTrigger]DurableActivityContext context, TraceWriter log)
{
    await Task.Delay(1400);

    var io = context.GetInput<InputOutput>();
    if ((io?.Input?.Name?.Last?.Length > 1) == false)
    {
        io.Output.Add(new Error 
             { id = 2, message = "Last name is null or not longer than 1 character" });
    }

    return io;
}

Here, we’ve set up the input to our 2nd function to be and InputOutput type that looks like this:

C#
public class InputOutput
{
    public IList<Error> Output { get; set; } = new Error[0];

    public Person Input { get; set; }
}

So each function can augment it as they see fit, and at the end of the orchestration, we’ll simply store Output as the result.

This changes our implementation of the Starter and CheckFirstName as well, but only slightly:

C#
[FunctionName(nameof(CheckFirstName))]
public static async Task<InputOutput> CheckFirstName
    ([ActivityTrigger]DurableActivityContext context, TraceWriter log)
{
    await Task.Delay(1300);
    var person = context.GetInput<Person>();
    log.Info($@"Message received: {context.InstanceId}");
    if ((person?.Name?.First?.Length > 1) == false)
    {
        var err = new Error { id = 1, message = "First name is null or not longer than 1 character" };
        log.Info($@" - Error found: {err.message}");
        return new InputOutput { Output = new[] { err }, Input = person };
    }
    else
    {
        log.Info($@" - No error found");
        return new InputOutput { Input = person };
    }
}
C#
[FunctionName(nameof(Start))]
public static async Task<IList<Error>> Start([OrchestrationTrigger]DurableOrchestrationContext context)
{
    var person = context.GetInput<Person>();

    var chainedInputOutput = await context.CallActivityAsync<InputOutput>
                              (nameof(CheckFirstName), person);
    chainedInputOutput = await context.CallActivityAsync<InputOutput>
                            (nameof(CheckLastName), chainedInputOutput);

    return chainedInputOutput.Output;
}

Let’s give it a run and observe the output from our Functions host:

[5/31/2018 4:36:06 PM] Executing 'Validate' 
(Reason='This function was programmatically called via the host APIs.', 
Id=ba7a080e-a701-451f-971b-d7ea9670ce60)
[5/31/2018 4:36:07 PM] Person request received:
[5/31/2018 4:36:07 PM] {
[5/31/2018 4:36:07 PM]     "Name" : {
[5/31/2018 4:36:07 PM]         "First" : "J",
[5/31/2018 4:36:07 PM]         "Last" : "Doe",
[5/31/2018 4:36:07 PM]         "Title" : "Mr"
[5/31/2018 4:36:07 PM]     },
[5/31/2018 4:36:07 PM]     "Address" : {
[5/31/2018 4:36:07 PM]         "Line1" : "1234 Anywhere St.",
[5/31/2018 4:36:07 PM]         "Line2" : null,
[5/31/2018 4:36:07 PM]         "City" : "Somewhere",
[5/31/2018 4:36:07 PM]         "State" : "OR",
[5/31/2018 4:36:07 PM]         "Zip" : "12345",
[5/31/2018 4:36:07 PM]         "Country" : "United States of America"
[5/31/2018 4:36:07 PM]     }
[5/31/2018 4:36:07 PM] }
[5/31/2018 4:36:07 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'Start (Orchestrator)' scheduled. Reason: NewInstance. IsReplay: False. 
State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 0.
[5/31/2018 4:36:09 PM] Executed 'Validate' (Succeeded, Id=ba7a080e-a701-451f-971b-d7ea9670ce60)
[5/31/2018 4:36:10 PM] Executing 'Start' (Reason='', Id=4cfd7beb-89f8-4b84-b738-a47b1d6e8c13)
[5/31/2018 4:36:10 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'Start (Orchestrator)' started. IsReplay: False. Input: (1188 bytes). 
State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 1.
[5/31/2018 4:36:10 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'CheckFirstName (Activity)' scheduled. Reason: Start. IsReplay: False. 
State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 2.
[5/31/2018 4:36:10 PM] Executed 'Start' (Succeeded, Id=4cfd7beb-89f8-4b84-b738-a47b1d6e8c13)
[5/31/2018 4:36:10 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'Start (Orchestrator)' awaited. IsReplay: False. State: Awaited. 
HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 3.
[5/31/2018 4:36:13 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'CheckFirstName (Activity)' started. IsReplay: False. Input: (1196 bytes). 
State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 4.
[5/31/2018 4:36:13 PM] Executing 'CheckFirstName' 
(Reason='', Id=6a1dbbe1-2ac6-4a6e-84c4-3e506d94615b)
[5/31/2018 4:36:14 PM] Message received: ebc3b429c89942429046ce54c54eaccb
[5/31/2018 4:36:14 PM]  - Error found: First name is null or not longer than 1 character
[5/31/2018 4:36:15 PM] Executed 'CheckFirstName' (Succeeded, Id=6a1dbbe1-2ac6-4a6e-84c4-3e506d94615b)
[5/31/2018 4:36:15 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'CheckFirstName (Activity)' completed. ContinuedAsNew: False. IsReplay: False. 
Output: (1104 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . 
SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 5.
[5/31/2018 4:36:16 PM] Executing 'Start' (Reason='', Id=ccae9c2c-4243-48f1-bbee-5926687350fc)
[5/31/2018 4:36:16 PM] ebc3b429c89942429046ce54c54eaccb: Function 'Start (Orchestrator)' started. 
IsReplay: True. Input: (1188 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . 
SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 6.
[5/31/2018 4:36:16 PM] ebc3b429c89942429046ce54c54eaccb: 
Function 'CheckFirstName (Activity)' scheduled. Reason: Start. IsReplay: True. 
State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 7.
[5/31/2018 4:36:16 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'CheckLastName (Activity)' scheduled. Reason: Start. IsReplay: False. 
State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 8.
[5/31/2018 4:36:16 PM] Executed 'Start' (Succeeded, Id=ccae9c2c-4243-48f1-bbee-5926687350fc)
[5/31/2018 4:36:16 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'Start (Orchestrator)' awaited. IsReplay: False. State: Awaited. 
HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 9.
[5/31/2018 4:36:18 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'CheckLastName (Activity)' started. IsReplay: False. Input: (1964 bytes). 
State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 10.
[5/31/2018 4:36:18 PM] Executing 'CheckLastName' (Reason='', Id=aff38799-8268-435b-934c-d94dff88d931)
[5/31/2018 4:36:20 PM] Executed 'CheckLastName' (Succeeded, Id=aff38799-8268-435b-934c-d94dff88d931)
[5/31/2018 4:36:20 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'CheckLastName (Activity)' completed. ContinuedAsNew: False. IsReplay: 
False. Output: (1104 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . 
SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 11.
[5/31/2018 4:36:21 PM] Executing 'Start' (Reason='', Id=e7f8047e-eccc-4a75-aab0-9ac085f03b72)
[5/31/2018 4:36:21 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'Start (Orchestrator)' started. IsReplay: True. Input: (1188 bytes). 
State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 12.
[5/31/2018 4:36:21 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'CheckFirstName (Activity)' scheduled. Reason: Start. IsReplay: True. 
State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 13.
[5/31/2018 4:36:21 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'CheckFirstName (Activity)' completed. ContinuedAsNew: False. IsReplay: True. 
Output: (replayed). State: Completed. HubName: DurableFunctionsHub. AppName: . 
SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 14.
[5/31/2018 4:36:21 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'CheckLastName (Activity)' scheduled. Reason: Start. IsReplay: True. State: 
Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 15.
[5/31/2018 4:36:21 PM] ebc3b429c89942429046ce54c54eaccb: Function 
'Start (Orchestrator)' completed. ContinuedAsNew: False. IsReplay: False. 
Output: (288 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . 
SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 16.
[5/31/2018 4:36:22 PM] Executed 'Start' (Succeeded, Id=e7f8047e-eccc-4a75-aab0-9ac085f03b72)

Notice in our activity functions, I had added a couple of await Task.Delay calls. Also notice that our full execution took around 15 seconds, but in Postman, I had a response back from the endpoint within 5.

A subsequent call to the Location header value of the 202 ACCEPTED response I got back yielded:

JavaScript
{
    "runtimeStatus": "Completed",
    "input": {
        "$type": "Functions.Person, Functions",
        "Name": {
            "$type": "Functions.Name, Functions",
            "First": "J",
            "Last": "Doe",
            "Title": "Mr"
        },
        "Address": {
            "$type": "Functions.Address, Functions",
            "Line1": "1234 Anywhere St.",
            "Line2": null,
            "City": "Somewhere",
            "State": "OR",
            "Zip": "12345",
            "Country": "United States of America"
        }
    },
    "customStatus": null,
    "output": [
        {
            "id": 1,
            "message": "First name is null or not longer than 1 character"
        }
    ],
    "createdTime": "2018-05-31T16:36:07Z",
    "lastUpdatedTime": "2018-05-31T16:36:22Z"
}

exactly what we expected.

What happens if we call the location endpoint before the run’s completed? Get your ninja copy/paste skills ready and give it a shot:

JavaScript
{
    "runtimeStatus": "Running",
    "input": {
        "$type": "Functions.Person, Functions",
        "Name": {
            "$type": "Functions.Name, Functions",
            "First": "J",
            "Last": "Doe",
            "Title": "Mr"
        },
        "Address": {
            "$type": "Functions.Address, Functions",
            "Line1": "1234 Anywhere St.",
            "Line2": null,
            "City": "Somewhere",
            "State": "OR",
            "Zip": "12345",
            "Country": "United States of America"
        }
    },
    "customStatus": null,
    "output": null,
    "createdTime": "2018-05-31T16:39:22Z",
    "lastUpdatedTime": "2018-05-31T16:39:28Z"
}

Notice the first property, runtimeStatus with a value of Running and how our output property is still null. This indicates the DF is in progress and the caller should check back later. Waiting a few more seconds to call it and we get back:

JavaScript
{
    "Runtimestatus": "Completed",
    "Input": {
        "$type": "Functions.Person, Functions",
        "Name": {
            "$type": "Functions.Name, Functions",
            "First": "J",
            "Last": "Doe",
            "Title": "Mr"
        },
        "Address": {
            "$type": "Functions.Address, Functions",
            "Line1": "1234 Anywhere St.",
            "Line2": Null,
            "City": "Somewhere",
            "State": "Or",
            "Zip": "12345",
            "Country": "United States of America"
        }
    },
    "Customstatus": Null,
    "Output": [
        {
            "id": 1,
            "Message": "First Name Is Null or Not Longer Than 1 Character"
        }
    ],
    "Createdtime": "2018-05-31t16:39:22z",
    "Lastupdatedtime": "2018-05-31t16:39:37z"
}

as expected.

So now, you can see that chaining functions, or executing them sequentially is as simple as async/await elsewhere in .NET. But what about running them in parallel (aka Fan out/in pattern)?

Parallel Execution (Fan Out/In Pattern)

Well, believe it or not, you’ve already seen how this is done, it’s just got a bit of a DF twist on it. Much like our Azure function, the trick here is to put all the CallActivityAsync<> calls in to a Task[] collection and then WaitAll on the collection. As you also might have figured out, our current InputOutput construct won’t work here due to the parallel nature so instead, let’s just have each function put out its error (or null) and we’ll aggregate when we’re done.

Let’s get started.

First, change our Starter to call the activities in parallel and wait on the collection:

C#
var tasks = new[]
{
    context.CallActivityAsync<Error>(nameof(CheckFirstName), person),
    context.CallActivityAsync<Error>(nameof(CheckLastName), person)
};

var errors = await Task.WhenAll(tasks);

You see, also, here that we’re now just giving our Person object to everybody, which changes our implementation of the activities slightly:

C#
[FunctionName(nameof(CheckFirstName))]
public static async Task<Error> CheckFirstName
([ActivityTrigger]DurableActivityContext context, TraceWriter log)
{
    await Task.Delay(1300);
    var person = context.GetInput<Person>();
    log.Info($@"Message received: {context.InstanceId}");
    if ((person?.Name?.First?.Length > 1) == false)
    {
        var err = new Error 
            { id = 1, message = "First name is null or not longer than 1 character" };
        log.Info($@" - Error found: {err.message}");
        return err;
    }

    log.Info($@" - No error found");
    return null;
}

[FunctionName(nameof(CheckLastName))]
public static async Task<Error> CheckLastName
      ([ActivityTrigger]DurableActivityContext context, TraceWriter log)
{
    await Task.Delay(1400);

    var person = context.GetInput<Person>();
    if ((person?.Name?.Last?.Length > 1) == false)
    {
        var err = new Error { id = 2, message = "Last name is null or not longer than 1 character" };
        log.Info($@" - Error found: {err.message}");
        return err;
    }

    log.Info($@" - No error found");
    return null;
}

And finally, we need to return the results from our orchestrator:

C#
var errors = await Task.WhenAll(tasks);

return errors.Where(e => e != null).ToList();

Let’s give it a run, paying attention to the output as we should see both Activity Functions executing simultaneously.
And sure enough:

[5/31/2018 4:49:13 PM] Executing 'Validate' 
(Reason='This function was programmatically called via the host APIs.', 
Id=4496dbf0-d09e-44fc-95d2-2445ea612bbb)
[5/31/2018 4:49:13 PM] Person request received:
[5/31/2018 4:49:13 PM] {
[5/31/2018 4:49:13 PM]     "Name" : {
[5/31/2018 4:49:13 PM]         "First" : "J",
[5/31/2018 4:49:13 PM]         "Last" : "Doe",
[5/31/2018 4:49:13 PM]         "Title" : "Mr"
[5/31/2018 4:49:13 PM]     },
[5/31/2018 4:49:13 PM]     "Address" : {
[5/31/2018 4:49:13 PM]         "Line1" : "1234 Anywhere St.",
[5/31/2018 4:49:13 PM]         "Line2" : null,
[5/31/2018 4:49:13 PM]         "City" : "Somewhere",
[5/31/2018 4:49:13 PM]         "State" : "OR",
[5/31/2018 4:49:13 PM]         "Zip" : "12345",
[5/31/2018 4:49:13 PM]         "Country" : "United States of America"
[5/31/2018 4:49:13 PM]     }
[5/31/2018 4:49:13 PM] }
[5/31/2018 4:49:13 PM] 4e580cf9c1f841a6b820eb030e6c37bf: 
Function 'Start (Orchestrator)' scheduled. Reason: NewInstance. IsReplay: False. 
State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 13.
[5/31/2018 4:49:14 PM] Executed 'Validate' (Succeeded, Id=4496dbf0-d09e-44fc-95d2-2445ea612bbb)
[5/31/2018 4:49:17 PM] Executing 'Start' (Reason='', Id=2c80b880-6062-4a04-a2c3-5b2f6fc0490e)
[5/31/2018 4:49:17 PM] 4e580cf9c1f841a6b820eb030e6c37bf: 
Function 'Start (Orchestrator)' started. IsReplay: False. Input: (1188 bytes). 
State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 14.
[5/31/2018 4:49:18 PM] 4e580cf9c1f841a6b820eb030e6c37bf: 
Function 'CheckFirstName (Activity)' scheduled. Reason: 
Start. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. 
AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 15.
[5/31/2018 4:49:18 PM] 4e580cf9c1f841a6b820eb030e6c37bf: 
Function 'CheckLastName (Activity)' scheduled. Reason: 
Start. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. 
AppName: . SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 16.
[5/31/2018 4:49:18 PM] Executed 'Start' (Succeeded, Id=2c80b880-6062-4a04-a2c3-5b2f6fc0490e)
[5/31/2018 4:49:18 PM] 4e580cf9c1f841a6b820eb030e6c37bf: 
Function 'Start (Orchestrator)' awaited. IsReplay: False. 
State: Awaited. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 17.
[5/31/2018 4:49:19 PM] 4e580cf9c1f841a6b820eb030e6c37bf: 
Function 'CheckFirstName (Activity)' started. IsReplay: False. Input: (1196 bytes). 
State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . 
ExtensionVersion: 1.4.1.0. SequenceNumber: 18.
[5/31/2018 4:49:19 PM] Executing 'CheckFirstName' (Reason='', Id=c1607f15-607d-40e1-a14e-74a1c0046c13)
[5/31/2018 4:49:20 PM] 4e580cf9c1f841a6b820eb030e6c37bf: 
Function 'CheckLastName (Activity)' started. IsReplay: False. 
Input: (1196 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . 
SlotName: . ExtensionVersion: 1.4.1.0. SequenceNumber: 19.
[5/31/2018 4:49:20 PM] Executing 'CheckLastName' (Reason='', Id=8a38bb91-4c3b-431a-bfb9-66e8c6f259bd)

...

Notice that CheckFirstName has not completed before CheckLastName kicked off. And yet, we waited for all to be done before our output value arrived at our endpoint and was populated correctly.

The Implementation

So how are Durable Functions managing all this? It’s all done within the storage account you specify for AzureWebJobsStorage; required by Azure Functions. If you open Storage Explorer and have a look, you’ll see a number of Durable Functions-created containers, tables, and queues:

and it is within these that DF manages the state of each orchestration including inputs/outputs to/from Activity functions and where each orchestration instance is in its execution.

In short, DF manages execution by using Storage Queues to pass messages to/from Activity Functions and the Orchestrator. The Orchestrator maintains the state of an individual orchestration within Table storage and that’s what you’re getting when you hit the Location endpoint returned by the 202 ACCEPTED response to the Starter.

Because Storage Queues only have polling capability today w/in Functions, this is also why this particular set up takes longer to run than the Service Bus or Logic Apps approach shown in Step 3. My hope for the future is this will eventually be driven by Event Grid for near-instantaneous execution of activities & orchestrations.


That concludes the series! I hope you’ve found this insightful and educational as you plan to move your workflows to the cloud and take advantage of the microbilling and autoscaling that Azure Serverless has to offer!

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United States United States
I'm a Sr. Software Engineer in the Seattle area primarily focused on serverless technologies in the cloud. In my free time I enjoy hiking & other adventures with my family around the Puget Sound and the country! You can find out more about me at my homepage: http://bc3.tech/brandonh

Comments and Discussions

 
-- There are no messages in this forum --