From 6502e9eca69d1f733b23068d794a52585e58e79f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Thu, 18 Jun 2026 13:43:58 +0300 Subject: [PATCH] feat: optional idempotency (ADR-0022) and per-URN schema validation (ADR-0024) Opt-in, dependency-free helpers; wire envelope stays frozen. Includes the vendored payload_schema cross-SDK conformance cases. --- idempotency/idempotency.go | 115 ++++++++++++++ idempotency/idempotency_test.go | 139 +++++++++++++++++ schema/conformance_test.go | 49 ++++++ schema/consumer.go | 59 ++++++++ schema/consumer_test.go | 81 ++++++++++ schema/provider.go | 114 ++++++++++++++ schema/provider_test.go | 98 ++++++++++++ schema/schema.go | 233 +++++++++++++++++++++++++++++ schema/schema_test.go | 105 +++++++++++++ testdata/conformance/manifest.json | 23 +++ 10 files changed, 1016 insertions(+) create mode 100644 idempotency/idempotency.go create mode 100644 idempotency/idempotency_test.go create mode 100644 schema/conformance_test.go create mode 100644 schema/consumer.go create mode 100644 schema/consumer_test.go create mode 100644 schema/provider.go create mode 100644 schema/provider_test.go create mode 100644 schema/schema.go create mode 100644 schema/schema_test.go diff --git a/idempotency/idempotency.go b/idempotency/idempotency.go new file mode 100644 index 0000000..6fb779e --- /dev/null +++ b/idempotency/idempotency.go @@ -0,0 +1,115 @@ +// Package idempotency adds optional, dependency-free dedupe to a babelqueue +// consumer: it wraps a [babelqueue.Handler] so a message whose meta.id was already +// processed is skipped instead of run again (ADR-0022). It is the Go mirror of the +// PHP BabelQueue\Idempotency helper. +// +// It lives in the core module (zero dependencies, stdlib only) rather than a +// transport submodule, so it ships with every consumer. 
// Store is the pluggable seen-set behind [Wrap]: a record of message ids
// (the envelope's meta.id) whose handler has already completed successfully.
//
// It only answers "was this id processed?" — it never stores or replays a
// result, because queue handlers have no response. That gives post-success
// dedupe on top of at-least-once delivery; it is neither exactly-once nor an
// in-flight lock. [InMemoryStore] is the reference implementation; a Redis- or
// database-backed store implements the same three methods.
type Store interface {
	// Seen reports whether messageID has already been remembered.
	Seen(ctx context.Context, messageID string) (bool, error)
	// Remember marks messageID as processed.
	Remember(ctx context.Context, messageID string) error
	// Forget removes messageID from the store (manual eviction; a backend may
	// additionally expire ids on its own TTL).
	Forget(ctx context.Context, messageID string) error
}

// InMemoryStore is a map-backed [Store] guarded by a mutex. It is local to the
// process and not persistent, which makes it a fit for tests and single-process
// consumers only.
type InMemoryStore struct {
	mu   sync.Mutex          // guards seen
	seen map[string]struct{} // set of remembered message ids
}

// NewInMemoryStore returns a ready-to-use store with nothing remembered.
func NewInMemoryStore() *InMemoryStore {
	store := new(InMemoryStore)
	store.seen = map[string]struct{}{}
	return store
}

