Skip to content

feat: propagate task cancellation to in-progress activities#419

Open
GabriellePoncey wants to merge 1 commit into
mainfrom
feat/PLAT-583/cancel-lifecycle-operations
Open

feat: propagate task cancellation to in-progress activities#419
GabriellePoncey wants to merge 1 commit into
mainfrom
feat/PLAT-583/cancel-lifecycle-operations

Conversation

@GabriellePoncey

Copy link
Copy Markdown
Contributor

When a task is cancelled, ApplyEvent now detects it via an etcd watch and cancels its own context, interrupting long-running operations rather than running to completion. Operations that respect context cancellation (eg Patroni HTTP calls) are interrupted promptly while others complete their current step before the activity exits.

Changes

  • Add task.Watcher: subscribes to a task key in etcd and closes a
    Done channel when the task reaches a terminal state or is deleted;
    Err returns the reason (ErrTaskCanceled, ErrTaskFailed, or nil).
  • Add task.Service.NewWatcher and task.TaskStore.Watch to create
    watchers from the service layer.
  • Add watcherRegistry to task.Service: concurrent ApplyEvent calls
    watching the same task share one etcd stream rather than each opening
    their own; events are fanned out to all subscribers.
  • Wire the watcher into ApplyEvent: a goroutine cancels the activity
    context when watcher.Done() fires. Watcher setup failure is
    non-fatal — the activity continues without interruption support.
  • Treat StatusCanceling the same as StatusCanceled so activities are
    interrupted during the canceling phase, not only after it completes.

PLAT-583

When a task is cancelled, ApplyEvent now detects it via an etcd watch
and cancels its own context, interrupting long-running operations rather than running to completion.

PLAT-583
@coderabbitai

coderabbitai Bot commented Jun 30, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

Adds a task terminal-state watcher subsystem: TaskStore.Watch provides a storage-level etcd watch op; watcher.go implements Watcher (subscriber API) and sharedWatcher/watcherRegistry for multiplexed fan-out; Service.NewWatcher exposes registry acquisition; and ApplyEvent uses a watcher goroutine to cancel its derived context when a task reaches a terminal state.

Changes

Task Terminal-State Watcher

Layer / File(s) Summary
Storage watch op and error sentinels
server/internal/task/task_store.go, server/internal/task/watcher.go
TaskStore.Watch computes the task key and returns a storage.NewWatchOp[*StoredTask]. ErrTaskCanceled and ErrTaskFailed sentinel errors are defined.
Watcher subscriber API and sharedWatcher fan-out
server/internal/task/watcher.go
Implements Watcher with Done, Err, Close, and Error channels; finish once-logic; sharedWatcher fan-out interpreting etcd events into terminal outcomes; propagateErrors forwarding watch errors; release/shutdown cleanup; and watcherRegistry maintaining one shared watch per task UUID.
Service wiring
server/internal/task/service.go
Adds registry *watcherRegistry to Service, initializes it in NewService, and exposes NewWatcher delegating to registry.acquire.
ApplyEvent context cancellation
server/internal/workflows/activities/apply_event.go
ApplyEvent wraps its context with context.WithCancel and, for non-nil TaskID, spawns a goroutine that cancels the context when the watcher's Done channel closes.

Poem

🐇 A watcher hops in, ears perked for the end,
When tasks go terminal, signals it'll send.
One shared stream per task, fans out to the rest,
Context gets canceled — the rabbit knows best!
Done channels close, errors fan far and wide,
The registry remembers, with only one guide. 🌿

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Description check ⚠️ Warning The description covers the summary and changes, but it omits the template's Testing, Checklist, and Notes for Reviewers sections. Add the missing Summary heading, Testing steps, Checklist items, and any reviewer notes required by the template.
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title concisely describes the main change: propagating task cancellation into in-progress activities.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/PLAT-583/cancel-lifecycle-operations

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@codacy-production

Copy link
Copy Markdown

Up to standards ✅

🟢 Issues 1 medium

Results:
1 new issue

Category Results
Complexity 1 medium

View in Codacy

🟢 Metrics 35 complexity · 0 duplication

Metric Results
Complexity 35
Duplication 0

View in Codacy

NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
server/internal/workflows/activities/apply_event.go (1)

71-73: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick win

Log watcher stream failures before disabling interruption support.

Setup failures are warned on Line 64, but runtime watcher failures silently stop cancellation propagation. Capture the error value here so operators can diagnose why task cancellation no longer interrupts this activity.

Proposed adjustment
-				case <-watcher.Error():
+				case err := <-watcher.Error():
 					// Watch stream died; stop monitoring without cancelling
 					// the activity — we don't know the task's current state.
+					activity.Logger(ctx).Warn("task watcher stopped; activity won't be interrupted on task cancellation", "error", err)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@server/internal/workflows/activities/apply_event.go` around lines 71 - 73,
The watcher error path in apply_event.go currently stops interruption support
without recording why, so update the watch loop around watcher.Error() in
applyEventActivity to capture the received error and log it before returning or
breaking. Reuse the same logging pattern used for watcher setup failures, and
include the actual error value plus enough context to identify the activity/task
so operators can diagnose runtime watcher stream failures.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@server/internal/task/watcher.go`:
- Around line 142-158: The shared watcher error handling in propagateErrors
leaves a dead sharedWatcher in registry.entries after watchOp.Error() fires, so
later acquire/newSubscription calls can attach to a stale watcher and miss
Done/Error. Update sharedWatcher/registry teardown so stream errors are
persisted on sharedWatcher and the registry entry is removed or marked failed
immediately after the error is observed, and make newSubscription populate errCh
from that stored error for late subscribers; also ensure propagateErrors
continues to use the sharedWatcher state consistently for both current and
future subscribers.
- Around line 105-114: The shared watcher completion state is being overwritten
on subsequent terminal events, so once finishAll marks sharedWatcher as terminal
it should not update terminalErr again. Add an early return at the start of
sharedWatcher.finishAll when sw.terminal is already true, and keep the existing
subscriber fan-out only for the first terminal transition so new subscribers
don’t see a later status replace the original canceling state.

