Skip to content

Commit f60b42d

Browse files
soccermaxMax Gruenfelder
andauthored
Event Callbacks improvements (#453)
* docu and triggerEvent * fixes * linter * fix and add more tests * docu update * wip * fix tests * finish up * changelog and fix tests * fix tests --------- Co-authored-by: Max Gruenfelder <maximilian.gruenfelder@scheer-group.com>
1 parent 012ab5e commit f60b42d

9 files changed

Lines changed: 895 additions & 126 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,20 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
77

8-
## v2.1.0 - 2025-03-XX
8+
## v2.1.0 - 2025-04-06
99

1010
### Added
1111

12-
- [CAP Queue] Add support for defining successor and failed events of event handlers. See documentation for how to use it.
12+
- [CAP Queue] Add support for event chaining: register `#succeeded` and `#failed` successor handlers that are automatically triggered based on the outcome of an event handler. See [documentation](https://cap-js-community.github.io/event-queue/use-as-cap-outbox/#event-chaining).
13+
- [CAP Queue] Add `#done` unconditional successor — fires after every event regardless of success or failure, analogous to a `finally` block. Useful for cleanup that must always run (releasing locks, audit events, etc.).
14+
- [CAP Queue] Successor handlers (`#succeeded`, `#failed`, `#done`) can be configured independently in the `events` section of the service's `queued` configuration, including event-specific variants (e.g. `orderCreated/#succeeded`).
15+
- [Telemetry] Add opt-in event queue metrics (`collectEventQueueMetrics`). When enabled together with Redis and an OpenTelemetry MeterProvider, the module publishes `cap.event_queue.jobs.pending`, `cap.event_queue.jobs.in_progress`, and `cap.event_queue.stats.refresh_age` as Observable Gauges, broken down by namespace. See [documentation](https://cap-js-community.github.io/event-queue/telemetry/).
1316

1417
## v2.0.5 - 2025-03-10
1518

1619
### Added
1720

1821
- [CAP Queue] Allow to propagate cds.context properties (e.g. features). This can be configured per event (`cds.env.requires[<SERVICE>].queued.propagateContextProperties = ["features"]`)
19-
- [Telemetry] Add opt-in event queue metrics (`collectEventQueueMetrics`). When enabled together with Redis and an OpenTelemetry MeterProvider, the module publishes `cap.event_queue.jobs.pending`, `cap.event_queue.jobs.in_progress`, and `cap.event_queue.stats.refresh_age` as Observable Gauges, broken down by namespace. See [documentation](https://cap-js-community.github.io/event-queue/telemetry/).
2022

2123
### Fixed
2224

docs/setup/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ The table includes the parameter name, a description of its purpose, and the def
7777
| cleanupLocksAndEventsForDev | Deletes all semantic locks and sets all events that are in progress to error during server start. This is used to clean up leftovers from server crashes or restarts during processing. | true | no |
7878
| insertEventsBeforeCommit | If enabled, this feature allows events (including those for queued services) to be inserted in bulk using the before commit handler. This is performed to improve performance by mass inserting events instead of single insert operations. This can be disabled by the parameter `skipInsertEventsBeforeCommit` in the function publishEvent. | true | yes |
7979
| enableTelemetry | If enabled, OpenTelemetry traces for all event-queue activities are written. An OpenTelemetry exporter must be configured. | true | yes |
80+
| collectEventQueueMetrics | If enabled, publishes `cap.event_queue.jobs.pending`, `cap.event_queue.jobs.in_progress`, and `cap.event_queue.stats.refresh_age` as OpenTelemetry Observable Gauges. Requires `enableTelemetry: true`, Redis, and an OpenTelemetry MeterProvider. See [documentation](https://cap-js-community.github.io/event-queue/telemetry/). | false | no |
8081
| cronTimezone | Determines whether to apply the central `cronTimezone` setting for scheduling events. If set to `true` and the property `utc` is not enabled for the given event, it will use the defined `cronTimezone`. If set to `false`, the event will use UTC or the server's local time, based on the `utc` setting. | null | yes |
8182
| randomOffsetPeriodicEvents | Specifies the default maximum random offset (in seconds) applied to all periodic events to help stagger their start times and reduce simultaneous execution spikes. This setting is especially useful in multi-tenant environments where many events may be triggered at the same time. The actual offset for each event will be a random number between `0` and this value. Can be overridden per event using the `randomOffset` property. | null | yes |
8283
| disableRedis | Whether or not to disable Redis. | false | no |

docs/telemetry/index.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,14 @@ Or via `cds.env` (e.g. in `package.json`):
148148

149149
The full set of conditions required for metrics to be active:
150150

151-
| Condition | Required value | Notes |
152-
| -------------------------- | -------------- | ---------------------------------------------------- |
153-
| `collectEventQueueMetrics` | `true` | Master switch; default `false` |
154-
| `enableTelemetry` | `true` | Default `true`; global telemetry kill-switch |
155-
| Redis enabled | yes | Stats are stored in Redis hashes |
156-
| OpenTelemetry metrics SDK | present | `@opentelemetry/api` with a configured MeterProvider |
151+
| Condition | Required value | Notes |
152+
| -------------------------- | -------------- | -------------------------------------------------------------------------------------------- |
153+
| `collectEventQueueMetrics` | `true` | Master switch; default `false` |
154+
| `enableTelemetry` | `true` | Default `true`; global telemetry kill-switch |
155+
| Redis enabled | yes | Stats are stored in Redis hashes |
156+
| `registerAsEventProcessor` | `true` | Metrics are only initialised on instances that process events |
157+
| CF instance index | `0` | Only the first application instance registers gauges to avoid duplicate metric registrations |
158+
| OpenTelemetry metrics SDK | present | `@opentelemetry/api` with a configured MeterProvider |
157159

158160
If any condition is not met, `initMetrics()` returns immediately and no gauges are registered.
159161

docs/use-as-cap-outbox/index.md

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,3 +409,207 @@ this.on("myBatchEvent", (req) => {
409409
}));
410410
});
411411
```
412+
413+
## Event Chaining
414+
415+
The event-queue supports chaining event handlers based on outcome. When a handler completes, the event-queue
416+
automatically publishes designated **successor events** — allowing you to model multi-step asynchronous workflows
417+
with clear separation of concerns.
418+
419+
### How It Works
420+
421+
Register successor handlers using the special suffixes `#succeeded`, `#failed`, and `#done`:
422+
423+
- **`<event>/#succeeded`** — triggered when the handler for `<event>` returns `EventProcessingStatus.Done`
424+
- **`<event>/#failed`** — triggered when the handler returns `EventProcessingStatus.Error` or throws
425+
- **`<event>/#done`** — triggered unconditionally after `<event>` completes, regardless of outcome (analogous to `finally`)
426+
427+
You can also register **generic** successor handlers (`#succeeded` / `#failed` / `#done`) that apply to every event
428+
in the service that does not have a dedicated successor.
429+
430+
```javascript
431+
class MyService extends cds.Service {
432+
async init() {
433+
await super.init();
434+
435+
// Primary handler
436+
this.on("orderCreated", async (req) => {
437+
// ... business logic
438+
return EventProcessingStatus.Done;
439+
});
440+
441+
// Runs on success — event-specific
442+
this.on("orderCreated/#succeeded", async (req) => {
443+
// req.eventQueue.triggerEvent is available here (see below)
444+
});
445+
446+
// Runs on failure — event-specific
447+
this.on("orderCreated/#failed", async (req) => {
448+
// req.data.error contains the serialised error message
449+
});
450+
451+
// Runs unconditionally — event-specific
452+
this.on("orderCreated/#done", async (req) => {
453+
// always runs; req.data.error is set when the parent failed
454+
});
455+
456+
// Generic fallbacks — run for any event without a dedicated handler
457+
this.on("#succeeded", async (req) => {
458+
/* ... */
459+
});
460+
this.on("#failed", async (req) => {
461+
/* ... */
462+
});
463+
this.on("#done", async (req) => {
464+
/* ... */
465+
});
466+
}
467+
}
468+
```
469+
470+
### Passing Data to the Successor
471+
472+
Return a `nextData` property from the primary handler to forward arbitrary data to the successor's `req.data`:
473+
474+
```javascript
475+
this.on("orderCreated", async (req) => {
476+
const orderId = await createOrder(req.data);
477+
return {
478+
status: EventProcessingStatus.Done,
479+
nextData: { orderId }, // available as req.data.orderId in the successor
480+
};
481+
});
482+
```
483+
484+
`nextData` is forwarded to all active successors (`#succeeded`, `#failed`, `#done`). For `#done` this is useful when cleanup logic needs to act on data produced by the primary handler.
485+
486+
### Accessing the Trigger Event Context (`req.eventQueue.triggerEvent`)
487+
488+
When a successor handler is invoked, `req.eventQueue.triggerEvent` is populated with context from the parent event.
489+
This gives the successor full visibility into what happened in the previous step.
490+
491+
| Field | Type | Description |
492+
| --------------------- | ------ | ---------------------------------------------------------------------------------- |
493+
| `triggerEventResult` | any | The raw return value of the parent handler (e.g. `{ status: 2, nextData: {...} }`) |
494+
| `ID` | string | UUID of the parent queue entry |
495+
| `status` | number | Status of the parent queue entry at processing time |
496+
| `payload` | any | Payload of the parent queue entry |
497+
| `referenceEntity` | string | Reference entity of the parent event (if set) |
498+
| `referenceEntityKey` | string | Reference entity key of the parent event (if set) |
499+
| `lastAttempTimestamp` | string | Timestamp of the last processing attempt of the parent event |
500+
501+
`req.eventQueue.triggerEvent` is set in **`#succeeded`**, **`#failed`**, and **`#done`** handlers, but only when the
502+
event was processed as a single entry (no clustering). When the parent handler **threw an exception**,
503+
`triggerEventResult` will be `undefined` — the queue entry fields (`ID`, `status`, `payload`, etc.) are still present.
504+
505+
```javascript
506+
this.on("orderCreated/#succeeded", async (req) => {
507+
const { triggerEventResult, ID } = req.eventQueue.triggerEvent;
508+
509+
// triggerEventResult is exactly what the parent handler returned
510+
console.log(triggerEventResult);
511+
// → { status: 2, nextData: { orderId: "..." } }
512+
513+
// ID is the UUID of the parent queue entry
514+
console.log(ID); // → "3f2e1a..."
515+
});
516+
```
517+
518+
### Failure Handling and Error Propagation
519+
520+
When a primary handler throws or returns `EventProcessingStatus.Error`, the `#failed` and `#done` successors both
521+
receive the serialised error message in `req.data.error`:
522+
523+
```javascript
524+
this.on("orderCreated/#failed", async (req) => {
525+
console.log(req.data.error); // → "Error: Payment gateway timeout"
526+
// compensate, notify, etc.
527+
return EventProcessingStatus.Done;
528+
});
529+
```
530+
531+
### Unconditional Follow-up (`#done`)
532+
533+
The `#done` handler fires after every event, regardless of whether the primary handler succeeded, failed, or threw.
534+
It is the equivalent of a `finally` block and is intended for cleanup that must always run:
535+
536+
- Releasing locks or counters
537+
- Emitting audit events
538+
- Notifying monitoring systems
539+
540+
`req.data.error` is populated when the parent failed (identical to `#failed`). `req.eventQueue.triggerEvent` is
541+
available under the same rules as `#succeeded` and `#failed` — use `triggerEventResult.status` to distinguish
542+
outcomes, or check `triggerEventResult === undefined` to detect an unhandled exception.
543+
544+
Both a specific handler (`<event>/#done`) and a generic handler (`#done`) are supported. The specific handler takes
545+
priority over the generic one.
546+
547+
### Stopping the Chain
548+
549+
`#failed` and `#done` are always terminal — the event-queue will **not** trigger any further successors after them,
550+
even if handlers are registered.
551+
552+
`#succeeded` is not fully terminal: if a `#succeeded` handler returns `EventProcessingStatus.Error`, the
553+
event-queue will trigger `#failed` for that event (the chain continues on the failure path). However, `#succeeded`
554+
never triggers `#done` a second time.
555+
556+
### Service-Specific vs. Generic Handlers
557+
558+
| Pattern | Applies to |
559+
| -------------------- | ------------------------------------- |
560+
| `<event>/#succeeded` | Only `<event>` |
561+
| `<event>/#failed` | Only `<event>` |
562+
| `<event>/#done` | Only `<event>` |
563+
| `#succeeded` | All events without a specific handler |
564+
| `#failed` | All events without a specific handler |
565+
| `#done` | All events without a specific handler |
566+
567+
Event-specific handlers take priority over generic ones.
568+
569+
### Configuring Successor Handlers
570+
571+
Successor handlers (`#succeeded`, `#failed`, `#done`) can be configured independently in the `events` section of the
572+
service's `queued` configuration, using the same keys as the handler names.
573+
574+
**Generic successor config** — applies to all handlers of that type across the service:
575+
576+
```json
577+
{
578+
"cds": {
579+
"requires": {
580+
"my-service": {
581+
"queued": {
582+
"kind": "persistent-queue",
583+
"events": {
584+
"#succeeded": { "propagateHeaders": ["x-correlation-id"] },
585+
"#failed": { "retryAttempts": 0 },
586+
"#done": { "propagateHeaders": ["x-correlation-id"] }
587+
}
588+
}
589+
}
590+
}
591+
}
592+
}
593+
```
594+
595+
**Event-specific successor config** — applies only to the successor of a particular action:
596+
597+
```json
598+
{
599+
"cds": {
600+
"requires": {
601+
"my-service": {
602+
"queued": {
603+
"kind": "persistent-queue",
604+
"events": {
605+
"orderCreated/#succeeded": { "propagateHeaders": ["x-correlation-id"] }
606+
}
607+
}
608+
}
609+
}
610+
}
611+
}
612+
```
613+
614+
When both a generic and an event-specific config exist for the same successor type, the event-specific config takes
615+
precedence. If neither is set, the successor inherits the configuration of its parent action.

src/config.js

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const UTC_DEFAULT = false;
3030
const USE_CRON_TZ_DEFAULT = true;
3131
const SAGA_SUCCESS = "#succeeded";
3232
const SAGA_FAILED = "#failed";
33+
const SAGA_DONE = "#done";
3334

3435
const BASE_TABLES = {
3536
EVENT: "sap.eventqueue.Event",
@@ -387,19 +388,20 @@ class Config {
387388
result.adHoc
388389
);
389390
result.adHoc[key] = specificEventConfig;
390-
const sagaSuccessKey = [fnName, SAGA_SUCCESS].join("/");
391-
if (config.events[sagaSuccessKey]) {
392-
const [sagaKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction(
393-
srvConfig,
394-
name,
395-
fnName,
396-
result.adHoc
397-
);
398-
result.adHoc[sagaKey] = sagaSpecificEventConfig;
399-
} else {
400-
const sagaConfig = { ...specificEventConfig };
401-
sagaConfig.subType = [sagaConfig.subType, SAGA_SUCCESS].join("/");
402-
result.adHoc[[key, SAGA_SUCCESS].join("/")] = sagaConfig;
391+
for (const sagaSuffix of [SAGA_SUCCESS, SAGA_DONE, SAGA_FAILED]) {
392+
if (config.events[sagaSuffix]) {
393+
const [adHocKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction(
394+
srvConfig,
395+
name,
396+
fnName,
397+
result.adHoc
398+
);
399+
result.adHoc[adHocKey] = sagaSpecificEventConfig;
400+
} else {
401+
const sagaConfig = { ...specificEventConfig };
402+
sagaConfig.subType = [sagaConfig.subType, sagaSuffix].join("/");
403+
result.adHoc[[key, sagaSuffix].join("/")] = sagaConfig;
404+
}
403405
}
404406
}
405407
}
@@ -434,7 +436,7 @@ class Config {
434436
}
435437

436438
const [withoutSaga, sagaSuffix] = action.split("/");
437-
if ([SAGA_FAILED, SAGA_SUCCESS].includes(sagaSuffix)) {
439+
if ([SAGA_FAILED, SAGA_SUCCESS, SAGA_DONE].includes(sagaSuffix)) {
438440
if (config?.events?.[withoutSaga]) {
439441
return this.#mixCAPPropertyNamesWithEventQueueNames(config.events[withoutSaga]);
440442
}

0 commit comments

Comments
 (0)