// Seen implements [Store]. The context is unused and the error is always nil.
func (s *InMemoryStore) Seen(_ context.Context, messageID string) (bool, error) {
	s.mu.Lock()
	_, known := s.seen[messageID]
	s.mu.Unlock()
	return known, nil
}
+func (s *InMemoryStore) Remember(_ context.Context, messageID string) error { + s.mu.Lock() + defer s.mu.Unlock() + s.seen[messageID] = struct{}{} + return nil +} + +// Forget implements [Store]. +func (s *InMemoryStore) Forget(_ context.Context, messageID string) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.seen, messageID) + return nil +} + +// Wrap returns handler decorated so a message whose meta.id was already processed +// successfully is skipped. It composes with the App's ack-on-return / +// retry-on-error contract: +// +// - an already-seen id → returns nil, so the runtime acks it and the broker +// stops redelivering; +// - the handler returns an error → the id is left unmarked and the error +// propagates, so retry / dead-letter still apply and a later delivery runs the +// handler again; +// - a message with no usable meta.id runs unchanged (fail-open). +// +// Register it like any handler: app.Handle(urn, idempotency.Wrap(store, handler)). +func Wrap(store Store, handler babelqueue.Handler) babelqueue.Handler { + return func(ctx context.Context, env babelqueue.Envelope) error { + id := env.Meta.ID + + // No usable id → cannot dedupe; run the handler unchanged. + if id == "" { + return handler(ctx, env) + } + + seen, err := store.Seen(ctx, id) + if err != nil { + // Don't guess the dedup state — surface the error so the message is + // retried (the runtime's retry / dead-letter path) once the store recovers. + return err + } + if seen { + return nil + } + + if err := handler(ctx, env); err != nil { + return err + } + + // Best-effort mark: the handler already succeeded, so we ack regardless. A + // failed Remember leaves the id unrecorded (the documented at-least-once + // dual-write window); a later redelivery may reprocess. 
+ _ = store.Remember(ctx, id) + return nil + } +} diff --git a/idempotency/idempotency_test.go b/idempotency/idempotency_test.go new file mode 100644 index 0000000..85f93a3 --- /dev/null +++ b/idempotency/idempotency_test.go @@ -0,0 +1,139 @@ +package idempotency + +import ( + "context" + "errors" + "testing" + + babelqueue "github.com/babelqueue/babelqueue-go" +) + +func msg(id string) babelqueue.Envelope { + return babelqueue.Envelope{ + Job: "urn:babel:orders:created", + TraceID: "trace-1", + Data: map[string]any{"order_id": 7}, + Meta: babelqueue.Meta{ID: id, Queue: "orders", Lang: "go", SchemaVersion: 1}, + } +} + +func TestWrap_RunsAndRemembersOnFirstDelivery(t *testing.T) { + store := NewInMemoryStore() + calls := 0 + h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil }) + + if err := h(context.Background(), msg("msg-1")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if calls != 1 { + t.Fatalf("calls = %d, want 1", calls) + } + if seen, _ := store.Seen(context.Background(), "msg-1"); !seen { + t.Fatal("id should be remembered after a successful handler") + } +} + +func TestWrap_SkipsRedeliveryOfSameID(t *testing.T) { + store := NewInMemoryStore() + calls := 0 + h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil }) + + _ = h(context.Background(), msg("msg-1")) + if err := h(context.Background(), msg("msg-1")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if calls != 1 { + t.Fatalf("calls = %d, want 1 (redelivery must be skipped)", calls) + } +} + +func TestWrap_RunsAgainForDifferentID(t *testing.T) { + store := NewInMemoryStore() + calls := 0 + h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil }) + + _ = h(context.Background(), msg("msg-1")) + _ = h(context.Background(), msg("msg-2")) + if calls != 2 { + t.Fatalf("calls = %d, want 2", calls) + } +} + +func TestWrap_DoesNotRememberWhenHandlerErrors(t 
*testing.T) { + store := NewInMemoryStore() + calls := 0 + boom := errors.New("boom") + h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return boom }) + + if err := h(context.Background(), msg("msg-1")); !errors.Is(err, boom) { + t.Fatalf("err = %v, want boom to propagate so the runtime retries", err) + } + if seen, _ := store.Seen(context.Background(), "msg-1"); seen { + t.Fatal("an errored id must not be remembered") + } + + // A redelivery runs the handler again — retry works. + _ = h(context.Background(), msg("msg-1")) + if calls != 2 { + t.Fatalf("calls = %d, want 2", calls) + } +} + +func TestWrap_RunsWhenNoUsableID(t *testing.T) { + store := NewInMemoryStore() + calls := 0 + h := Wrap(store, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil }) + + _ = h(context.Background(), msg("")) // empty id → cannot dedupe → runs + _ = h(context.Background(), msg("")) // still runs + if calls != 2 { + t.Fatalf("calls = %d, want 2 (no id → no dedupe)", calls) + } +} + +func TestWrap_SeenErrorPropagates(t *testing.T) { + failing := &failingStore{seenErr: errors.New("store down")} + calls := 0 + h := Wrap(failing, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil }) + + if err := h(context.Background(), msg("msg-1")); err == nil { + t.Fatal("a Seen error must propagate so the message retries") + } + if calls != 0 { + t.Fatalf("handler must not run when the dedup check fails; calls = %d", calls) + } +} + +func TestWrap_RememberErrorStillAcks(t *testing.T) { + failing := &failingStore{rememberErr: errors.New("write failed")} + calls := 0 + h := Wrap(failing, func(_ context.Context, _ babelqueue.Envelope) error { calls++; return nil }) + + if err := h(context.Background(), msg("msg-1")); err != nil { + t.Fatalf("a Remember failure after a successful handler must still ack; got %v", err) + } + if calls != 1 { + t.Fatalf("calls = %d, want 1", calls) + } +} + +func TestInMemoryStore_Forget(t 
*testing.T) { + store := NewInMemoryStore() + _ = store.Remember(context.Background(), "msg-1") + if seen, _ := store.Seen(context.Background(), "msg-1"); !seen { + t.Fatal("should be seen after Remember") + } + _ = store.Forget(context.Background(), "msg-1") + if seen, _ := store.Seen(context.Background(), "msg-1"); seen { + t.Fatal("should not be seen after Forget") + } +} + +type failingStore struct { + seenErr error + rememberErr error +} + +func (f *failingStore) Seen(_ context.Context, _ string) (bool, error) { return false, f.seenErr } +func (f *failingStore) Remember(_ context.Context, _ string) error { return f.rememberErr } +func (f *failingStore) Forget(_ context.Context, _ string) error { return nil } diff --git a/schema/conformance_test.go b/schema/conformance_test.go new file mode 100644 index 0000000..7ae9e40 --- /dev/null +++ b/schema/conformance_test.go @@ -0,0 +1,49 @@ +package schema + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" +) + +// TestPayloadConformance runs the shared cross-SDK payload-schema cases (ADR-0024) from the +// vendored conformance suite: every BabelQueue SDK's payload validator must agree with this +// one on each case's `valid` flag, so the hand-rolled subset validators cannot drift apart. 
+func TestPayloadConformance(t *testing.T) { + raw, err := os.ReadFile(filepath.Join("..", "testdata", "conformance", "manifest.json")) + if err != nil { + t.Skipf("vendored conformance suite not present: %v", err) + } + + var manifest struct { + PayloadSchema *struct { + Schema json.RawMessage `json:"schema"` + Cases []struct { + Name string `json:"name"` + Valid bool `json:"valid"` + Data map[string]any `json:"data"` + } `json:"cases"` + } `json:"payload_schema"` + } + if err := json.Unmarshal(raw, &manifest); err != nil { + t.Fatalf("decode manifest: %v", err) + } + if manifest.PayloadSchema == nil { + t.Skip("manifest has no payload_schema section") + } + + s, err := Parse(manifest.PayloadSchema.Schema) + if err != nil { + t.Fatalf("parse schema: %v", err) + } + if len(manifest.PayloadSchema.Cases) == 0 { + t.Fatal("payload_schema has no cases") + } + for _, c := range manifest.PayloadSchema.Cases { + got := len(s.Validate(c.Data)) == 0 + if got != c.Valid { + t.Errorf("case %q: got valid=%v, want %v", c.Name, got, c.Valid) + } + } +} diff --git a/schema/consumer.go b/schema/consumer.go new file mode 100644 index 0000000..456849e --- /dev/null +++ b/schema/consumer.go @@ -0,0 +1,59 @@ +package schema + +import ( + "context" + "errors" + "fmt" + "strings" + + babelqueue "github.com/babelqueue/babelqueue-go" +) + +// ErrInvalidPayload is returned (wrapped) when a message's data does not match the schema +// registered for its URN. Detect it with errors.Is. +var ErrInvalidPayload = errors.New("babelqueue/schema: message data does not match its URN schema") + +// Check validates a (urn, data) pair against the schema registered for urn. It is the +// producer-side guard: call it before publishing so invalid data never enters the queue. +// It returns nil when the data is valid OR when no schema is registered for the URN +// (opt-in); a provider lookup error is returned wrapped (transient — e.g. the registry file +// is briefly unavailable during a deploy). 
+func Check(provider Provider, urn string, data map[string]any) error { + sch, found, err := provider.Schema(urn) + if err != nil { + return fmt.Errorf("schema: lookup %q: %w", urn, err) + } + if !found { + return nil + } + if violations := sch.Validate(data); len(violations) > 0 { + return fmt.Errorf("%w for %q: %s", ErrInvalidPayload, urn, strings.Join(violations, "; ")) + } + return nil +} + +// Validate is the envelope form of [Check]: it validates env.Data against env.URN()'s schema. +func Validate(provider Provider, env babelqueue.Envelope) error { + return Check(provider, env.URN(), env.Data) +} + +// Wrap returns handler decorated to validate each message's data against its URN schema +// before the handler runs (consumer-side safety net). It composes with the App's +// ack-on-return / retry-on-error contract: +// +// - valid data (or no schema registered for the URN) → the handler runs unchanged; +// - invalid data → [ErrInvalidPayload] is returned, so the message takes the retry / +// dead-letter path. Because invalid data will not become valid on retry, such a poison +// message exhausts its attempts and is dead-lettered — prefer [Check] producer-side to +// keep invalid data out of the queue entirely; +// - a provider lookup error → returned, so the message is retried once the source recovers. +// +// Register it like any handler: app.Handle(urn, schema.Wrap(provider, handler)). 
+func Wrap(provider Provider, handler babelqueue.Handler) babelqueue.Handler { + return func(ctx context.Context, env babelqueue.Envelope) error { + if err := Validate(provider, env); err != nil { + return err + } + return handler(ctx, env) + } +} diff --git a/schema/consumer_test.go b/schema/consumer_test.go new file mode 100644 index 0000000..7ba7977 --- /dev/null +++ b/schema/consumer_test.go @@ -0,0 +1,81 @@ +package schema + +import ( + "context" + "errors" + "testing" + + babelqueue "github.com/babelqueue/babelqueue-go" +) + +func newProvider(t *testing.T) Provider { + t.Helper() + p, err := NewMapProvider(map[string][]byte{ + "urn:babel:orders:created": []byte(`{"type":"object","required":["order_id"],"properties":{"order_id":{"type":"integer"}},"additionalProperties":false}`), + }) + if err != nil { + t.Fatal(err) + } + return p +} + +type errProvider struct{} + +func (errProvider) Schema(string) (*Schema, bool, error) { return nil, false, errors.New("boom") } + +func TestCheck(t *testing.T) { + p := newProvider(t) + if err := Check(p, "urn:babel:orders:created", map[string]any{"order_id": 1.0}); err != nil { + t.Fatalf("valid data should pass: %v", err) + } + err := Check(p, "urn:babel:orders:created", map[string]any{}) + if !errors.Is(err, ErrInvalidPayload) { + t.Fatalf("invalid data should wrap ErrInvalidPayload, got %v", err) + } + if err := Check(p, "urn:babel:unknown", map[string]any{"x": 1.0}); err != nil { + t.Fatalf("an unregistered urn should pass (opt-in), got %v", err) + } + if err := Check(errProvider{}, "u", map[string]any{}); err == nil { + t.Fatal("a provider lookup error should propagate") + } +} + +func TestValidate_Envelope(t *testing.T) { + p := newProvider(t) + env, _ := babelqueue.Make("urn:babel:orders:created", map[string]any{"order_id": 1.0}) + if err := Validate(p, env); err != nil { + t.Fatalf("valid envelope should pass: %v", err) + } + bad, _ := babelqueue.Make("urn:babel:orders:created", map[string]any{"order_id": "x"}) + if 
err := Validate(p, bad); !errors.Is(err, ErrInvalidPayload) { + t.Fatalf("invalid envelope should wrap ErrInvalidPayload, got %v", err) + } +} + +func TestWrap(t *testing.T) { + p := newProvider(t) + called := 0 + h := func(_ context.Context, _ babelqueue.Envelope) error { called++; return nil } + wrapped := Wrap(p, h) + ctx := context.Background() + + env, _ := babelqueue.Make("urn:babel:orders:created", map[string]any{"order_id": 1.0}) + if err := wrapped(ctx, env); err != nil || called != 1 { + t.Fatalf("valid data → handler runs: err=%v called=%d", err, called) + } + + bad, _ := babelqueue.Make("urn:babel:orders:created", map[string]any{}) + if err := wrapped(ctx, bad); !errors.Is(err, ErrInvalidPayload) || called != 1 { + t.Fatalf("invalid data → ErrInvalidPayload, handler skipped: err=%v called=%d", err, called) + } + + un, _ := babelqueue.Make("urn:babel:unknown", map[string]any{"x": 1.0}) + if err := wrapped(ctx, un); err != nil || called != 2 { + t.Fatalf("unregistered urn → handler runs: err=%v called=%d", err, called) + } + + wrappedErr := Wrap(errProvider{}, h) + if err := wrappedErr(ctx, env); err == nil || called != 2 { + t.Fatalf("provider error → propagate, handler skipped: err=%v called=%d", err, called) + } +} diff --git a/schema/provider.go b/schema/provider.go new file mode 100644 index 0000000..f05dac4 --- /dev/null +++ b/schema/provider.go @@ -0,0 +1,114 @@ +package schema + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" +) + +// Provider is a source of per-URN data schemas. The reference [MapProvider] is in-memory; +// [DirProvider] reads a babelqueue-registry registry.json. A production provider (a service +// client, an embedded bundle) implements the same single method. +type Provider interface { + // Schema returns the schema registered for urn. found is false when the URN has no + // registered schema, in which case the caller skips validation (opt-in). 
+ Schema(urn string) (sch *Schema, found bool, err error) +} + +// MapProvider is an in-memory [Provider], suitable for tests and for embedding schemas in +// code. It is read-only after construction and therefore safe for concurrent use. +type MapProvider struct { + schemas map[string]*Schema +} + +// NewMapProvider builds a MapProvider from URN → raw JSON Schema bytes, parsing each. +func NewMapProvider(raw map[string][]byte) (*MapProvider, error) { + m := &MapProvider{schemas: make(map[string]*Schema, len(raw))} + for urn, body := range raw { + s, err := Parse(body) + if err != nil { + return nil, fmt.Errorf("schema: %q: %w", urn, err) + } + m.schemas[urn] = s + } + return m, nil +} + +// Schema implements [Provider]. +func (m *MapProvider) Schema(urn string) (*Schema, bool, error) { + s, ok := m.schemas[urn] + return s, ok, nil +} + +// DirProvider reads schemas from a babelqueue-registry manifest (registry.json): a list of +// {urn, schema} entries mapping each URN to a draft-07 schema file for its data block. This +// is the bridge that makes the registry's governed schemas enforceable at runtime. The +// manifest is read once; schema files are parsed lazily and cached. +type DirProvider struct { + dir string + files map[string]string // urn -> schema file path (relative to dir) + + mu sync.Mutex + cache map[string]*Schema +} + +type manifest struct { + Schemas []struct { + URN string `json:"urn"` + Schema string `json:"schema"` + } `json:"schemas"` +} + +// NewDirProvider loads the registry manifest at manifestPath (e.g. ".../registry.json"). 
+func NewDirProvider(manifestPath string) (*DirProvider, error) { + data, err := os.ReadFile(manifestPath) + if err != nil { + return nil, fmt.Errorf("schema: read %s: %w", manifestPath, err) + } + var man manifest + if err := json.Unmarshal(data, &man); err != nil { + return nil, fmt.Errorf("schema: parse %s: %w", manifestPath, err) + } + p := &DirProvider{ + dir: filepath.Dir(manifestPath), + files: make(map[string]string, len(man.Schemas)), + cache: make(map[string]*Schema), + } + for _, e := range man.Schemas { + if e.URN == "" || e.Schema == "" { + continue + } + p.files[e.URN] = e.Schema + } + return p, nil +} + +// Schema implements [Provider], reading and caching the schema file on first use. +func (p *DirProvider) Schema(urn string) (*Schema, bool, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if s, ok := p.cache[urn]; ok { + return s, true, nil + } + file, ok := p.files[urn] + if !ok { + return nil, false, nil + } + path := file + if !filepath.IsAbs(path) { + path = filepath.Join(p.dir, file) + } + raw, err := os.ReadFile(path) + if err != nil { + return nil, true, fmt.Errorf("schema: read schema for %q (%s): %w", urn, file, err) + } + s, err := Parse(raw) + if err != nil { + return nil, true, fmt.Errorf("schema: %q: %w", urn, err) + } + p.cache[urn] = s + return s, true, nil +} diff --git a/schema/provider_test.go b/schema/provider_test.go new file mode 100644 index 0000000..d546d8c --- /dev/null +++ b/schema/provider_test.go @@ -0,0 +1,98 @@ +package schema + +import ( + "os" + "path/filepath" + "testing" +) + +func TestMapProvider(t *testing.T) { + p, err := NewMapProvider(map[string][]byte{ + "urn:babel:orders:created": []byte(`{"type":"object","required":["order_id"],"properties":{"order_id":{"type":"integer"}}}`), + }) + if err != nil { + t.Fatal(err) + } + s, found, err := p.Schema("urn:babel:orders:created") + if err != nil || !found { + t.Fatalf("found=%v err=%v", found, err) + } + if errs := s.Validate(map[string]any{"order_id": 1.0}); 
len(errs) != 0 { + t.Fatalf("valid payload rejected: %v", errs) + } + if _, found, _ := p.Schema("urn:babel:unknown"); found { + t.Fatal("an unregistered urn should be not-found") + } +} + +func TestNewMapProvider_BadSchema(t *testing.T) { + if _, err := NewMapProvider(map[string][]byte{"u": []byte("not json")}); err == nil { + t.Fatal("a bad schema should error") + } +} + +func writeFile(t *testing.T, path, body string) { + t.Helper() + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(path, []byte(body), 0o644); err != nil { + t.Fatal(err) + } +} + +func TestDirProvider_LazyLoadAndCache(t *testing.T) { + dir := t.TempDir() + writeFile(t, filepath.Join(dir, "schemas/orders-created.json"), + `{"type":"object","required":["order_id"],"properties":{"order_id":{"type":"integer"}}}`) + // the empty-urn entry is ignored on load + writeFile(t, filepath.Join(dir, "registry.json"), + `{"schemas":[{"urn":"urn:babel:orders:created","schema":"schemas/orders-created.json"},{"urn":"","schema":"x"}]}`) + + p, err := NewDirProvider(filepath.Join(dir, "registry.json")) + if err != nil { + t.Fatal(err) + } + for i := 0; i < 2; i++ { // second call hits the cache + s, found, err := p.Schema("urn:babel:orders:created") + if err != nil || !found { + t.Fatalf("found=%v err=%v", found, err) + } + if errs := s.Validate(map[string]any{"order_id": 1.0}); len(errs) != 0 { + t.Fatalf("valid payload rejected: %v", errs) + } + } + if _, found, _ := p.Schema("urn:babel:unknown"); found { + t.Fatal("an unregistered urn should be not-found") + } +} + +func TestDirProvider_Errors(t *testing.T) { + if _, err := NewDirProvider(filepath.Join(t.TempDir(), "nope.json")); err == nil { + t.Fatal("a missing manifest should error") + } + + bad := t.TempDir() + writeFile(t, filepath.Join(bad, "registry.json"), `not json`) + if _, err := NewDirProvider(filepath.Join(bad, "registry.json")); err == nil { + t.Fatal("an invalid manifest should error") + } 
+ + missing := t.TempDir() + writeFile(t, filepath.Join(missing, "registry.json"), `{"schemas":[{"urn":"u","schema":"missing.json"}]}`) + p, err := NewDirProvider(filepath.Join(missing, "registry.json")) + if err != nil { + t.Fatal(err) + } + if _, found, err := p.Schema("u"); !found || err == nil { + t.Fatalf("a missing schema file should report found=true with an error; got found=%v err=%v", found, err) + } + + invalid := t.TempDir() + writeFile(t, filepath.Join(invalid, "bad.json"), `not json`) + writeFile(t, filepath.Join(invalid, "registry.json"), `{"schemas":[{"urn":"u","schema":"bad.json"}]}`) + p2, _ := NewDirProvider(filepath.Join(invalid, "registry.json")) + if _, found, err := p2.Schema("u"); !found || err == nil { + t.Fatalf("an invalid schema file should report found=true with an error; got found=%v err=%v", found, err) + } +} diff --git a/schema/schema.go b/schema/schema.go new file mode 100644 index 0000000..fca340c --- /dev/null +++ b/schema/schema.go @@ -0,0 +1,233 @@ +// Package schema adds optional, dependency-free per-URN payload validation to a babelqueue +// producer or consumer (ADR-0024). A [Provider] supplies a JSON Schema for a message URN — +// typically read from a babelqueue-registry registry.json — and the message's data block is +// validated against it. It is opt-in: a URN with no registered schema is never validated. +// +// The validator is an intentionally small subset of JSON Schema (draft-07) — enough to +// describe a message's data, mirroring the subset babelqueue-registry and php-sdk's envelope +// validator enforce. It is NOT a full draft-07 implementation; unknown keywords are ignored. +// It lives in the core module (stdlib only, zero dependencies) so it ships with every SDK. +package schema + +import ( + "encoding/json" + "fmt" + "math" + "reflect" + "sort" +) + +// Schema is a parsed (subset) JSON Schema node. 
// Schema is a parsed (subset) JSON Schema node. Only the keywords below are
// understood; any other keyword in the source document is ignored.
type Schema struct {
	Type                 string             // object|string|integer|number|boolean|array|null; "" = unconstrained
	Required             []string           // object: required property names
	Properties           map[string]*Schema // object: per-property schemas
	AdditionalProperties *bool              // object: nil = open (draft-07 default true)
	Items                *Schema            // array: item schema
	Enum                 []any              // allowed values (nil = any)
	Const                any                // fixed value when HasConst
	HasConst             bool
	MinLength            *int     // string: minimum length in characters (Unicode code points)
	Minimum              *float64 // integer|number: inclusive lower bound
}

