Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions samples/scenarios/WorkItemFilteringSplitActivities/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
##
## Get latest from https://github.com/github/gitignore/blob/main/VisualStudio.gitignore

# User-specific files
*.rsuser
*.suo
*.user
*.userosscache
*.sln.docstates

# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs

# Mono auto generated files
mono_crash.*

# Build results
[Dd]ebug/
[Rr]elease/
x64/
x86/
[Ww][Ii][Nn]32/
[Aa][Rr][Mm]/
[Aa][Rr][Mm]64/
bld/
[Bb]in/
[Oo]bj/
[Ll]og/
[Ll]ogs/

# Visual Studio 2015/2017 cache/options directory
.vs/

# NuGet Packages
*.nupkg
# NuGet Symbol Packages
*.snupkg
# The packages folder can be ignored because of Package Restore
**/[Pp]ackages/*
# except build/, which is used as an MSBuild target.
!**/[Pp]ackages/build/
# Uncomment if necessary however generally it will be regenerated when needed
#!**/[Pp]ackages/repositories.config
# NuGet v3's project.json files produces more ignorable files
*.nuget.props
*.nuget.targets

# Azure
.azure
288 changes: 288 additions & 0 deletions samples/scenarios/WorkItemFilteringSplitActivities/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
# Work Item Filtering — Split Activities Sample

This sample demonstrates **Work Item Filtering**, a feature that allows workers to declare which orchestrations, activities, and entities they can process. The Durable Task Scheduler (DTS) backend routes work items only to workers whose filters match, preventing workers from receiving work they cannot handle.

Before work item filtering, all orchestrations, activities, and entities were handed to any connected worker regardless of what it actually hosted. This caused errors (or silent hangs) when a worker received a work item it didn't implement — especially problematic in multi-service deployments, rolling upgrades, and microservice topologies. With filtering, each worker registers its task set; DTS creates per-filter queues and routes work items to matching workers. If no filter is specified, behavior falls back to the "generic queue" (all workers receive everything).

## Architecture

```
┌─────────────────────────────────────────────────────────────┐
│ Durable Task Scheduler (DTS) │
│ │
│ Orchestration queue ──► routed to Orchestrator Worker only │
│ ValidateOrder queue ──► routed to Validator Worker only │
│ ShipOrder queue ──► routed to Shipper Worker only │
└────────────┬──────────────────┬──────────────────┬──────────┘
│ │ │
┌───────▼───────┐ ┌──────▼───────┐ ┌──────▼───────┐
│ Orchestrator │ │ Validator │ │ Shipper │
│ Worker │ │ Worker │ │ Worker │
│ │ │ │ │ │
│ Registers: │ │ Registers: │ │ Registers: │
│ • OrderProc- │ │ • Validate- │ │ • ShipOrder │
│ essing- │ │ Order │ │ │
│ Orchestration│ │ │ │ │
└───────────────┘ └───────────────┘ └───────────────┘

┌───────────────┐
│ Client │
│ (Driver) │
│ │
│ Schedules new │
│ orchestrations │
│ and prints │
│ results │
└───────────────┘
```

**Orchestrator Worker** runs orchestrations only — it has no activities registered.
**Validator Worker** runs `ValidateOrder` only — it has no orchestrations or other activities.
**Shipper Worker** runs `ShipOrder` only — same isolation.
**Client** schedules orchestrations and polls for completion.

## The Orchestration

`OrderProcessingOrchestration` performs two sequential activity calls:

1. `ValidateOrder(orderId)` → routed to Validator Worker
2. `ShipOrder(orderId)` → routed to Shipper Worker

Returns a combined result string.

## Prerequisites

