Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions idempotency/idempotency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Package idempotency adds optional, dependency-free dedupe to a babelqueue
// consumer: it wraps a [babelqueue.Handler] so a message whose meta.id was already
// processed is skipped instead of run again (ADR-0022). It is the Go mirror of the
// PHP BabelQueue\Idempotency helper.
//
// It lives in the core module (zero dependencies, stdlib only) rather than a
// transport submodule, so it ships with every consumer.
package idempotency

import (
"context"
"sync"

babelqueue "github.com/babelqueue/babelqueue-go"
)

// Store is a pluggable record of message ids that have already been processed,
// keyed on the envelope's meta.id (the canonical per-message identity). The
// reference [InMemoryStore] is for tests / single-process consumers; production
// backends (Redis, a database table) implement the same three methods.
//
// The contract is "seen-set" dedupe: it answers "was this id processed?", not
// "what did it return" — queue handlers have no response to replay. It provides
// post-success dedupe under at-least-once + idempotent handlers (error-handling.md
// §1), not exactly-once and not in-flight concurrency locking. A transactional /
// outbox mode is a documented future direction (ADR-0022).
type Store interface {
// Seen reports whether this message id has already been processed (remembered).
Seen(ctx context.Context, messageID string) (bool, error)
// Remember records this message id as processed.
Remember(ctx context.Context, messageID string) error
// Forget drops a message id from the store (manual eviction; a backend may also
// expire ids on its own TTL).
Forget(ctx context.Context, messageID string) error
}

// InMemoryStore is a process-local, goroutine-safe [Store] backed by a map. It is
// suitable for tests and single-process consumers; it is not shared across workers
// and not persistent — use a Redis- or database-backed store for production fleets.
type InMemoryStore struct {
mu sync.Mutex
seen map[string]struct{}
}

// NewInMemoryStore returns an empty in-memory store.
func NewInMemoryStore() *InMemoryStore {
return &InMemoryStore{seen: make(map[string]struct{})}
}

// Seen implements [Store].
func (s *InMemoryStore) Seen(_ context.Context, messageID string) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.seen[messageID]
return ok, nil
}

// Remember implements [Store].
func (s *InMemoryStore) Remember(_ context.Context, messageID string) error {
s.mu.Lock()
defer s.mu.Unlock()
s.seen[messageID] = struct{}{}
return nil
}

// Forget implements [Store].
func (s *InMemoryStore) Forget(_ context.Context, messageID string) error {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.seen, messageID)
return nil
}

// Wrap returns handler decorated so a message whose meta.id was already processed
// successfully is skipped. It composes with the App's ack-on-return /
// retry-on-error contract:
//
// - an already-seen id → returns nil, so the runtime acks it and the broker
// stops redelivering;
// - the handler returns an error → the id is left unmarked and the error
// propagates, so retry / dead-letter still apply and a later delivery runs the
// handler again;
// - a message with no usable meta.id runs unchanged (fail-open).
//
// Register it like any handler: app.Handle(urn, idempotency.Wrap(store, handler)).
func Wrap(store Store, handler babelqueue.Handler) babelqueue.Handler {
return func(ctx context.Context, env babelqueue.Envelope) error {
id := env.Meta.ID

// No usable id → cannot dedupe; run the handler unchanged.
if id == "" {
return handler(ctx, env)
}

seen, err := store.Seen(ctx, id)
if err != nil {
// Don't guess the dedup state — surface the error so the message is
// retried (the runtime's retry / dead-letter path) once the store recovers.
return err
}
if seen {
return nil
}

if err := handler(ctx, env); err != nil {
return err
}

// Best-effort mark: the handler already succeeded, so we ack regardless. A
// failed Remember leaves the id unrecorded (the documented at-least-once
// dual-write window); a later redelivery may reprocess.
_ = store.Remember(ctx, id)
return nil
}
}
139 changes: 139 additions & 0 deletions idempotency/idempotency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package idempotency

import (
"context"
"errors"
"testing"

babelqueue "github.com/babelqueue/babelqueue-go"
)