// Parse decodes a (subset) JSON Schema document. The document must be a JSON
// object; anything that does not unmarshal into one is reported as an error.
func Parse(raw []byte) (*Schema, error) {
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, fmt.Errorf("schema: parse: %w", err)
	}
	return fromMap(m), nil
}

// fromMap builds a Schema from a decoded JSON object. Keywords whose value has
// an unexpected JSON type are skipped rather than rejected, matching the
// "unknown keywords are ignored" policy of this subset.
func fromMap(m map[string]any) *Schema {
	s := &Schema{}
	if t, ok := m["type"].(string); ok {
		s.Type = t
	}
	if req, ok := m["required"].([]any); ok {
		for _, r := range req {
			if name, ok := r.(string); ok {
				s.Required = append(s.Required, name)
			}
		}
	}
	if props, ok := m["properties"].(map[string]any); ok {
		s.Properties = make(map[string]*Schema, len(props))
		for name, ps := range props {
			if pm, ok := ps.(map[string]any); ok {
				s.Properties[name] = fromMap(pm)
			}
		}
	}
	if ap, ok := m["additionalProperties"].(bool); ok {
		s.AdditionalProperties = &ap
	}
	if items, ok := m["items"].(map[string]any); ok {
		s.Items = fromMap(items)
	}
	if enum, ok := m["enum"].([]any); ok {
		s.Enum = enum
	}
	if c, ok := m["const"]; ok {
		s.Const = c
		s.HasConst = true
	}
	if ml, ok := m["minLength"].(float64); ok {
		v := int(ml)
		s.MinLength = &v
	}
	if lower, ok := m["minimum"].(float64); ok {
		s.Minimum = &lower
	}
	return s
}

