Curating Efficient Distributed Application Runtime (Dapr) Workflows
With Dapr and its powerful abstractions, writing resilient workflows is easy. In this article, we will understand how to write one with an example.
Distributed Application Runtime (Dapr) is a portable and event-driven runtime that commoditizes some of the problems developers face with distributed systems and microservices daily.
Imagine there are 3-4 different microservices. As part of communication between these services, developers must think about:
- Handling timeouts and failures
- Metrics and traces
- Controlling and restricting communication between services.
These challenges are recurring, but with Dapr's Service-to-Service Invocation building block, they are seamlessly abstracted.
Dapr divides such capabilities into components that can be invoked using a building block, aka API.
Components Overview
Below is a subset of the components that Dapr supports.
Component | Description |
---|---|
Service-to-Service | Facilitates communication between microservices: It encapsulates handling failures, observability, and applying policies (responsible for enforcing restrictions on who is allowed to call) |
Secrets | Facilitates communication with cloud secret stores and Kubernetes secret stores |
Workflows | With the Workflows component, developers can run long-running workloads distributed across nodes. |
Publish/Subscribe | Similar to the producer/consumer pattern, with this component messages can be produced to a topic and listeners can consume from the subscribed topic. |
Let's dive into the workflow component.
Workflow Component
Problem
An example of a simple Workflow can be a scheduled job that moves data between data sources. The complexity increases when child workflows must be triggered as part of the parent workflow and the workflow author also becomes responsible for saving, resuming, and maintaining the state and the schema.
With the Dapr Workflow component, most of the state management is abstracted out, allowing developers to focus only on the business logic.
Key Terms
- Workflow: Contains a set of tasks that need to be executed
- Activities: Tasks that need to be executed. For example, in the earlier scenario where data must be moved from a source to a destination:
  - Activity 1: Reads data from the source
  - Activity 2: Writes to the destination
  The workflow comprises both of these activities.
Benefits
- With workflow replay, we get a checkpointing mechanism for free. For example, in the C# async/await model, Dapr automatically checkpoints each time the workflow awaits a scheduled task. This allows the system to recover from the most recent I/O operation after a failure, making recovery less costly.
- Built-in retry strategies for workflows and activities can be customized to suit specific workflows.
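To make the retry point concrete, here is a minimal sketch of attaching a retry policy to a single activity call. It assumes the WorkflowTaskOptions and WorkflowRetryPolicy types from the Dapr.Workflow package (1.14.x); the workflow class and the "FetchBlobLines" activity name are hypothetical and used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapr.Workflow;

// Hypothetical workflow used only to illustrate a per-call retry policy.
public class RetryingBlobWorkflow : Workflow<string, List<string>>
{
    public override async Task<List<string>> RunAsync(WorkflowContext context, string blobName)
    {
        // Up to 3 attempts in total, waiting 5 seconds before the first retry
        // and doubling the wait before each subsequent one.
        var retryOptions = new WorkflowTaskOptions
        {
            RetryPolicy = new WorkflowRetryPolicy(
                maxNumberOfAttempts: 3,
                firstRetryInterval: TimeSpan.FromSeconds(5),
                backoffCoefficient: 2.0),
        };

        // "FetchBlobLines" is an assumed activity name; it must match an
        // activity registered with the Dapr workflow runtime.
        return await context.CallActivityAsync<List<string>>(
            "FetchBlobLines",
            blobName,
            retryOptions);
    }
}
```

The attempt count and intervals here are arbitrary; suitable values depend on how the downstream dependency tends to fail.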
Workflow Patterns
Pattern 1
The parent workflow schedules multiple child activities in parallel.
Pattern 2
In this scenario, the workflow schedules Activity 1 and then passes its output to Activity 2 for further processing.
Pattern 3
Here, the parent workflow schedules another child workflow which in turn schedules some activities.
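The three patterns could be sketched roughly as follows. This assumes the Dapr.Workflow package and the BlobDataFetchActivity defined later in the example; the workflow class names and the "WriteLinesActivity" activity are hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dapr.Workflow;

// Pattern 1: schedule several activities in parallel, then combine the results.
public class FanOutWorkflow : Workflow<List<string>, List<string>>
{
    public override async Task<List<string>> RunAsync(WorkflowContext context, List<string> blobNames)
    {
        // Start one activity per blob without awaiting each one individually.
        List<Task<List<string>>> pending = blobNames
            .Select(blob => context.CallActivityAsync<List<string>>(nameof(BlobDataFetchActivity), blob))
            .ToList();

        // Wait for all of them, then flatten the per-blob results.
        List<string>[] results = await Task.WhenAll(pending);
        return results.SelectMany(lines => lines).ToList();
    }
}

// Pattern 2: pass the output of Activity 1 to Activity 2.
public class ChainingWorkflow : Workflow<string, int>
{
    public override async Task<int> RunAsync(WorkflowContext context, string blobName)
    {
        List<string> lines = await context.CallActivityAsync<List<string>>(
            nameof(BlobDataFetchActivity), blobName);

        // "WriteLinesActivity" is an assumed activity that writes the lines
        // to the destination and returns how many were written.
        return await context.CallActivityAsync<int>("WriteLinesActivity", lines);
    }
}

// Pattern 3: the parent schedules a child workflow, which schedules the activities.
public class ParentWorkflow : Workflow<string, int>
{
    public override async Task<int> RunAsync(WorkflowContext context, string blobName)
    {
        return await context.CallChildWorkflowAsync<int>(nameof(ChainingWorkflow), blobName);
    }
}
```

Each of these workflows, and every activity they call, would still need to be registered with the Dapr workflow runtime before being scheduled.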
Example
Let's explore an example using C# and Dapr to schedule workflows that read data from Blob storage.
Step 1: Importing the Dapr Packages
Add the Dapr packages to the csproj file.
<ItemGroup>
  <!-- https://www.nuget.org/packages/Dapr.AspNetCore -->
  <PackageReference Include="Dapr.AspNetCore" Version="1.14.0" />
  <!-- https://www.nuget.org/packages/Dapr.Workflow -->
  <PackageReference Include="Dapr.Workflow" Version="1.14.0" />
</ItemGroup>
Step 2: Configuring Workflow and Activity
- Add the workflow and activities to the Dapr Workflow extension.
- RegisterWorkflow registers a workflow.
- RegisterActivity registers an activity.
using Dapr.Workflow;
using Microsoft.Extensions.DependencyInjection;

/// <summary>
/// Configure workflow extension.
/// </summary>
public static class DaprConfigurationExtension
{
    /// <summary>
    /// Configure Workflow extension.
    /// </summary>
    /// <param name="services">services.</param>
    /// <returns>IServiceCollection.</returns>
    public static IServiceCollection ConfigureDaprWorkflows(this IServiceCollection services)
    {
        services.AddDaprWorkflow(options =>
        {
            // Note that it's also possible to register a lambda function as the workflow
            // or activity implementation instead of a class.
            options.RegisterWorkflow<BlobOrchestrationWorkflow>();
            // These are the activities that get invoked by the Dapr workflow(s).
            options.RegisterActivity<BlobDataFetchActivity>();
        });
        return services;
    }
}
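For context, the extension method could be wired into application startup along these lines. This is only a sketch: it assumes an ASP.NET Core minimal hosting setup, and BlobReadProcessor is a hypothetical implementation of the IBlobReadProcessor interface that the activity in Step 4 depends on.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// Registers the workflow and activity with the Dapr workflow runtime.
builder.Services.ConfigureDaprWorkflows();

// BlobReadProcessor is an assumed implementation; activities are resolved
// through dependency injection, so their dependencies must be registered too.
builder.Services.AddSingleton<IBlobReadProcessor, BlobReadProcessor>();

var app = builder.Build();
app.Run();
```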
Step 3: Writing the First Workflow
BlobOrchestrationWorkflow derives from the Workflow base class in the Dapr.Workflow NuGet package, which is parameterized by the input and output types.
The input here is the name of the blob, which is a string, and the output is the content of the blob as a list of lines.
/// <summary>
/// Dapr workflow responsible for performing operations on blob.
/// </summary>
public class BlobOrchestrationWorkflow : Workflow<string, List<string>>
{
    /// <inheritdoc/>
    public override async Task<List<string>> RunAsync(WorkflowContext context, string input)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(input);
        List<string> identifiers = await context.CallActivityAsync<List<string>>(
            name: nameof(BlobDataFetchActivity),
            input: input).ConfigureAwait(false); // state is saved
        return identifiers;
    }
}
Step 4: Writing the First Activity
Like Workflow, Activity also takes input and output. In this case, input is the blob name, and output is the list of lines from the blob.
/// <summary>
/// Fetch identifiers from Blob.
/// </summary>
public class BlobDataFetchActivity : WorkflowActivity<string, List<string>>
{
    private readonly IBlobReadProcessor readProcessor;

    /// <summary>
    /// Initializes a new instance of the <see cref="BlobDataFetchActivity"/> class.
    /// </summary>
    /// <param name="blobReadProcessor">read blob data.</param>
    public BlobDataFetchActivity(IBlobReadProcessor blobReadProcessor)
    {
        this.readProcessor = blobReadProcessor;
    }

    /// <inheritdoc/>
    public override async Task<List<string>> RunAsync(WorkflowActivityContext context, string input)
    {
        return await this.readProcessor.ReadBlobContentAsync<List<string>>(input).ConfigureAwait(false); // state is saved
    }
}
Step 5: Scheduling the First Workflow
- Use the workflow client to schedule workflows.
- The instance ID must be unique to each workflow. Reusing an ID can cause nondeterministic behavior.
- Each workflow has an input and an output. For example, if the workflow takes a blob name as input and returns a list of the lines in the blob, the input is a string and the output is a List<string>.
- The workflow is tracked using its instance ID; once it completes, the ExecuteWorkflowAsync method returns.
public class DaprService
{
    // Workflow client injected using Dependency Injection.
    private readonly DaprWorkflowClient daprWorkflowClient;

    /// <summary>
    /// Initializes a new instance of the <see cref="DaprService"/> class.
    /// </summary>
    /// <param name="daprWorkflowClient">Dapr workflow client.</param>
    public DaprService(DaprWorkflowClient daprWorkflowClient)
    {
        this.daprWorkflowClient = daprWorkflowClient;
    }

    /// <summary>
    /// Execute Dapr workflow.
    /// </summary>
    /// <param name="message">Workflow input; here, the blob name.</param>
    /// <returns>Task.</returns>
    public async Task ExecuteWorkflowAsync(string message)
    {
        string id = Guid.NewGuid().ToString();
        // Schedule the Dapr Workflow registered in Step 2.
        await this.daprWorkflowClient.ScheduleNewWorkflowAsync(
            name: nameof(BlobOrchestrationWorkflow),
            instanceId: id,
            input: message).ConfigureAwait(false);
        WorkflowState state = await this.daprWorkflowClient.GetWorkflowStateAsync(
            instanceId: id,
            getInputsAndOutputs: true).ConfigureAwait(false);
        // Track the workflow state until completion, pausing between polls
        // so the loop does not spin continuously.
        while (!state.IsWorkflowCompleted)
        {
            await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
            state = await this.daprWorkflowClient.GetWorkflowStateAsync(
                instanceId: id,
                getInputsAndOutputs: true).ConfigureAwait(false);
        }
    }
}
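As an alternative to the manual polling loop above, the workflow client also offers a method that waits for completion. The following is a rough sketch under the assumption that the Dapr.Workflow 1.14.x client is in use; the WorkflowRunner class and RunAndWaitAsync method are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Workflow;

// Hypothetical helper that schedules the workflow and returns its output.
public static class WorkflowRunner
{
    public static async Task<List<string>> RunAndWaitAsync(
        DaprWorkflowClient client,
        string blobName,
        CancellationToken cancellationToken = default)
    {
        string id = Guid.NewGuid().ToString();

        await client.ScheduleNewWorkflowAsync(
            name: nameof(BlobOrchestrationWorkflow),
            instanceId: id,
            input: blobName);

        // Returns once the workflow reaches a terminal state
        // (or throws if the token is cancelled first).
        WorkflowState state = await client.WaitForWorkflowCompletionAsync(
            id,
            true,
            cancellationToken);

        // A terminal state is not necessarily a successful one.
        if (state.RuntimeStatus != WorkflowRuntimeStatus.Completed)
        {
            throw new InvalidOperationException(
                $"Workflow {id} ended with status {state.RuntimeStatus}.");
        }

        // Deserialize the workflow output; fall back to an empty list if none was recorded.
        return state.ReadOutputAs<List<string>>() ?? new List<string>();
    }
}
```

Passing a cancellation token with a timeout keeps a caller from waiting indefinitely on a workflow that never finishes.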
Best Practices
- Each time the workflow reaches an "await," Dapr saves the workflow state. Leveraging this feature is important for ensuring workflows can resume efficiently and cost-effectively after interruptions.
- In addition, the input and output must be deterministic for the workflow replay pattern to work correctly. For example,
- Assume the payload below is the first input to the workflow. The workflow then pulls the data from the blob, saves it to the state, and then crashes for some reason.
{
"blobName": "dapr-blob",
"createdOn": "2024-12-11T23:00:00.11212Z"
}
After a restart, we resend the input with a different "createdOn" timestamp. Even though the output for this blob name has already been saved, the new timestamp makes this a new payload, so the output is recomputed. If the "createdOn" timestamp were omitted, we could retrieve the output from the state store without making an additional I/O call.
{
"blobName": "dapr-blob",
"createdOn": "2024-12-11T23:01:00.11212Z"
}
Workflow interaction with data other than the state must happen through Activities only.
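One way to keep workflow code replay-safe is sketched below: all I/O is delegated to an activity, and the current time is read from the workflow context rather than the system clock. This assumes the CurrentUtcDateTime property on WorkflowContext from the Dapr.Workflow package; the ReplaySafeBlobWorkflow class and the header line it adds are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapr.Workflow;

// Hypothetical workflow illustrating replay-safe time and I/O handling.
public class ReplaySafeBlobWorkflow : Workflow<string, List<string>>
{
    public override async Task<List<string>> RunAsync(WorkflowContext context, string blobName)
    {
        // DateTime.UtcNow would return a different value on every replay;
        // the context-provided time stays the same across replays.
        DateTime startedAt = context.CurrentUtcDateTime;

        // The blob read happens inside the activity, never in the workflow body,
        // so its recorded result can be reused when the workflow is replayed.
        List<string> lines = await context.CallActivityAsync<List<string>>(
            nameof(BlobDataFetchActivity), blobName);

        // Illustrative only: prepend a header derived from the replay-safe timestamp.
        lines.Insert(0, $"# fetched at {startedAt:O}");
        return lines;
    }
}
```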