- [.NET 10 SDK](https://dotnet.microsoft.com/download) (or later)
- [Docker](https://docs.docker.com/get-docker/) (for the DTS emulator)

## Running Locally

### 1. Start the DTS Emulator

```bash
docker pull mcr.microsoft.com/durable-task/emulator:latest
docker run -d --name dts-emulator -p 8080:8080 -p 8082:8082 mcr.microsoft.com/durable-task/emulator:latest
```

The emulator dashboard is available at `http://localhost:8082`.

### 2. Build all projects

```bash
cd samples/scenarios/WorkItemFilteringSplitActivities
dotnet build
```

### 3. Start the three workers (each in a separate terminal)

**Terminal 1 — Orchestrator Worker:**
```bash
dotnet run --project src/OrchestratorWorker
```

**Terminal 2 — Validator Worker (ValidateOrder activity):**
```bash
dotnet run --project src/ValidatorWorker
```

**Terminal 3 — Shipper Worker (ShipOrder activity):**
```bash
dotnet run --project src/ShipperWorker
```

### 4. Run the Client (in a fourth terminal)

```bash
dotnet run --project src/Client
```

## Expected Output

The client runs in a **continuous loop**, scheduling a batch of 3 orchestrations every 30 seconds for 10 minutes. This makes it easy to observe scaling behavior over time.

### Client terminal

```
10:30:01 [Client] === Work Item Filtering Demo — Client ===
10:30:01 [Client] Will schedule 3 orchestrations every 30s for 10 minutes.

10:30:01 [Client] --- Batch #1 at 10:30:01 ---
10:30:01 [Client] Scheduling orchestration #1 with orderId='ORD-B001-001'...
10:30:01 [Client] -> Scheduled with InstanceId=abc123
10:30:01 [Client] Scheduling orchestration #2 with orderId='ORD-B001-002'...
10:30:01 [Client] -> Scheduled with InstanceId=def456
10:30:01 [Client] Scheduling orchestration #3 with orderId='ORD-B001-003'...
10:30:01 [Client] -> Scheduled with InstanceId=ghi789
10:30:01 [Client] All 3 orchestrations scheduled. Waiting for completion...
10:30:02 [Client] COMPLETED | InstanceId=abc123 | Output: Order 'ORD-B001-001' => Validation: [Order ORD-B001-001 is valid], Shipping: [Shipped with tracking TRACK-ORD-B001-001-4271]
...
10:30:02 [Client] === RESULTS: 3 completed, 0 failed, 3 total ===
10:30:02 [Client] Sleeping 30s until next batch...

10:30:32 [Client] --- Batch #2 at 10:30:32 ---
...
```

### Orchestrator Worker terminal (orchestrations only — no activities)

```
10:30:02 [Orchestrator] Orchestration | Name=OrderProcessingOrchestration | InstanceId=abc123 | Processing order 'ORD-B001-001'
10:30:02 [Orchestrator] Orchestration | InstanceId=abc123 | Dispatching ValidateOrder to Validator Worker...
10:30:02 [Orchestrator] Orchestration | InstanceId=abc123 | Dispatching ShipOrder to Shipper Worker...
10:30:02 [Orchestrator] Orchestration | InstanceId=abc123 | Completed: Order 'ORD-B001-001' => Validation: [...], Shipping: [...]
```

### Validator Worker terminal (ValidateOrder only — no ShipOrder, no orchestrations)

```
10:30:02 [Validator] Activity | Name=ValidateOrder | InstanceId=abc123 | Validating order 'ORD-B001-001'...
10:30:02 [Validator] Activity | Name=ValidateOrder | InstanceId=abc123 | Result: Order ORD-B001-001 is valid
10:30:02 [Validator] Activity | Name=ValidateOrder | InstanceId=def456 | Validating order 'ORD-B001-002'...
```

### Shipper Worker terminal (ShipOrder only — no ValidateOrder, no orchestrations)

```
10:30:02 [Shipper] Activity | Name=ShipOrder | InstanceId=abc123 | Shipping order 'ORD-B001-001'...
10:30:02 [Shipper] Activity | Name=ShipOrder | InstanceId=abc123 | Result: Shipped with tracking TRACK-ORD-B001-001-4271
10:30:02 [Shipper] Activity | Name=ShipOrder | InstanceId=def456 | Shipping order 'ORD-B001-002'...
```

**Key observation:** Each worker processes **only** its registered work item types. No cross-processing occurs.

## What to Try Next: Strict Routing Experiment

1. **Stop Shipper Worker** (Ctrl+C in Terminal 3).
2. Run the Client again to schedule new orchestrations.
3. Observe that:
- Orchestrator Worker picks up and starts orchestrations.
- Validator Worker completes `ValidateOrder` for each order.
- `ShipOrder` work items **remain pending** — they are not delivered to Validator Worker or Orchestrator Worker.
- The orchestrations stay in "Running" status, waiting for the `ShipOrder` activity to complete.
4. **Restart Shipper Worker** — the pending `ShipOrder` work items are immediately delivered and the orchestrations complete.

This demonstrates that filtering is strict: work items are routed only to workers with matching filters. There is no fallback to other workers.

## How It Works

Each worker process registers its tasks in a `DurableTaskRegistry` via `AddAllGeneratedTasks()` (which picks up classes decorated with `[DurableTask]`). When the worker connects to DTS, the SDK automatically constructs **work item filters** from the registry:

- Orchestrator Worker's filter: `orchestrations: [OrderProcessingOrchestration]`
- Validator Worker's filter: `activities: [ValidateOrder]`
- Shipper Worker's filter: `activities: [ShipOrder]`

DTS creates per-filter queues and routes each work item to the matching queue. If a filter list is empty for a given type (e.g., Validator Worker has no orchestration filter), that worker simply never receives work items of that type.

This is all **automatic** — no explicit `UseWorkItemFilters()` call is needed. The SDK generates filters from whatever you register. To override or opt out, you can use `UseWorkItemFilters(customFilters)` or `UseWorkItemFilters(null)` respectively.

## Deploying to Azure

This sample includes full infrastructure-as-code (Bicep) and an `azure.yaml` for one-command deployment via [Azure Developer CLI (`azd`)](https://learn.microsoft.com/azure/developer/azure-developer-cli/).

### What Gets Deployed

| Resource | Purpose |
|---|---|
| **Resource Group** | Contains all resources |
| **Durable Task Scheduler** (Consumption SKU) | Managed orchestration backend |
| **Task Hub** | Logical unit for orchestrations and work items |
| **Container Apps Environment** | Shared hosting environment with VNet integration |
| **Azure Container Registry** | Stores Docker images for each service |
| **User-Assigned Managed Identity** | Shared identity with DTS Worker/Client RBAC role |
| **4 Container Apps** | Client, Orchestrator Worker, Validator Worker, Shipper Worker |

### Deploy with `azd`

```bash
cd samples/scenarios/WorkItemFilteringSplitActivities
azd up
```

You'll be prompted for an environment name, subscription, and location. The deployment takes ~5 minutes.

### KEDA Scaling with DTS

Each worker Container App is configured with a **DTS-aware KEDA custom scale rule** (`azure-durabletask-scheduler`) that scales based on the **work item backlog** in the task hub. The key parameter is `workItemType`, which tells the scaler what kind of work to monitor:

| Container App | Service Name | `workItemType` | Scales on |
|---|---|---|---|
| **Client** | `client` | `Orchestration` | Pending orchestration work items |
| **Orchestrator Worker** | `orchestrator-worker` | `Orchestration` | Pending orchestration work items |
| **Validator Worker** | `validator-worker` | `Activity` | Pending activity work items |
| **Shipper Worker** | `shipper-worker` | `Activity` | Pending activity work items |

The scale rule metadata (from [app.bicep](infra/app/app.bicep)):

```bicep
scaleRuleType: 'azure-durabletask-scheduler'
scaleRuleMetadata: {
endpoint: dtsEndpoint // DTS scheduler URL
maxConcurrentWorkItemsCount: '1'
taskhubName: taskHubName
workItemType: workItemType // 'Orchestration' or 'Activity'
}
scaleRuleIdentity: userAssignedManagedIdentity.resourceId
```

- Workers scale from **0 to 10** replicas. When the client finishes its loop and no more work items arrive, workers scale back to zero.
- The `scaleRuleIdentity` uses the shared user-assigned managed identity to authenticate with DTS, so no connection strings or secrets are needed for scaling.
- `maxConcurrentWorkItemsCount: '1'` means KEDA will scale up one replica per pending work item, up to the max.

### Manual Deployment (without `azd`)

Set the `ENDPOINT` and `TASKHUB` environment variables to point to your deployed scheduler:

```bash
export ENDPOINT="https://your-scheduler.westus2.durabletask.io"
export TASKHUB="your-taskhub-name"
```

The workers and client will automatically use `DefaultAzureCredential` for authentication. Make sure the identity running each process has the **Durable Task Scheduler Worker** / **Durable Task Scheduler Client** role on the scheduler resource.

## Project Structure

```
WorkItemFilteringSplitActivities/
├── WorkItemFilteringSplitActivities.sln
├── README.md
├── azure.yaml # azd service definitions
├── .gitignore
├── infra/ # Bicep infrastructure-as-code
│ ├── main.bicep # Top-level — resource group, DTS, container apps
│ ├── main.parameters.json
│ ├── abbreviations.json
│ ├── app/
│ │ ├── app.bicep # Per-service container app (with KEDA scale rule)
│ │ ├── dts.bicep # DTS scheduler + task hub
│ │ └── user-assigned-identity.bicep
│ └── core/
│ ├── host/ # Container Apps Environment, Registry, App template
│ ├── networking/ # VNet
│ └── security/ # ACR pull role, DTS role assignments
└── src/
├── Client/ # Schedules orchestrations in a loop, prints results
│ ├── Client.csproj
│ ├── Program.cs
│ └── Dockerfile
├── OrchestratorWorker/ # Orchestrator Worker — runs orchestrations only
│ ├── OrchestratorWorker.csproj
│ ├── Program.cs
│ ├── OrderProcessingOrchestration.cs
│ └── Dockerfile
├── ValidatorWorker/ # Validator Worker — runs ValidateOrder activity only
│ ├── ValidatorWorker.csproj
│ ├── Program.cs
│ ├── ValidateOrder.cs
│ └── Dockerfile
└── ShipperWorker/ # Shipper Worker — runs ShipOrder activity only
├── ShipperWorker.csproj
├── Program.cs
├── ShipOrder.cs
└── Dockerfile
```

## Reference

- [Work Item Filtering PR (durabletask-dotnet #616)](https://github.com/microsoft/durabletask-dotnet/pull/616)
- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/durable-task-scheduler/)
- [Durable Task .NET SDK](https://github.com/microsoft/durabletask-dotnet)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.2.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OrchestratorWorker", "src\OrchestratorWorker\OrchestratorWorker.csproj", "{A1B2C3D4-E5F6-7890-ABCD-EF1234567890}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ValidatorWorker", "src\ValidatorWorker\ValidatorWorker.csproj", "{B2C3D4E5-F6A7-8901-BCDE-F12345678901}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ShipperWorker", "src\ShipperWorker\ShipperWorker.csproj", "{C3D4E5F6-A7B8-9012-CDEF-123456789012}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "src\Client\Client.csproj", "{D4E5F6A7-B8C9-0123-DEF0-234567890123}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{A1B2C3D4-E5F6-7890-ABCD-EF1234567890}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A1B2C3D4-E5F6-7890-ABCD-EF1234567890}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A1B2C3D4-E5F6-7890-ABCD-EF1234567890}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A1B2C3D4-E5F6-7890-ABCD-EF1234567890}.Release|Any CPU.Build.0 = Release|Any CPU
{B2C3D4E5-F6A7-8901-BCDE-F12345678901}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B2C3D4E5-F6A7-8901-BCDE-F12345678901}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B2C3D4E5-F6A7-8901-BCDE-F12345678901}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B2C3D4E5-F6A7-8901-BCDE-F12345678901}.Release|Any CPU.Build.0 = Release|Any CPU
{C3D4E5F6-A7B8-9012-CDEF-123456789012}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C3D4E5F6-A7B8-9012-CDEF-123456789012}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C3D4E5F6-A7B8-9012-CDEF-123456789012}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C3D4E5F6-A7B8-9012-CDEF-123456789012}.Release|Any CPU.Build.0 = Release|Any CPU
{D4E5F6A7-B8C9-0123-DEF0-234567890123}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D4E5F6A7-B8C9-0123-DEF0-234567890123}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D4E5F6A7-B8C9-0123-DEF0-234567890123}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D4E5F6A7-B8C9-0123-DEF0-234567890123}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E5F6A7B8-C9D0-1234-EF01-345678901234}
EndGlobalSection
EndGlobal
Loading
Loading