// Validate checks value against the schema and returns a sorted list of
// human-readable violations. An empty result means the value is valid. A nil
// *Schema accepts everything.
func (s *Schema) Validate(value any) []string {
	var errs []string
	s.validate("", value, &errs)
	sort.Strings(errs)
	return errs
}

// validate appends to errs every violation of s by value. path is the dotted
// location of value inside the root document ("" for the root) and prefixes
// each message.
func (s *Schema) validate(path string, value any, errs *[]string) {
	if s == nil {
		return
	}
	at := func(msg string) string {
		if path == "" {
			return msg
		}
		return path + ": " + msg
	}

	if s.HasConst && !jsonEqual(value, s.Const) {
		*errs = append(*errs, at(fmt.Sprintf("must equal const %v", s.Const)))
	}
	if s.Enum != nil && !inEnum(value, s.Enum) {
		*errs = append(*errs, at("value is not one of the allowed enum values"))
	}

	switch s.Type {
	case "object":
		s.validateObject(path, value, errs, at)
	case "array":
		s.validateArray(path, value, errs, at)
	case "string":
		str, ok := value.(string)
		if !ok {
			*errs = append(*errs, at("must be a string"))
			return
		}
		// JSON Schema measures string length in characters, not bytes, so
		// multi-byte UTF-8 text must not be over-counted.
		if s.MinLength != nil && charLen(str) < *s.MinLength {
			*errs = append(*errs, at(fmt.Sprintf("must be at least %d characters", *s.MinLength)))
		}
	case "integer":
		if !isInteger(value) {
			*errs = append(*errs, at("must be an integer"))
			return
		}
		s.checkMinimum(value, errs, at)
	case "number":
		if !isNumber(value) {
			*errs = append(*errs, at("must be a number"))
			return
		}
		s.checkMinimum(value, errs, at)
	case "boolean":
		if _, ok := value.(bool); !ok {
			*errs = append(*errs, at("must be a boolean"))
		}
	case "null":
		if value != nil {
			*errs = append(*errs, at("must be null"))
		}
	}
}

// validateObject enforces required, properties and additionalProperties on a
// map[string]any value; any other Go type is reported as "must be an object".
func (s *Schema) validateObject(path string, value any, errs *[]string, at func(string) string) {
	obj, ok := value.(map[string]any)
	if !ok {
		*errs = append(*errs, at("must be an object"))
		return
	}
	for _, req := range s.Required {
		if _, present := obj[req]; !present {
			*errs = append(*errs, at(fmt.Sprintf("missing required property %q", req)))
		}
	}
	// Map iteration order is random; Validate sorts the collected messages.
	for key, v := range obj {
		if sub, ok := s.Properties[key]; ok {
			sub.validate(join(path, key), v, errs)
			continue
		}
		if s.AdditionalProperties != nil && !*s.AdditionalProperties {
			*errs = append(*errs, at(fmt.Sprintf("additional property %q is not allowed", key)))
		}
	}
}

// validateArray checks that value is a []any and, when an item schema is set,
// validates every element against it.
func (s *Schema) validateArray(path string, value any, errs *[]string, at func(string) string) {
	arr, ok := value.([]any)
	if !ok {
		*errs = append(*errs, at("must be an array"))
		return
	}
	if s.Items == nil {
		return
	}
	for i, item := range arr {
		s.Items.validate(fmt.Sprintf("%s[%d]", path, i), item, errs)
	}
}