---

Nitpick comments:
In `@server/internal/workflows/activities/apply_event.go`:
- Around line 71-73: The watcher error path in apply_event.go currently stops
interruption support without recording why, so update the watch loop around
watcher.Error() in applyEventActivity to capture the received error and log it
before returning or breaking. Reuse the same logging pattern used for watcher
setup failures, and include the actual error value plus enough context to
identify the activity/task so operators can diagnose runtime watcher stream
failures.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0d012b0d-53bb-4144-acce-773016814c0c

📥 Commits

Reviewing files that changed from the base of the PR and between a60c48c and c2f7cba.

📒 Files selected for processing (4)
  • server/internal/task/service.go
  • server/internal/task/task_store.go
  • server/internal/task/watcher.go
  • server/internal/workflows/activities/apply_event.go

Comment on lines +105 to +114
func (sw *sharedWatcher) finishAll(err error) {
sw.mu.Lock()
sw.terminal = true
sw.terminalErr = err
subs := make([]*Watcher, len(sw.subscribers))
copy(subs, sw.subscribers)
sw.mu.Unlock()
for _, sub := range subs {
sub.finish(err)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Make terminal completion sticky.

finishAll overwrites terminalErr on later events, so a task seen as StatusCanceling can later be reported as failed/completed/deleted to new subscribers while the same shared watcher remains registered. Return early once sw.terminal is already set.

Proposed fix
 func (sw *sharedWatcher) finishAll(err error) {
 	sw.mu.Lock()
+	if sw.terminal {
+		sw.mu.Unlock()
+		return
+	}
 	sw.terminal = true
 	sw.terminalErr = err
 	subs := make([]*Watcher, len(sw.subscribers))
 	copy(subs, sw.subscribers)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (sw *sharedWatcher) finishAll(err error) {
sw.mu.Lock()
sw.terminal = true
sw.terminalErr = err
subs := make([]*Watcher, len(sw.subscribers))
copy(subs, sw.subscribers)
sw.mu.Unlock()
for _, sub := range subs {
sub.finish(err)
}
func (sw *sharedWatcher) finishAll(err error) {
sw.mu.Lock()
if sw.terminal {
sw.mu.Unlock()
return
}
sw.terminal = true
sw.terminalErr = err
subs := make([]*Watcher, len(sw.subscribers))
copy(subs, sw.subscribers)
sw.mu.Unlock()
for _, sub := range subs {
sub.finish(err)
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@server/internal/task/watcher.go` around lines 105 - 114, The shared watcher
completion state is being overwritten on subsequent terminal events, so once
finishAll marks sharedWatcher as terminal it should not update terminalErr
again. Add an early return at the start of sharedWatcher.finishAll when
sw.terminal is already true, and keep the existing subscriber fan-out only for
the first terminal transition so new subscribers don’t see a later status
replace the original canceling state.

Comment on lines +142 to +158
func (sw *sharedWatcher) propagateErrors() {
select {
case <-sw.shutdownCh:
case err := <-sw.watchOp.Error():
if errors.Is(err, context.Canceled) {
return
}
sw.mu.Lock()
subs := make([]*Watcher, len(sw.subscribers))
copy(subs, sw.subscribers)
sw.mu.Unlock()
for _, w := range subs {
select {
case w.errCh <- err:
default:
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Remove or mark failed shared watchers after stream errors.

After watchOp.Error() fires, propagateErrors notifies only the subscribers copied at that moment and then exits. The dead sharedWatcher remains in registry.entries, so a concurrent/later acquire can attach to a watcher with no active stream and never receive Done or Error.

Possible direction
 	case err := <-sw.watchOp.Error():
 		if errors.Is(err, context.Canceled) {
 			return
 		}
+		sw.shutdown()
 		sw.mu.Lock()
 		subs := make([]*Watcher, len(sw.subscribers))

If there is a race with acquire, also persist the stream error on sharedWatcher so newSubscription can immediately populate errCh for late subscribers before the registry entry is removed.

Also applies to: 213-215

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@server/internal/task/watcher.go` around lines 142 - 158, The shared watcher
error handling in propagateErrors leaves a dead sharedWatcher in
registry.entries after watchOp.Error() fires, so later acquire/newSubscription
calls can attach to a stale watcher and miss Done/Error. Update
sharedWatcher/registry teardown so stream errors are persisted on sharedWatcher
and the registry entry is removed or marked failed immediately after the error
is observed, and make newSubscription populate errCh from that stored error for
late subscribers; also ensure propagateErrors continues to use the sharedWatcher
state consistently for both current and future subscribers.

@GabriellePoncey GabriellePoncey requested review from moizpgedge and tsivaprasad and removed request for tsivaprasad June 30, 2026 13:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant