Skip to content

Commit f20505c

Browse files
authored
Versioning, recursive terminating/purging, Replay safe logger (#93)
1 parent af18105 commit f20505c

24 files changed

Lines changed: 2393 additions & 22 deletions

packages/durabletask-js-azuremanaged/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,8 @@ export { DurableTaskAzureManagedWorkerBuilder, createAzureManagedWorkerBuilder }
2121
// Logger exports - re-export from core package for convenience
2222
export { Logger, ConsoleLogger, NoOpLogger } from "@microsoft/durabletask-js";
2323

24+
// Versioning exports - re-export from core package for convenience
25+
export { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "@microsoft/durabletask-js";
26+
2427
// Azure-specific logger adapter
2528
export { AzureLoggerAdapter, createAzureLogger } from "./azure-logger-adapter";

packages/durabletask-js-azuremanaged/src/worker-builder.ts

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,16 @@
44
import { TokenCredential } from "@azure/identity";
55
import * as grpc from "@grpc/grpc-js";
66
import { DurableTaskAzureManagedWorkerOptions } from "./options";
7-
import { TaskHubGrpcWorker, TOrchestrator, TActivity, TInput, TOutput, Logger, ConsoleLogger } from "@microsoft/durabletask-js";
7+
import {
8+
TaskHubGrpcWorker,
9+
TOrchestrator,
10+
TActivity,
11+
TInput,
12+
TOutput,
13+
Logger,
14+
ConsoleLogger,
15+
VersioningOptions,
16+
} from "@microsoft/durabletask-js";
817

918
/**
1019
* Builder for creating DurableTaskWorker instances that connect to Azure-managed Durable Task service.
@@ -17,6 +26,7 @@ export class DurableTaskAzureManagedWorkerBuilder {
1726
private _activities: { name?: string; fn: TActivity<TInput, TOutput> }[] = [];
1827
private _logger: Logger = new ConsoleLogger();
1928
private _shutdownTimeoutMs?: number;
29+
private _versioning?: VersioningOptions;
2030

2131
/**
2232
* Creates a new instance of DurableTaskAzureManagedWorkerBuilder.
@@ -198,6 +208,18 @@ export class DurableTaskAzureManagedWorkerBuilder {
198208
return this;
199209
}
200210

211+
/**
212+
* Configures versioning options for the worker.
213+
* This allows filtering orchestrations by version using different match strategies.
214+
*
215+
* @param options The versioning options including version, matchStrategy, and failureStrategy.
216+
* @returns This builder instance.
217+
*/
218+
versioning(options: VersioningOptions): DurableTaskAzureManagedWorkerBuilder {
219+
this._versioning = options;
220+
return this;
221+
}
222+
201223
/**
202224
* Builds and returns a configured TaskHubGrpcWorker.
203225
*
@@ -219,18 +241,17 @@ export class DurableTaskAzureManagedWorkerBuilder {
219241
...this._grpcChannelOptions,
220242
};
221243

222-
// Use the core TaskHubGrpcWorker with custom credentials and metadata generator
223-
// For insecure connections, metadata is passed via the metadataGenerator parameter
224-
// For secure connections, metadata is included in the channel credentials
225-
const worker = new TaskHubGrpcWorker(
244+
// Use the core TaskHubGrpcWorker with options-based constructor
245+
const worker = new TaskHubGrpcWorker({
226246
hostAddress,
227-
combinedOptions,
228-
true,
229-
channelCredentials,
247+
options: combinedOptions,
248+
useTLS: true,
249+
credentials: channelCredentials,
230250
metadataGenerator,
231-
this._logger,
232-
this._shutdownTimeoutMs,
233-
);
251+
logger: this._logger,
252+
shutdownTimeoutMs: this._shutdownTimeoutMs,
253+
versioning: this._versioning,
254+
});
234255

235256
// Register all orchestrators
236257
for (const { name, fn } of this._orchestrators) {

packages/durabletask-js/src/client/client.ts

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import { OrchestrationStatus, toProtobuf, fromProtobuf } from "../orchestration/
1717
import { TimeoutError } from "../exception/timeout-error";
1818
import { PurgeResult } from "../orchestration/orchestration-purge-result";
1919
import { PurgeInstanceCriteria } from "../orchestration/orchestration-purge-criteria";
20+
import { PurgeInstanceOptions } from "../orchestration/orchestration-purge-options";
21+
import { TerminateInstanceOptions, isTerminateInstanceOptions } from "../orchestration/orchestration-terminate-options";
2022
import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
2123
import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query";
2224
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
@@ -47,12 +49,19 @@ export interface TaskHubGrpcClientOptions {
4749
metadataGenerator?: MetadataGenerator;
4850
/** Optional logger instance. Defaults to ConsoleLogger. */
4951
logger?: Logger;
52+
/**
53+
* The default version to use when starting new orchestrations without an explicit version.
54+
* If specified, this will be used as the version for orchestrations that don't provide
55+
* their own version in StartOrchestrationOptions.
56+
*/
57+
defaultVersion?: string;
5058
}
5159

5260
export class TaskHubGrpcClient {
5361
private _stub: stubs.TaskHubSidecarServiceClient;
5462
private _metadataGenerator?: MetadataGenerator;
5563
private _logger: Logger;
64+
private _defaultVersion?: string;
5665

5766
/**
5867
* Creates a new TaskHubGrpcClient instance.
@@ -95,6 +104,7 @@ export class TaskHubGrpcClient {
95104
let resolvedCredentials: grpc.ChannelCredentials | undefined;
96105
let resolvedMetadataGenerator: MetadataGenerator | undefined;
97106
let resolvedLogger: Logger | undefined;
107+
let resolvedDefaultVersion: string | undefined;
98108

99109
if (typeof hostAddressOrOptions === "object" && hostAddressOrOptions !== null) {
100110
// Options object constructor
@@ -104,6 +114,7 @@ export class TaskHubGrpcClient {
104114
resolvedCredentials = hostAddressOrOptions.credentials;
105115
resolvedMetadataGenerator = hostAddressOrOptions.metadataGenerator;
106116
resolvedLogger = hostAddressOrOptions.logger;
117+
resolvedDefaultVersion = hostAddressOrOptions.defaultVersion;
107118
} else {
108119
// Deprecated positional parameters constructor
109120
resolvedHostAddress = hostAddressOrOptions;
@@ -117,6 +128,7 @@ export class TaskHubGrpcClient {
117128
this._stub = new GrpcClient(resolvedHostAddress, resolvedOptions, resolvedUseTLS, resolvedCredentials).stub;
118129
this._metadataGenerator = resolvedMetadataGenerator;
119130
this._logger = resolvedLogger ?? new ConsoleLogger();
131+
this._defaultVersion = resolvedDefaultVersion;
120132
}
121133

122134
async stop(): Promise<void> {
@@ -177,6 +189,13 @@ export class TaskHubGrpcClient {
177189
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
178190
? undefined
179191
: instanceIdOrOptions.tags;
192+
const version =
193+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
194+
? undefined
195+
: instanceIdOrOptions.version;
196+
197+
// Use provided version, or fall back to client's default version
198+
const effectiveVersion = version ?? this._defaultVersion;
180199

181200
const req = new pb.CreateInstanceRequest();
182201
req.setName(name);
@@ -191,9 +210,15 @@ export class TaskHubGrpcClient {
191210
req.setInput(i);
192211
req.setScheduledstarttimestamp(ts);
193212

213+
if (effectiveVersion) {
214+
const v = new StringValue();
215+
v.setValue(effectiveVersion);
216+
req.setVersion(v);
217+
}
218+
194219
populateTagsMap(req.getTagsMap(), tags);
195220

196-
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);
221+
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}${effectiveVersion ? ` (version: ${effectiveVersion})` : ""}`);
197222

198223
const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
199224
this._stub.startInstance.bind(this._stub),
@@ -364,18 +389,49 @@ export class TaskHubGrpcClient {
364389
* Terminates the orchestrator associated with the provided instance id.
365390
*
366391
* @param {string} instanceId - orchestrator instance id to terminate.
367-
* @param {any} output - The optional output to set for the terminated orchestrator instance.
392+
* @param {any | TerminateInstanceOptions} outputOrOptions - The optional output to set for the terminated orchestrator instance,
393+
* or a TerminateInstanceOptions object created with `terminateOptions()` that can include both
394+
* output and recursive termination settings.
395+
*
396+
* @example
397+
* ```typescript
398+
* // Simple termination with output
399+
* await client.terminateOrchestration(instanceId, { reason: "cancelled" });
400+
*
401+
* // Recursive termination with options (use terminateOptions helper)
402+
* import { terminateOptions } from "@microsoft/durabletask-js";
403+
* await client.terminateOrchestration(instanceId, terminateOptions({
404+
* output: { reason: "cancelled" },
405+
* recursive: true
406+
* }));
407+
* ```
368408
*/
369-
async terminateOrchestration(instanceId: string, output: any = null): Promise<void> {
409+
async terminateOrchestration(
410+
instanceId: string,
411+
outputOrOptions: any | TerminateInstanceOptions = null,
412+
): Promise<void> {
370413
const req = new pb.TerminateRequest();
371414
req.setInstanceid(instanceId);
372415

416+
let output: any = null;
417+
let recursive = false;
418+
419+
// Use type guard to safely detect TerminateInstanceOptions
420+
// This avoids false positives when user output happens to have 'recursive' or 'output' properties
421+
if (isTerminateInstanceOptions(outputOrOptions)) {
422+
output = outputOrOptions.output ?? null;
423+
recursive = outputOrOptions.recursive ?? false;
424+
} else {
425+
output = outputOrOptions;
426+
}
427+
373428
const i = new StringValue();
374429
i.setValue(JSON.stringify(output));
375430

376431
req.setOutput(i);
432+
req.setRecursive(recursive);
377433

378-
this._logger.info(`Terminating '${instanceId}'`);
434+
this._logger.info(`Terminating '${instanceId}'${recursive ? ' (recursive)' : ''}`);
379435

380436
await callWithMetadata<pb.TerminateRequest, pb.TerminateResponse>(
381437
this._stub.terminateInstance.bind(this._stub),
@@ -537,16 +593,21 @@ export class TaskHubGrpcClient {
537593
*
538594
* @param value - The unique ID of the orchestration instance to purge or orchestration instance filter criteria used
539595
* to determine which instances to purge.
596+
* @param options - Optional options to control the purge behavior, such as recursive purging of sub-orchestrations.
540597
* @returns A Promise that resolves to a {@link PurgeResult} or `undefined` if the purge operation was not successful.
541598
*/
542-
async purgeOrchestration(value: string | PurgeInstanceCriteria): Promise<PurgeResult | undefined> {
599+
async purgeOrchestration(
600+
value: string | PurgeInstanceCriteria,
601+
options?: PurgeInstanceOptions,
602+
): Promise<PurgeResult | undefined> {
543603
let res;
544-
if (typeof value === `string`) {
604+
if (typeof value === "string") {
545605
const instanceId = value;
546606
const req = new pb.PurgeInstancesRequest();
547607
req.setInstanceid(instanceId);
608+
req.setRecursive(options?.recursive ?? false);
548609

549-
this._logger.info(`Purging Instance '${instanceId}'`);
610+
this._logger.info(`Purging Instance '${instanceId}'${options?.recursive ? ' (recursive)' : ''}`);
550611

551612
res = await callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
552613
this._stub.purgeInstances.bind(this._stub),
@@ -574,9 +635,10 @@ export class TaskHubGrpcClient {
574635
filter.addRuntimestatus(toProtobuf(status));
575636
}
576637
req.setPurgeinstancefilter(filter);
638+
req.setRecursive(options?.recursive ?? false);
577639
const timeout = purgeInstanceCriteria.getTimeout();
578640

579-
this._logger.info("Purging Instance using purging criteria");
641+
this._logger.info(`Purging Instances using purging criteria${options?.recursive ? " (recursive)" : ""}`);
580642

581643
const callPromise = callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
582644
this._stub.purgeInstances.bind(this._stub),

packages/durabletask-js/src/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
// Client and Worker
55
export { TaskHubGrpcClient, TaskHubGrpcClientOptions, MetadataGenerator } from "./client/client";
66
export { TaskHubGrpcWorker, TaskHubGrpcWorkerOptions } from "./worker/task-hub-grpc-worker";
7+
export { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "./worker/versioning-options";
78

89
// Contexts
910
export { OrchestrationContext } from "./task/context/orchestration-context";
1011
export { ActivityContext } from "./task/context/activity-context";
1112

1213
// Orchestration types and utilities
1314
export { PurgeInstanceCriteria } from "./orchestration/orchestration-purge-criteria";
15+
export { PurgeInstanceOptions } from "./orchestration/orchestration-purge-options";
16+
export { TerminateInstanceOptions, terminateOptions, isTerminateInstanceOptions, TERMINATE_OPTIONS_SYMBOL } from "./orchestration/orchestration-terminate-options";
1417
export { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum";
1518
export { OrchestrationState } from "./orchestration/orchestration-state";
1619

@@ -85,3 +88,7 @@ export { ParentOrchestrationInstance } from "./types/parent-orchestration-instan
8588

8689
// Logger
8790
export { Logger, ConsoleLogger, NoOpLogger } from "./types/logger.type";
91+
export { ReplaySafeLogger, ReplayContext } from "./types/replay-safe-logger";
92+
93+
// Versioning utilities
94+
export { compareVersions } from "./utils/versioning.util";
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Options for purging orchestration instances.
6+
*/
7+
export interface PurgeInstanceOptions {
8+
/**
9+
* Whether to recursively purge sub-orchestrations as well.
10+
* When true, all child orchestrations spawned by the target orchestration
11+
* will also be purged.
12+
*
13+
* Note: Recursive purging may not be supported by all backend implementations.
14+
*
15+
* @default false
16+
*/
17+
recursive?: boolean;
18+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Symbol used to identify TerminateInstanceOptions objects.
6+
* This prevents confusion when user output happens to have 'recursive' or 'output' properties.
7+
*/
8+
export const TERMINATE_OPTIONS_SYMBOL = Symbol.for("durabletask.TerminateInstanceOptions");
9+
10+
/**
11+
* Options for terminating orchestration instances.
12+
*/
13+
export interface TerminateInstanceOptions {
14+
/**
15+
* Internal marker to identify this as a TerminateInstanceOptions object.
16+
* @internal
17+
*/
18+
readonly [TERMINATE_OPTIONS_SYMBOL]?: true;
19+
20+
/**
21+
* Whether to recursively terminate sub-orchestrations as well.
22+
* When true, all child orchestrations spawned by the target orchestration
23+
* will also be terminated.
24+
*
25+
* @default false
26+
*/
27+
recursive?: boolean;
28+
29+
/**
30+
* The optional output to set for the terminated orchestrator instance.
31+
*/
32+
output?: any;
33+
}
34+
35+
/**
36+
* Creates a TerminateInstanceOptions object with proper type identification.
37+
* Use this function instead of creating plain objects to ensure correct behavior.
38+
*
39+
* @param options - The terminate options
40+
* @returns A properly typed TerminateInstanceOptions object
41+
*
42+
* @example
43+
* ```typescript
44+
* // Terminate with recursive option
45+
* await client.terminateOrchestration(instanceId, terminateOptions({ recursive: true }));
46+
*
47+
* // Terminate with output and recursive
48+
* await client.terminateOrchestration(instanceId, terminateOptions({
49+
* output: { reason: "cancelled by user" },
50+
* recursive: true
51+
* }));
52+
* ```
53+
*/
54+
export function terminateOptions(options: Omit<TerminateInstanceOptions, typeof TERMINATE_OPTIONS_SYMBOL>): TerminateInstanceOptions {
55+
return {
56+
...options,
57+
[TERMINATE_OPTIONS_SYMBOL]: true as const,
58+
};
59+
}
60+
61+
/**
62+
* Type guard to check if a value is a TerminateInstanceOptions object.
63+
* @internal
64+
*/
65+
export function isTerminateInstanceOptions(value: unknown): value is TerminateInstanceOptions {
66+
return (
67+
value !== null &&
68+
typeof value === "object" &&
69+
TERMINATE_OPTIONS_SYMBOL in value &&
70+
(value as TerminateInstanceOptions)[TERMINATE_OPTIONS_SYMBOL] === true
71+
);
72+
}

0 commit comments

Comments
 (0)