// checkMinimum reports a violation when a numeric value is below s.Minimum.
func (s *Schema) checkMinimum(value any, errs *[]string, at func(string) string) {
	if s.Minimum == nil {
		return
	}
	if f, ok := toFloat(value); ok && f < *s.Minimum {
		*errs = append(*errs, at(fmt.Sprintf("must be >= %v", *s.Minimum)))
	}
}

// join appends key to a dotted property path.
func join(path, key string) string {
	if path == "" {
		return key
	}
	return path + "." + key
}

// charLen returns the number of Unicode code points in s.
func charLen(s string) int {
	n := 0
	for range s {
		n++
	}
	return n
}

// jsonEqual reports whether a and b are equal as JSON values. Two numbers are
// compared by numeric value, so a Go-native int 1 equals the float64 1 that
// encoding/json produces; everything else uses reflect.DeepEqual.
func jsonEqual(a, b any) bool {
	if af, ok := toFloat(a); ok {
		if bf, ok := toFloat(b); ok {
			return af == bf
		}
	}
	return reflect.DeepEqual(a, b)
}

// inEnum reports whether value equals one of the enum members.
func inEnum(value any, enum []any) bool {
	for _, e := range enum {
		if jsonEqual(value, e) {
			return true
		}
	}
	return false
}