func msg(id string) babelqueue.Envelope {
return babelqueue.Envelope{
Job: "urn:babel:orders:created",
TraceID: "trace-1",
Data: map[string]any{"order_id": 7},
Meta: babelqueue.Meta{ID: id, Queue: "orders", Lang: "go", SchemaVersion: 1},
}
}

func TestWrap_RunsAndRemembersOnFirstDelivery(t *testing.T) {
store := NewInMemoryStore()
calls := 0
h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil })

if err := h(context.Background(), msg("msg-1")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if calls != 1 {
t.Fatalf("calls = %d, want 1", calls)
}
if seen, _ := store.Seen(context.Background(), "msg-1"); !seen {
t.Fatal("id should be remembered after a successful handler")
}
}

func TestWrap_SkipsRedeliveryOfSameID(t *testing.T) {
store := NewInMemoryStore()
calls := 0
h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil })

_ = h(context.Background(), msg("msg-1"))
if err := h(context.Background(), msg("msg-1")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if calls != 1 {
t.Fatalf("calls = %d, want 1 (redelivery must be skipped)", calls)
}
}

func TestWrap_RunsAgainForDifferentID(t *testing.T) {
store := NewInMemoryStore()
calls := 0
h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil })

_ = h(context.Background(), msg("msg-1"))
_ = h(context.Background(), msg("msg-2"))
if calls != 2 {
t.Fatalf("calls = %d, want 2", calls)
}
}

func TestWrap_DoesNotRememberWhenHandlerErrors(t *testing.T) {
store := NewInMemoryStore()
calls := 0
boom := errors.New("boom")
h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return boom })

if err := h(context.Background(), msg("msg-1")); !errors.Is(err, boom) {
t.Fatalf("err = %v, want boom to propagate so the runtime retries", err)
}
if seen, _ := store.Seen(context.Background(), "msg-1"); seen {
t.Fatal("an errored id must not be remembered")
}

// A redelivery runs the handler again — retry works.
_ = h(context.Background(), msg("msg-1"))
if calls != 2 {
t.Fatalf("calls = %d, want 2", calls)
}
}

func TestWrap_RunsWhenNoUsableID(t *testing.T) {
store := NewInMemoryStore()
calls := 0
h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil })

_ = h(context.Background(), msg("")) // empty id → cannot dedupe → runs
_ = h(context.Background(), msg("")) // still runs
if calls != 2 {
t.Fatalf("calls = %d, want 2 (no id → no dedupe)", calls)
}
}

func TestWrap_SeenErrorPropagates(t *testing.T) {
failing := &failingStore{seenErr: errors.New("store down")}
calls := 0
h := Wrap(failing, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil })

if err := h(context.Background(), msg("msg-1")); err == nil {
t.Fatal("a Seen error must propagate so the message retries")
}
if calls != 0 {
t.Fatalf("handler must not run when the dedup check fails; calls = %d", calls)
}
}

func TestWrap_RememberErrorStillAcks(t *testing.T) {
failing := &failingStore{rememberErr: errors.New("write failed")}
calls := 0
h := Wrap(failing, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil })

if err := h(context.Background(), msg("msg-1")); err != nil {
t.Fatalf("a Remember failure after a successful handler must still ack; got %v", err)
}
if calls != 1 {
t.Fatalf("calls = %d, want 1", calls)
}
}

func TestInMemoryStore_Forget(t *testing.T) {
store := NewInMemoryStore()
_ = store.Remember(context.Background(), "msg-1")
if seen, _ := store.Seen(context.Background(), "msg-1"); !seen {
t.Fatal("should be seen after Remember")
}
_ = store.Forget(context.Background(), "msg-1")
if seen, _ := store.Seen(context.Background(), "msg-1"); seen {
t.Fatal("should not be seen after Forget")
}
}

type failingStore struct {
seenErr error
rememberErr error
}

func (f *failingStore) Seen(_ context.Context, _ string) (bool, error) { return false, f.seenErr }
func (f *failingStore) Remember(_ context.Context, _ string) error { return f.rememberErr }
func (f *failingStore) Forget(_ context.Context, _ string) error { return nil }
49 changes: 49 additions & 0 deletions schema/conformance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package schema

