Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
30b14db
feat: add WithDrain option for graceful Stop
klaidliadon May 1, 2026
c586f84
feat: add NewTicker primitive
klaidliadon May 1, 2026
03bfd97
chore: fix go vet and staticcheck warnings
klaidliadon May 1, 2026
6188660
chore: fix two more linter warnings
klaidliadon May 1, 2026
63caae5
fix: make Stop concurrency-safe under WithDrain
klaidliadon May 1, 2026
2dc534c
feat: distinguish drain timeout from clean drain via ErrDrainTimedOut
klaidliadon May 1, 2026
2707d2a
docs: warn about Stopping foot-gun and ticker+retry cadence
klaidliadon May 1, 2026
101ee9f
fix: use independent timer for WithDrain timeout
klaidliadon May 1, 2026
6db7242
docs(examples): use pristine ctx for Run so SIGTERM doesn't bypass drain
klaidliadon May 1, 2026
4dbbc85
fix: preserve drain semantics under concurrent Stop
klaidliadon May 2, 2026
1cbd251
docs(examples): exit promptly when worker dies before signal
klaidliadon May 2, 2026
fe7b1fd
fix: snapshot runCancel under mutex to avoid cancelling a future Run
klaidliadon May 2, 2026
00b32bc
fix: WithRetry stops retrying after WithDrain's Stopping fires
klaidliadon May 2, 2026
b47ae7e
test: replace vacuous round-2 test with same-runnable Run/Stop/Run
klaidliadon May 2, 2026
a33e08c
test: rename round-2 test, drop unfounded snapshot-coverage claim
klaidliadon May 2, 2026
0758ed9
fix(retry): scope lastTime per Run cycle, drop the struct field
klaidliadon May 2, 2026
b703cdb
test: drop vacuous "retry budget is per-Run-cycle" test
klaidliadon May 2, 2026
b564459
feat(adapters): scaffold subpackage with doc.go
klaidliadon May 11, 2026
1f60e22
feat(adapters): Draining + Stopping + ErrDrainTimedOut
klaidliadon May 11, 2026
569157a
feat(adapters): Ticker that composes with Draining
klaidliadon May 11, 2026
b0cb5ae
test: NewGroup propagates shutdown to Draining children
klaidliadon May 11, 2026
8548804
refactor: strip drain logic from core Stop
klaidliadon May 11, 2026
3591d79
test(runnable): revert stop-timeout strengthening (drain is now exter…
klaidliadon May 11, 2026
05e09d7
refactor(retry): remove Stopping coupling (drain is no longer in core)
klaidliadon May 11, 2026
ed3ac1e
refactor: drop v0.1 WithDrain Option and NewTicker constructor
klaidliadon May 11, 2026
f5f0a36
docs(examples): rewrite ticker-with-drain for adapters API
klaidliadon May 11, 2026
dd00f18
docs(readme): replace drain/ticker sections with adapters + migration…
klaidliadon May 11, 2026
12d2601
refactor: clarify runCancel snapshot reason; simplify README example
klaidliadon May 11, 2026
64a09e1
fix(adapters): recover panics in Draining's work goroutine
klaidliadon May 12, 2026
2164585
test(adapters): make TestTicker_FiresOnInterval wall-clock independent
klaidliadon May 12, 2026
92b3741
docs(readme): make legacy examples ctx-responsive
klaidliadon May 12, 2026
296c14b
ci: bump Go to match go.mod and refresh action versions
klaidliadon May 12, 2026
b80a033
refactor(adapters): chi-style middleware via runnable.WithAdapters
klaidliadon May 12, 2026
92b5063
feat(adapters): migrate Recovering and Retry; drop Status.Restarts
klaidliadon May 12, 2026
c5f4101
docs(readme,adapters): rewrite for runnable.WithAdapters API
klaidliadon May 12, 2026
e2bf019
docs(adapters): tighten godoc on exported symbols
klaidliadon May 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: '1.20'
go-version-file: 'go.mod'

- name: Build
run: go build -v ./...
Expand Down
127 changes: 95 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ r := runnable.New(func(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
default:
case <-time.After(time.Second):
}
time.Sleep(1 * time.Second)
fmt.Println("Running...")
}
})
Expand All @@ -71,7 +70,8 @@ if err != nil {
### Runnable Function with timeout
```go
fmt.Println("Simple function with timeout...")
ctxWithTimeout, _ := context.WithTimeout(context.Background(), 5*time.Second)
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = runnable.New(func(ctx context.Context) error {
fmt.Println("Starting...")
defer fmt.Println("Stopping...")
Expand All @@ -80,9 +80,8 @@ err = runnable.New(func(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
default:
case <-time.After(time.Second):
}
time.Sleep(1 * time.Second)
fmt.Println("Running...")
}
}).Run(ctxWithTimeout)
Expand All @@ -91,36 +90,100 @@ if err != nil {
}
```

### Runnable Function with retry
### Adapters

Cross-cutting behaviors that aren't part of the core lifecycle live in
the `runnable/adapters` subpackage as chi-style middleware: each
`runnable.Adapter` has the shape `func(next RunFunc) RunFunc`. Apply
them with `runnable.WithAdapters` (left-to-right = outermost-to-innermost):

```go
fmt.Println("Simple function with retry...")
errorReturned := false
err = runnable.New(func(ctx context.Context) error {
fmt.Println("Starting...")
defer fmt.Println("Stopping...")

if !errorReturned {
errorReturned = true
return fmt.Errorf("error")
}

// do something
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
return nil
default:
}
time.Sleep(1 * time.Second)
fmt.Println("Running...")
}
return nil
}, runnable.WithRetry(3, runnable.ResetNever)).Run(context.Background())
if err != nil {
fmt.Println(err)
}
r := runnable.New(reconcile, runnable.WithAdapters(
adapters.Draining(10*time.Second),
adapters.Recovering(reportPanic),
adapters.Retry(3, time.Minute),
adapters.Ticker(30*time.Second),
))
```

**Draining** — graceful shutdown with a grace window. When the outer
ctx is cancelled, the wrapped work has `timeout` to return via
`adapters.Stopping(ctx)` before its ctx is force-cancelled and
`adapters.ErrDrainTimedOut` is returned.

**Ticker** — calls the wrapped work once per interval until ctx is
cancelled or the work returns an error. Composes with Draining: an
in-flight tick is allowed to finish before the loop exits.

**Recovering** — turns panics in the wrapped work into errors and
invokes the optional handler before returning. Place inside Draining
when both are in use.

**Retry** — re-invokes the wrapped work up to `maxRetries` times on
non-context errors. If `resetAfter` is non-zero and at least that long
has passed since the previous attempt, the retry budget resets.

Inside long-running work, always select on both `ctx.Done()` and
`adapters.Stopping(ctx)` — `Stopping` signals drain start, `ctx.Done()`
fires only when the drain timer expires.

A full SIGTERM-safe service shape lives in
[`examples/ticker-with-drain`](examples/ticker-with-drain/main.go).

### Migrating from v0.1 to v0.2

v0.2 moves drain, ticker, retry, and panic recovery out of the core
package. `WithDrain`, `NewTicker`, `WithRetry`, and `WithRecoverer`
are removed; their replacements live at `runnable/adapters` as
chi-style middleware.

Before (v0.1):

r := runnable.NewTicker(30*time.Second, doWork,
runnable.WithDrain(10*time.Second),
runnable.WithRecoverer(reporter, nil),
runnable.WithRetry(3, time.Minute),
)

After (v0.2):

r := runnable.New(doWork, runnable.WithAdapters(
adapters.Draining(10*time.Second),
adapters.Recovering(handler),
adapters.Retry(3, time.Minute),
adapters.Ticker(30*time.Second),
))

Symbol mapping:

- `runnable.WithDrain` → `adapters.Draining` under `runnable.WithAdapters`.
- `runnable.NewTicker` → `adapters.Ticker` under `runnable.WithAdapters`
(no longer takes the work argument; pass work to `runnable.New`).
- `runnable.WithRetry` / `runnable.ResetNever` → `adapters.Retry` /
`adapters.ResetNever`.
- `runnable.WithRecoverer` → `adapters.Recovering` with a single
`PanicHandler` callback (the two-interface `RecoveryReporter` /
`StackPrinter` split is gone).
- `runnable.Stopping` → `adapters.Stopping`.
- `runnable.ErrDrainTimedOut` → `adapters.ErrDrainTimedOut`.

**Behavioral change:** `Stop(ctx)`'s ctx no longer shortens the drain
window. In v0.1, a caller ctx shorter than `WithDrain`'s timeout would
force-cancel mid-drain. In v0.2, `Stop`'s ctx only governs how long
the caller waits for `Stop` to return; the drain runs on its own
fixed-duration timer regardless. If you need a shorter drain budget,
configure `Draining` with the shorter duration.

**Status.Restarts removed.** The `Restarts` field on `Status` counted
`WithRetry` re-entries via the deprecated `onStart` coupling; with
retry moved into adapters it had no clean way to surface. Pending a
proper event/observer hook in a later release.

**NewGroup interaction:** drain-enabled children of `NewGroup` now
drain when the group is stopped (v0.1 silently bypassed the child's
drain). No code change required at call sites — the adapter design
fixes this by construction.

### Runnable Object
```go
package main
Expand Down
28 changes: 28 additions & 0 deletions adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package runnable

import "context"

// RunFunc is the lifecycle function wrapped by runnable.New.
type RunFunc func(ctx context.Context) error

// Adapter wraps a RunFunc with cross-cutting behavior, mirroring the
// chi middleware shape. Concrete adapters live in runnable/adapters.
type Adapter func(next RunFunc) RunFunc

type withAdapters struct {
adapters []Adapter
}

// WithAdapters wraps the runnable's runFunc left-to-right (first listed
// = outermost). Apply order across Options matters.
func WithAdapters(adapters ...Adapter) Option {
return &withAdapters{adapters: adapters}
}

func (w *withAdapters) apply(r *runnable) {
next := RunFunc(r.runFunc)
for i := len(w.adapters) - 1; i >= 0; i-- {
next = w.adapters[i](next)
}
r.runFunc = next
}
9 changes: 9 additions & 0 deletions adapters/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Package adapters provides chi-style middleware around the runnable
// RunFunc signature. Each constructor returns a runnable.Adapter;
// compose them via runnable.WithAdapters (first listed = outermost):
//
// r := runnable.New(reconcile, runnable.WithAdapters(
// adapters.Draining(10*time.Second),
// adapters.Ticker(time.Second),
// ))
package adapters
72 changes: 72 additions & 0 deletions adapters/draining.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package adapters

import (
"context"
"errors"
"fmt"
"runtime/debug"
"time"

"github.com/0xsequence/runnable"
)

// ErrDrainTimedOut is returned by Draining when work did not exit
// within the drain timeout and was force-cancelled.
var ErrDrainTimedOut = errors.New("adapters: drain timed out")

type stoppingKey struct{}

// Stopping returns a channel that closes when Draining begins shutdown,
// or nil outside Draining. Select on this alongside ctx.Done() — ctx is
// force-cancelled only after the drain timer expires.
func Stopping(ctx context.Context) <-chan struct{} {
ch, _ := ctx.Value(stoppingKey{}).(<-chan struct{})
return ch
}

// Draining returns an Adapter that delays cancellation: when outerCtx
// is cancelled, next has up to timeout to return via Stopping(workCtx)
// before workCtx is force-cancelled and ErrDrainTimedOut is returned.
// Panics in next are recovered into an error (they would otherwise
// crash the process, since next runs on its own goroutine).
func Draining(timeout time.Duration) runnable.Adapter {
return func(next runnable.RunFunc) runnable.RunFunc {
return func(outerCtx context.Context) error {
// Decoupled from outerCtx so outer cancellation triggers drain
// rather than aborting next directly.
workCtx, cancelWork := context.WithCancel(context.WithoutCancel(outerCtx))
defer cancelWork()

stopping := make(chan struct{})
workCtx = context.WithValue(workCtx, stoppingKey{}, (<-chan struct{})(stopping))

done := make(chan error, 1)
go func() {
defer func() {
if rec := recover(); rec != nil {
done <- fmt.Errorf("adapters: panic in draining work: %v\n%s", rec, debug.Stack())
}
}()
done <- next(workCtx)
}()

select {
case err := <-done:
return err
case <-outerCtx.Done():
close(stopping)
}

timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case err := <-done:
return err
case <-timer.C:
cancelWork()
<-done
return ErrDrainTimedOut
}
}
}
}
Loading
Loading