feat: propagate task cancellation to in-progress activities #419
GabriellePoncey wants to merge 1 commit into
When a task is cancelled, ApplyEvent now detects it via an etcd watch and cancels its own context, interrupting long-running operations rather than running to completion. PLAT-583
📝 Walkthrough
Adds a task terminal-state watcher subsystem.
Changes: Task Terminal-State Watcher
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (2 warnings)
Up to standards ✅

🟢 Issues

| Category | Results |
|---|---|
| Complexity | 1 medium |

🟢 Metrics

| Metric | Results |
|---|---|
| Complexity | 35 |
| Duplication | 0 |
Actionable comments posted: 2
🧹 Nitpick comments (1)
server/internal/workflows/activities/apply_event.go (1)
71-73: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick win
Log watcher stream failures before disabling interruption support.
Setup failures are warned on Line 64, but runtime watcher failures silently stop cancellation propagation. Capture the error value here so operators can diagnose why task cancellation no longer interrupts this activity.
Proposed adjustment
```diff
-	case <-watcher.Error():
+	case err := <-watcher.Error():
 		// Watch stream died; stop monitoring without cancelling
 		// the activity — we don't know the task's current state.
+		activity.Logger(ctx).Warn("task watcher stopped; activity won't be interrupted on task cancellation", "error", err)
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@server/internal/workflows/activities/apply_event.go` around lines 71 - 73, The watcher error path in apply_event.go currently stops interruption support without recording why, so update the watch loop around watcher.Error() in applyEventActivity to capture the received error and log it before returning or breaking. Reuse the same logging pattern used for watcher setup failures, and include the actual error value plus enough context to identify the activity/task so operators can diagnose runtime watcher stream failures.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0d012b0d-53bb-4144-acce-773016814c0c
📒 Files selected for processing (4)
- server/internal/task/service.go
- server/internal/task/task_store.go
- server/internal/task/watcher.go
- server/internal/workflows/activities/apply_event.go
```go
func (sw *sharedWatcher) finishAll(err error) {
	sw.mu.Lock()
	sw.terminal = true
	sw.terminalErr = err
	subs := make([]*Watcher, len(sw.subscribers))
	copy(subs, sw.subscribers)
	sw.mu.Unlock()
	for _, sub := range subs {
		sub.finish(err)
	}
```
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Make terminal completion sticky.
finishAll overwrites terminalErr on later events, so a task seen as StatusCanceling can later be reported as failed/completed/deleted to new subscribers while the same shared watcher remains registered. Return early once sw.terminal is already set.
Proposed fix
```diff
 func (sw *sharedWatcher) finishAll(err error) {
 	sw.mu.Lock()
+	if sw.terminal {
+		sw.mu.Unlock()
+		return
+	}
 	sw.terminal = true
 	sw.terminalErr = err
 	subs := make([]*Watcher, len(sw.subscribers))
 	copy(subs, sw.subscribers)
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func (sw *sharedWatcher) finishAll(err error) {
	sw.mu.Lock()
	if sw.terminal {
		sw.mu.Unlock()
		return
	}
	sw.terminal = true
	sw.terminalErr = err
	subs := make([]*Watcher, len(sw.subscribers))
	copy(subs, sw.subscribers)
	sw.mu.Unlock()
	for _, sub := range subs {
		sub.finish(err)
	}
}
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/internal/task/watcher.go` around lines 105 - 114, The shared watcher
completion state is being overwritten on subsequent terminal events, so once
finishAll marks sharedWatcher as terminal it should not update terminalErr
again. Add an early return at the start of sharedWatcher.finishAll when
sw.terminal is already true, and keep the existing subscriber fan-out only for
the first terminal transition so new subscribers don’t see a later status
replace the original canceling state.
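The sticky behaviour requested above can be checked in isolation. The following is a minimal sketch, not the PR's code: `Watcher`, `sharedWatcher`, `subscribe`, and the error values are simplified, hypothetical stand-ins that keep only the fields needed to show that the first terminal reason is the one every subscriber sees, including subscribers that attach later.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the PR's error values.
var (
	ErrTaskCanceled = errors.New("task canceled")
	ErrTaskFailed   = errors.New("task failed")
)

// Watcher is a simplified subscriber handle (assumption, not the PR's type).
type Watcher struct {
	once sync.Once
	done chan struct{}
	err  error
}

// finish records the reason and closes done exactly once.
func (w *Watcher) finish(err error) {
	w.once.Do(func() {
		w.err = err
		close(w.done)
	})
}

type sharedWatcher struct {
	mu          sync.Mutex
	terminal    bool
	terminalErr error
	subscribers []*Watcher
}

// finishAll applies the early return from the review suggestion:
// only the first terminal transition is recorded and fanned out.
func (sw *sharedWatcher) finishAll(err error) {
	sw.mu.Lock()
	if sw.terminal {
		sw.mu.Unlock()
		return
	}
	sw.terminal = true
	sw.terminalErr = err
	subs := make([]*Watcher, len(sw.subscribers))
	copy(subs, sw.subscribers)
	sw.mu.Unlock()
	for _, sub := range subs {
		sub.finish(err)
	}
}

// subscribe attaches a watcher; a late subscriber is finished
// immediately with the stored terminal reason.
func (sw *sharedWatcher) subscribe() *Watcher {
	w := &Watcher{done: make(chan struct{})}
	sw.mu.Lock()
	terminal, err := sw.terminal, sw.terminalErr
	if !terminal {
		sw.subscribers = append(sw.subscribers, w)
	}
	sw.mu.Unlock()
	if terminal {
		w.finish(err)
	}
	return w
}

func main() {
	sw := &sharedWatcher{}
	early := sw.subscribe()
	sw.finishAll(ErrTaskCanceled) // first terminal event wins
	sw.finishAll(ErrTaskFailed)   // ignored: state is already terminal
	late := sw.subscribe()
	fmt.Println(early.err, late.err) // prints "task canceled task canceled"
}
```

Without the early return, the second `finishAll` call would overwrite `terminalErr`, and `late` would report `ErrTaskFailed` while `early` reports `ErrTaskCanceled`.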
```go
func (sw *sharedWatcher) propagateErrors() {
	select {
	case <-sw.shutdownCh:
	case err := <-sw.watchOp.Error():
		if errors.Is(err, context.Canceled) {
			return
		}
		sw.mu.Lock()
		subs := make([]*Watcher, len(sw.subscribers))
		copy(subs, sw.subscribers)
		sw.mu.Unlock()
		for _, w := range subs {
			select {
			case w.errCh <- err:
			default:
			}
		}
```
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Remove or mark failed shared watchers after stream errors.
After watchOp.Error() fires, propagateErrors notifies only the subscribers copied at that moment and then exits. The dead sharedWatcher remains in registry.entries, so a concurrent/later acquire can attach to a watcher with no active stream and never receive Done or Error.
Possible direction
```diff
 	case err := <-sw.watchOp.Error():
 		if errors.Is(err, context.Canceled) {
 			return
 		}
+		sw.shutdown()
 		sw.mu.Lock()
 		subs := make([]*Watcher, len(sw.subscribers))
```
If there is a race with acquire, also persist the stream error on sharedWatcher so newSubscription can immediately populate errCh for late subscribers before the registry entry is removed.
Also applies to: 213-215
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/internal/task/watcher.go` around lines 142 - 158, The shared watcher
error handling in propagateErrors leaves a dead sharedWatcher in
registry.entries after watchOp.Error() fires, so later acquire/newSubscription
calls can attach to a stale watcher and miss Done/Error. Update
sharedWatcher/registry teardown so stream errors are persisted on sharedWatcher
and the registry entry is removed or marked failed immediately after the error
is observed, and make newSubscription populate errCh from that stored error for
late subscribers; also ensure propagateErrors continues to use the sharedWatcher
state consistently for both current and future subscribers.
When a task is cancelled, ApplyEvent now detects it via an etcd watch and cancels its own context, interrupting long-running operations rather than running to completion. Operations that respect context cancellation (e.g., Patroni HTTP calls) are interrupted promptly, while others complete their current step before the activity exits.
Changes
- `task.Watcher`: subscribes to a task key in etcd and closes a `Done` channel when the task reaches a terminal state or is deleted; `Err` returns the reason (`ErrTaskCanceled`, `ErrTaskFailed`, or nil).
- Add `task.Service.NewWatcher` and `task.TaskStore.Watch` to create watchers from the service layer.
- Add `watcherRegistry` to `task.Service`: concurrent `ApplyEvent` calls watching the same task share one etcd stream rather than each opening their own; events are fanned out to all subscribers.
- `ApplyEvent`: a goroutine cancels the activity context when `watcher.Done()` fires. Watcher setup failure is non-fatal; the activity continues without interruption support.
- Treat `StatusCanceling` the same as `StatusCanceled` so activities are interrupted during the canceling phase, not only after it completes.
PLAT-583