import (
"encoding/json"
"os"
"path/filepath"
"testing"
)

// TestPayloadConformance runs the shared cross-SDK payload-schema cases (ADR-0024) from the
// vendored conformance suite: every BabelQueue SDK's payload validator must agree with this
// one on each case's `valid` flag, so the hand-rolled subset validators cannot drift apart.
func TestPayloadConformance(t *testing.T) {
raw, err := os.ReadFile(filepath.Join("..", "testdata", "conformance", "manifest.json"))
if err != nil {
t.Skipf("vendored conformance suite not present: %v", err)
}

var manifest struct {
PayloadSchema *struct {
Schema json.RawMessage `json:"schema"`
Cases []struct {
Name string `json:"name"`
Valid bool `json:"valid"`
Data map[string]any `json:"data"`
} `json:"cases"`
} `json:"payload_schema"`
}
if err := json.Unmarshal(raw, &manifest); err != nil {
t.Fatalf("decode manifest: %v", err)
}
if manifest.PayloadSchema == nil {
t.Skip("manifest has no payload_schema section")
}

s, err := Parse(manifest.PayloadSchema.Schema)
if err != nil {
t.Fatalf("parse schema: %v", err)
}
if len(manifest.PayloadSchema.Cases) == 0 {
t.Fatal("payload_schema has no cases")
}
for _, c := range manifest.PayloadSchema.Cases {
got := len(s.Validate(c.Data)) == 0
if got != c.Valid {
t.Errorf("case %q: got valid=%v, want %v", c.Name, got, c.Valid)
}
}
}
59 changes: 59 additions & 0 deletions schema/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package schema

import (
"context"
"errors"
"fmt"
"strings"

babelqueue "github.com/babelqueue/babelqueue-go"
)

// ErrInvalidPayload is returned (wrapped) when a message's data does not match the schema
// registered for its URN. Detect it with errors.Is.
var ErrInvalidPayload = errors.New("babelqueue/schema: message data does not match its URN schema")

// Check validates a (urn, data) pair against the schema registered for urn. It is the
// producer-side guard: call it before publishing so invalid data never enters the queue.
// It returns nil when the data is valid OR when no schema is registered for the URN
// (opt-in); a provider lookup error is returned wrapped (transient — e.g. the registry file
// is briefly unavailable during a deploy).
func Check(provider Provider, urn string, data map[string]any) error {
sch, found, err := provider.Schema(urn)
if err != nil {
return fmt.Errorf("schema: lookup %q: %w", urn, err)
}
if !found {
return nil
}
if violations := sch.Validate(data); len(violations) > 0 {
return fmt.Errorf("%w for %q: %s", ErrInvalidPayload, urn, strings.Join(violations, "; "))
}
return nil
}

// Validate is the envelope form of [Check]: it validates env.Data against env.URN()'s schema.
func Validate(provider Provider, env babelqueue.Envelope) error {
return Check(provider, env.URN(), env.Data)
}

// Wrap returns handler decorated to validate each message's data against its URN schema
// before the handler runs (consumer-side safety net). It composes with the App's
// ack-on-return / retry-on-error contract:
//
// - valid data (or no schema registered for the URN) → the handler runs unchanged;
// - invalid data → [ErrInvalidPayload] is returned, so the message takes the retry /
// dead-letter path. Because invalid data will not become valid on retry, such a poison
// message exhausts its attempts and is dead-lettered — prefer [Check] producer-side to
// keep invalid data out of the queue entirely;
// - a provider lookup error → returned, so the message is retried once the source recovers.
//
// Register it like any handler: app.Handle(urn, schema.Wrap(provider, handler)).
func Wrap(provider Provider, handler babelqueue.Handler) babelqueue.Handler {
return func(ctx context.Context, env babelqueue.Envelope) error {
if err := Validate(provider, env); err != nil {
return err
}
return handler(ctx, env)
}
}
Loading