// toFloat converts any Go numeric value to float64. It accepts float64 (what
// encoding/json decodes), json.Number (decoders using UseNumber) and every
// native integer and float kind, so producer-side data built in Go code is
// treated the same as decoded JSON. Integers beyond 2^53 lose precision in the
// conversion. Non-numeric values, including bool and nil, report false.
func toFloat(value any) (float64, bool) {
	if num, ok := value.(json.Number); ok {
		f, err := num.Float64()
		if err != nil {
			return 0, false
		}
		return f, true
	}
	rv := reflect.ValueOf(value) // the zero Value for nil has Kind Invalid
	switch rv.Kind() {
	case reflect.Float32, reflect.Float64:
		return rv.Float(), true
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return float64(rv.Int()), true
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return float64(rv.Uint()), true
	default:
		return 0, false
	}
}

// isNumber reports whether value is numeric in the sense of toFloat.
func isNumber(value any) bool {
	_, ok := toFloat(value)
	return ok
}

// isInteger reports whether value is a finite number with no fractional part.
// JSON decodes numbers to float64, so "1" arrives as 1.0 — an integer for our
// purposes. Infinities are rejected because they have no JSON representation.
func isInteger(value any) bool {
	f, ok := toFloat(value)
	return ok && !math.IsInf(f, 0) && math.Trunc(f) == f
}
minimum violation") + } + if errs := s.Validate(map[string]any{"tags": []any{"a", 1.0}}); len(errs) == 0 { + t.Fatal("expected an array-item type violation") + } +} + +/* Checks a bare const schema: the matching value validates and a different value is rejected. */ func TestValidate_Const(t *testing.T) { + s := parse(t, `{"const":"v1"}`) + if errs := s.Validate("v1"); len(errs) != 0 { + t.Fatalf("matching const should validate: %v", errs) + } + if errs := s.Validate("v2"); len(errs) == 0 { + t.Fatal("mismatched const should fail") + } +} + +/* Table-driven check of the scalar type keywords (boolean, null, number with minimum, string, integer) plus object and array type mismatches. Each row parses src and expects Validate to return no violations exactly when valid is true. */ func TestValidate_ScalarTypes(t *testing.T) { + cases := []struct { + src string + value any + valid bool + }{ + {`{"type":"boolean"}`, true, true}, + {`{"type":"boolean"}`, "x", false}, + {`{"type":"null"}`, nil, true}, + {`{"type":"null"}`, 1.0, false}, + {`{"type":"number","minimum":0.5}`, 0.6, true}, + {`{"type":"number","minimum":0.5}`, 0.4, false}, + {`{"type":"number"}`, "x", false}, + {`{"type":"string"}`, 5.0, false}, + {`{"type":"integer"}`, 1.0, true}, + {`{"type":"integer"}`, 1.5, false}, + {`{"type":"object"}`, "x", false}, + {`{"type":"array"}`, "x", false}, + } + for _, c := range cases { + errs := parse(t, c.src).Validate(c.value) + if c.valid && len(errs) != 0 { + t.Errorf("%s with %v: expected valid, got %v", c.src, c.value, errs) + } + if !c.valid && len(errs) == 0 { + t.Errorf("%s with %v: expected a violation, got none", c.src, c.value) + } + } +} + +/* Parse must return an error for input that is not JSON. */ func TestParse_Invalid(t *testing.T) { + if _, err := Parse([]byte("not json")); err == nil { + t.Fatal("invalid JSON should error") + } +} diff --git a/testdata/conformance/manifest.json b/testdata/conformance/manifest.json index 78e5c3a..5b2fee4 100644 --- a/testdata/conformance/manifest.json +++ b/testdata/conformance/manifest.json @@ -226,5 +226,28 @@ "x-attempts": 0 } } + }, + "payload_schema": { + "description": "Per-URN data schema validation (ADR-0024). Each case validates `data` against `schema`; every SDK's optional payload validator (Go schema, PHP BabelQueue\\Schema, Python babelqueue.schema) MUST agree on `valid`.
The wire envelope stays frozen — this governs the data block only, and is opt-in (consumers/producers without a registered schema skip it).", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["order_id"], + "properties": { + "order_id": { "type": "integer", "minimum": 1 }, + "amount": { "type": "number", "minimum": 0 }, + "currency": { "enum": ["USD", "EUR", "TRY"] } + }, + "additionalProperties": false + }, + "cases": [ + { "name": "valid-minimal", "valid": true, "data": { "order_id": 1042 } }, + { "name": "valid-full", "valid": true, "data": { "order_id": 1042, "amount": 99.9, "currency": "USD" } }, + { "name": "invalid-missing-required", "valid": false, "data": { "amount": 10 } }, + { "name": "invalid-wrong-type", "valid": false, "data": { "order_id": "x" } }, + { "name": "invalid-additional-property", "valid": false, "data": { "order_id": 1, "extra": true } }, + { "name": "invalid-enum", "valid": false, "data": { "order_id": 1, "currency": "GBP" } }, + { "name": "invalid-below-minimum", "valid": false, "data": { "order_id": 0 } } + ] } }