diff --git a/server/internal/config/config.go b/server/internal/config/config.go index 86ecd18d..a9c1316b 100644 --- a/server/internal/config/config.go +++ b/server/internal/config/config.go @@ -82,6 +82,12 @@ type DockerSwarm struct { BridgeNetworksSubnetBits int `koanf:"bridge_networks_subnet_bits" json:"bridge_networks_subnet_bits,omitempty"` DatabaseNetworksCIDR string `koanf:"database_networks_cidr" json:"database_networks_cidr,omitempty"` DatabaseNetworksSubnetBits int `koanf:"database_networks_subnet_bits" json:"database_networks_subnet_bits,omitempty"` + // ManifestURL is the URL from which the version manifest is fetched. + // Defaults to the pgEdge CDN URL if not set. + ManifestURL string `koanf:"manifest_url" json:"manifest_url,omitempty"` + // ManifestPath points to a local manifest file that bypasses URL fetching + // entirely. Useful for air-gapped environments or testing. + ManifestPath string `koanf:"manifest_path" json:"manifest_path,omitempty"` } func (d DockerSwarm) validate() []error { @@ -102,6 +108,10 @@ func (d DockerSwarm) validate() []error { return errs } +// DefaultManifestURL is the pgEdge CDN URL used when no manifest_url is configured. +// TODO(PLAT-598): Replace with the real URL once the hosting location is confirmed. +const DefaultManifestURL = "https://download.pgedge.com/manifests/version-manifest.json" + var defaultDockerSwarm = DockerSwarm{ ImageRepositoryHost: "ghcr.io/pgedge", // This combination gives us 256 subnets with 16 addresses each. @@ -110,6 +120,7 @@ var defaultDockerSwarm = DockerSwarm{ // This combination gives us 256 subnets with 64 addresses each. DatabaseNetworksCIDR: "10.128.128.0/18", DatabaseNetworksSubnetBits: 26, + ManifestURL: DefaultManifestURL, } type SystemD struct { diff --git a/server/internal/database/orchestrator.go b/server/internal/database/orchestrator.go index 8f42a569..e101bbb2 100644 --- a/server/internal/database/orchestrator.go +++ b/server/internal/database/orchestrator.go @@ -181,4 +181,9 @@ type Orchestrator interface { StartInstance(ctx context.Context, instanceID string) error NodeDSN(ctx context.Context, rc *resource.Context, nodeName string, fromInstanceID string, dbName string) (*postgres.DSN, error) InstancePaths(pgVersion *ds.Version, instanceID string) (InstancePaths, error) + // ReconcileInstanceSpec is called during spec reconciliation to allow the + // orchestrator to update computed fields (e.g. resolved image) on the new + // spec before it is persisted. old is nil when the instance is being created + // for the first time. + ReconcileInstanceSpec(old, new *InstanceSpec) error } diff --git a/server/internal/database/service.go b/server/internal/database/service.go index d1bc1590..b2287d4c 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -669,7 +669,13 @@ func (s *Service) ReconcileInstanceSpec(ctx context.Context, spec *InstanceSpec) case err == nil: previous = stored.Spec spec.CopySettingsFrom(previous) + if err := s.orchestrator.ReconcileInstanceSpec(previous, spec); err != nil { + return nil, fmt.Errorf("failed to reconcile instance spec: %w", err) + } case errors.Is(err, storage.ErrNotFound): + if err := s.orchestrator.ReconcileInstanceSpec(nil, spec); err != nil { + return nil, fmt.Errorf("failed to reconcile instance spec: %w", err) + } stored = &StoredInstanceSpec{} default: return nil, fmt.Errorf("failed to get current spec for instance '%s': %w", spec.InstanceID, err) diff --git a/server/internal/database/spec.go b/server/internal/database/spec.go index e573263b..99b5b930 100644 --- a/server/internal/database/spec.go +++ b/server/internal/database/spec.go @@ -30,7 +30,12 @@ type ExtraNetworkSpec struct { type SwarmOpts struct { ExtraVolumes []ExtraVolumesSpec `json:"extra_volumes,omitempty"` ExtraNetworks []ExtraNetworkSpec `json:"extra_networks,omitempty"` - ExtraLabels map[string]string `json:"extra_labels,omitempty"` // optional, used for custom labels on the swarm service + ExtraLabels map[string]string `json:"extra_labels,omitempty"` + // Image is a user-specified override. Never written by the CP. + Image string `json:"image,omitempty"` + // ResolvedImage is the CP-managed image tag. Written at instance creation, + // upgrade application, and lazy backfill. Never set simultaneously with Image. + ResolvedImage string `json:"resolved_image,omitempty"` } type OrchestratorOpts struct { Swarm *SwarmOpts `json:"docker,omitempty"` @@ -301,6 +306,8 @@ func (d *SwarmOpts) Clone() *SwarmOpts { ExtraVolumes: clonedVolumes, ExtraNetworks: clonedNetworks, ExtraLabels: maps.Clone(d.ExtraLabels), + Image: d.Image, + ResolvedImage: d.ResolvedImage, } } diff --git a/server/internal/database/spec_test.go b/server/internal/database/spec_test.go index 9acdbf23..a8451972 100644 --- a/server/internal/database/spec_test.go +++ b/server/internal/database/spec_test.go @@ -510,6 +510,38 @@ func TestSpec(t *testing.T) { }) } +func TestSwarmOptsClone(t *testing.T) { + t.Run("copies Image and ResolvedImage", func(t *testing.T) { + orig := &database.SwarmOpts{ + Image: "custom-registry/pgedge:dev", + ResolvedImage: "registry/pgedge:17.9-spock5.0.6-standard-1", + ExtraLabels: map[string]string{"k": "v"}, + } + cloned := orig.Clone() + + assert.Equal(t, orig.Image, cloned.Image) + assert.Equal(t, orig.ResolvedImage, cloned.ResolvedImage) + }) + + t.Run("clone is independent of original", func(t *testing.T) { + orig := &database.SwarmOpts{ + Image: "original-image", + ResolvedImage: "original-resolved", + } + cloned := orig.Clone() + cloned.Image = "mutated-image" + cloned.ResolvedImage = "mutated-resolved" + + assert.Equal(t, "original-image", orig.Image) + assert.Equal(t, "original-resolved", orig.ResolvedImage) + }) + + t.Run("nil clone returns nil", func(t *testing.T) { + var s *database.SwarmOpts + assert.Nil(t, s.Clone()) + }) +} + func TestSpec_NodeInstances_DBOwner(t *testing.T) { minimalSpec := func(users []*database.User) *database.Spec { return &database.Spec{ diff --git a/server/internal/logging/factory.go b/server/internal/logging/factory.go index dbe9ede5..7532c8ff 100644 --- a/server/internal/logging/factory.go +++ b/server/internal/logging/factory.go @@ -19,6 +19,7 @@ const ( ComponentDatabaseService Component = "database_service" ComponentElectionCandidate Component = "election_candidate" ComponentEmbeddedEtcd Component = "embedded_etcd" + ComponentManifestLoader Component = "manifest_loader" ComponentMigration Component = "migration" ComponentMigrationRunner Component = "migration_runner" ComponentPortsService Component = "ports_service" diff --git a/server/internal/orchestrator/swarm/images.go b/server/internal/orchestrator/swarm/images.go index 4388a727..7d61ac78 100644 --- a/server/internal/orchestrator/swarm/images.go +++ b/server/internal/orchestrator/swarm/images.go @@ -80,11 +80,11 @@ func NewVersions(cfg config.Config) *Versions { return versions } -func (v *Versions) Supported() []*ds.PgEdgeVersion { +func (v Versions) Supported() []*ds.PgEdgeVersion { return v.supportedVersions } -func (v *Versions) Default() *ds.PgEdgeVersion { +func (v Versions) Default() *ds.PgEdgeVersion { return v.defaultVersion } @@ -100,7 +100,7 @@ func (v *Versions) addImage(version *ds.PgEdgeVersion, images *Images) { v.supportedVersions = append(v.supportedVersions, version) } -func (v *Versions) GetImages(version *ds.PgEdgeVersion) (*Images, error) { +func (v Versions) GetImages(version *ds.PgEdgeVersion) (*Images, error) { pgv := version.PostgresVersion.String() sv := version.SpockVersion.String() diff --git a/server/internal/orchestrator/swarm/manifest_loader.go b/server/internal/orchestrator/swarm/manifest_loader.go new file mode 100644 index 00000000..238a1dab --- /dev/null +++ b/server/internal/orchestrator/swarm/manifest_loader.go @@ -0,0 +1,426 @@ +package swarm + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "sync" + "time" + + _ "embed" + + "github.com/rs/zerolog" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/pgEdge/control-plane/server/internal/logging" +) + +//go:embed version-manifest.json +var embeddedManifest []byte + +const ( + refreshInterval = time.Hour + fetchTimeout = 10 * time.Second + supportedSchemaVersion = 1 +) + +// manifestCachePath returns the cache file path for a given manifest URL. +// The filename embeds a short hash of the URL so that switching to a different +// URL automatically uses a fresh cache file instead of a stale one. +func manifestCachePath(dir, url string) string { + sum := sha256.Sum256([]byte(url)) + return filepath.Join(dir, fmt.Sprintf("version-manifest-cache-%s.json", hex.EncodeToString(sum[:8]))) +} + +// versionManifestImages is the typed images block inside version-manifest.json. +type versionManifestImages struct { + Postgres []manifestPostgresEntry `json:"postgres"` + PostgREST []manifestServiceEntry `json:"postgrest"` + MCP []manifestServiceEntry `json:"mcp"` + RAG []manifestServiceEntry `json:"rag"` +} + +// versionManifest is the top-level structure of version-manifest.json. +type versionManifest struct { + SchemaVersion int `json:"schema_version"` + Images versionManifestImages `json:"images"` +} + +// manifestPostgresEntry is one entry under images.postgres in the manifest. +type manifestPostgresEntry struct { + PostgresVersion string `json:"postgres_version"` + SpockVersion string `json:"spock_version"` + Image string `json:"image"` + Stability string `json:"stability"` + Default bool `json:"default"` +} + +// manifestServiceEntry is one entry under images. in the manifest. +type manifestServiceEntry struct { + Version string `json:"version"` + Image string `json:"image"` + Stability string `json:"stability"` + Default bool `json:"default"` +} + +// manifestLoaderOption is a functional option for ManifestLoader, used in +// tests to inject test doubles. +type manifestLoaderOption func(*ManifestLoader) + +func withCachePath(path string) manifestLoaderOption { + return func(m *ManifestLoader) { m.cachePath = path } +} + +func withHTTPClient(c *http.Client) manifestLoaderOption { + return func(m *ManifestLoader) { m.httpClient = c } +} + +func withTickerC(ch <-chan time.Time) manifestLoaderOption { + return func(m *ManifestLoader) { m.tickerC = ch } +} + +// withEmbeddedFallback enables the embedded-manifest fallback regardless of +// the configured URL. Used in tests that simulate URL failure while still +// exercising the default-URL resolution chain. +func withEmbeddedFallback() manifestLoaderOption { + return func(m *ManifestLoader) { m.embeddedFallback = true } +} + +// ManifestLoader loads the version manifest from a remote URL (with disk +// caching and hourly refresh) or from a local file, and exposes the parsed +// *Versions and *ServiceVersions it produces. +// +// Resolution chains: +// +// 1. manifest_path set (local file): local file only — no fallback, returns +// error if the file is missing or invalid. +// +// 2. Custom manifest_url (differs from DefaultManifestURL): remote URL → +// disk cache — no embedded fallback; returns error if all sources fail. +// +// 3. Default manifest_url: remote URL → disk cache → embedded manifest +// (always succeeds; panics only if the embedded JSON is corrupt, which +// indicates a broken build). +type ManifestLoader struct { + cfg config.Config + logger zerolog.Logger + cachePath string + httpClient *http.Client + tickerC <-chan time.Time // nil → use default hourly ticker; injectable for tests + embeddedFallback bool // set when using the default URL; injectable for tests + + mu sync.RWMutex + versions *Versions + svcVersions *ServiceVersions +} + +// NewManifestLoader creates and starts a ManifestLoader. It loads the +// manifest synchronously before returning so callers always get a valid +// *Versions / *ServiceVersions immediately. Returns an error when using a +// custom manifest_path or manifest_url and the manifest cannot be loaded. +// Background refresh (if applicable) is started as a goroutine tied to ctx. +func NewManifestLoader(ctx context.Context, cfg config.Config, loggerFactory *logging.Factory, opts ...manifestLoaderOption) (*ManifestLoader, error) { + m := &ManifestLoader{ + cfg: cfg, + logger: loggerFactory.Logger(logging.ComponentManifestLoader), + cachePath: manifestCachePath(filepath.Join(cfg.DataDir, "manifests"), cfg.DockerSwarm.ManifestURL), + embeddedFallback: cfg.DockerSwarm.ManifestURL == config.DefaultManifestURL, + httpClient: &http.Client{ + Timeout: fetchTimeout, + }, + } + for _, o := range opts { + o(m) + } + + if err := m.load(ctx); err != nil { + return nil, err + } + + if cfg.DockerSwarm.ManifestPath == "" { + go m.refreshLoop(ctx) + } + + return m, nil +} + +// Versions returns a snapshot of the current parsed Versions. +func (m *ManifestLoader) Versions() Versions { + m.mu.RLock() + defer m.mu.RUnlock() + return *m.versions +} + +// ServiceVersions returns a snapshot of the current parsed ServiceVersions. +func (m *ManifestLoader) ServiceVersions() ServiceVersions { + m.mu.RLock() + defer m.mu.RUnlock() + return *m.svcVersions +} + +// load performs the initial synchronous load. +func (m *ManifestLoader) load(ctx context.Context) error { + data, src, err := m.resolve(ctx) + if err != nil { + return err + } + v, sv, err := m.parseManifestData(data) + if err != nil { + if src == "embedded" { + // The embedded manifest is corrupt — this is a build-time error. + panic(fmt.Sprintf("manifest_loader: corrupt embedded manifest: %v", err)) + } + return fmt.Errorf("parse manifest from %s: %w", src, err) + } + m.mu.Lock() + m.versions = v + m.svcVersions = sv + m.mu.Unlock() + m.logger.Info().Str("source", src).Msg("version manifest loaded") + return nil +} + +// resolve selects the appropriate resolution chain and returns raw manifest +// bytes plus a human-readable source label. +func (m *ManifestLoader) resolve(ctx context.Context) ([]byte, string, error) { + if p := m.cfg.DockerSwarm.ManifestPath; p != "" { + return m.resolveLocalPath(p) + } + return m.resolveURL(ctx) +} + +// resolveLocalPath reads and validates a manifest from a local file path. +// No fallback — returns an error if the file is missing or invalid. +func (m *ManifestLoader) resolveLocalPath(p string) ([]byte, string, error) { + data, err := os.ReadFile(p) + if err != nil { + return nil, "", fmt.Errorf("manifest_path %s: %w", p, err) + } + if err = m.validateManifest(data); err != nil { + return nil, "", fmt.Errorf("manifest_path %s validation failed: %w", p, err) + } + return data, "file:" + p, nil +} + +// resolveURL tries to load the manifest from the configured URL, falling back +// to the disk cache. For the default URL it also falls back to the embedded +// manifest. For a custom URL it returns an error if all sources fail so the +// operator knows their configuration is broken. +func (m *ManifestLoader) resolveURL(ctx context.Context) ([]byte, string, error) { + u := m.cfg.DockerSwarm.ManifestURL + + data, err := m.fetchURL(ctx, u) + if err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("failed to fetch manifest from URL; trying disk cache") + } else if err = m.validateManifest(data); err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("remote manifest validation failed; trying disk cache") + } else { + _ = m.writeCache(data) + return data, "url:" + u, nil + } + + if cached, err := os.ReadFile(m.cachePath); err == nil { + if err = m.validateManifest(cached); err != nil { + m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("cached manifest validation failed; skipping cache") + } else { + m.logger.Info().Str("path", m.cachePath).Msg("using cached manifest") + return cached, "cache:" + m.cachePath, nil + } + } + + if m.embeddedFallback { + m.logger.Warn().Msg("using embedded manifest; consider checking connectivity to manifest URL") + return embeddedManifest, "embedded", nil + } + + return nil, "", fmt.Errorf("failed to load manifest from %s: URL and cache both unavailable", u) +} + +// fetchURL fetches bytes from the given URL using the configured http.Client. +func (m *ManifestLoader) fetchURL(ctx context.Context, url string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := m.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected HTTP status %d", resp.StatusCode) + } + return io.ReadAll(resp.Body) +} + +// writeCache persists data to the cache path, creating parent directories as +// needed. Errors are logged but not returned (caching is best-effort). +func (m *ManifestLoader) writeCache(data []byte) error { + if err := os.MkdirAll(filepath.Dir(m.cachePath), 0o700); err != nil { + m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("failed to create manifest cache directory") + return err + } + if err := os.WriteFile(m.cachePath, data, 0o600); err != nil { + m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("failed to write manifest cache") + return err + } + return nil +} + +// refreshLoop runs in the background and refreshes the manifest every hour. +// It stops when ctx is cancelled. +func (m *ManifestLoader) refreshLoop(ctx context.Context) { + tickC := m.tickerC + var ticker *time.Ticker + if tickC == nil { + ticker = time.NewTicker(refreshInterval) + defer ticker.Stop() + tickC = ticker.C + } + for { + select { + case <-ctx.Done(): + return + case <-tickC: + m.refresh(ctx) + } + } +} + +// refresh fetches a fresh manifest from the URL, validates it, updates the +// disk cache, and swaps the in-memory versions atomically. Failures are +// logged but do not disturb the current in-memory versions. +func (m *ManifestLoader) refresh(ctx context.Context) { + u := m.cfg.DockerSwarm.ManifestURL + data, err := m.fetchURL(ctx, u) + if err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("manifest refresh: fetch failed; keeping current versions") + return + } + if err = m.validateManifest(data); err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("manifest refresh: validation failed; keeping current versions") + return + } + v, sv, err := m.parseManifestData(data) + if err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("manifest refresh: parse failed; keeping current versions") + return + } + _ = m.writeCache(data) + + m.mu.Lock() + m.versions = v + m.svcVersions = sv + m.mu.Unlock() + m.logger.Info().Str("url", u).Msg("version manifest refreshed") +} + +// validateManifest checks that data is valid JSON, has a supported +// schema_version, and can be fully parsed into *Versions/*ServiceVersions. +// Performing a full parse here ensures resolve() only returns data that will +// succeed in parseManifestData. +func (m *ManifestLoader) validateManifest(data []byte) error { + var mf versionManifest + if err := json.Unmarshal(data, &mf); err != nil { + return fmt.Errorf("invalid JSON: %w", err) + } + if mf.SchemaVersion != supportedSchemaVersion { + return fmt.Errorf("unsupported schema_version %d (want %d)", mf.SchemaVersion, supportedSchemaVersion) + } + if _, _, err := m.parseManifestData(data); err != nil { + return fmt.Errorf("manifest not parseable: %w", err) + } + return nil +} + +// parseManifestData unmarshals data into *Versions and *ServiceVersions. +func (m *ManifestLoader) parseManifestData(data []byte) (*Versions, *ServiceVersions, error) { + var mf versionManifest + if err := json.Unmarshal(data, &mf); err != nil { + return nil, nil, fmt.Errorf("unmarshal manifest: %w", err) + } + + v, err := buildVersions(m.cfg, &mf) + if err != nil { + return nil, nil, fmt.Errorf("build versions: %w", err) + } + + sv, err := buildServiceVersions(m.cfg, &mf) + if err != nil { + return nil, nil, fmt.Errorf("build service versions: %w", err) + } + + return v, sv, nil +} + +// buildVersions parses the images.postgres section of the manifest and +// returns a *Versions equivalent to NewVersions(cfg) for the same data. +func buildVersions(cfg config.Config, mf *versionManifest) (*Versions, error) { + entries := mf.Images.Postgres + if len(entries) == 0 { + return nil, fmt.Errorf("manifest missing images.postgres") + } + + versions := &Versions{ + cfg: cfg, + images: make(map[string]map[string]*Images), + } + + var defaultVer *ds.PgEdgeVersion + for _, e := range entries { + pv, err := ds.ParsePgEdgeVersion(e.PostgresVersion, e.SpockVersion) + if err != nil { + return nil, fmt.Errorf("invalid version entry {postgres:%s spock:%s}: %w", + e.PostgresVersion, e.SpockVersion, err) + } + img := &Images{ + PgEdgeImage: serviceImageTag(cfg, e.Image), + } + versions.addImage(pv, img) + if e.Default { + defaultVer = pv + } + } + + if defaultVer == nil { + // Fall back to the last entry if no default is marked. + defaultVer = versions.supportedVersions[len(versions.supportedVersions)-1] + } + versions.defaultVersion = defaultVer + + return versions, nil +} + +// buildServiceVersions parses all non-postgres image sections and returns a +// *ServiceVersions equivalent to NewServiceVersions(cfg) for the same data. +func buildServiceVersions(cfg config.Config, mf *versionManifest) (*ServiceVersions, error) { + sv := &ServiceVersions{ + cfg: cfg, + images: make(map[string]map[string]*ServiceImage), + } + + type svcSection struct { + name string + entries []manifestServiceEntry + } + for _, s := range []svcSection{ + {"postgrest", mf.Images.PostgREST}, + {"mcp", mf.Images.MCP}, + {"rag", mf.Images.RAG}, + } { + for _, e := range s.entries { + sv.addServiceImage(s.name, e.Version, &ServiceImage{ + Tag: serviceImageTag(cfg, e.Image), + }) + } + } + + return sv, nil +} diff --git a/server/internal/orchestrator/swarm/manifest_loader_test.go b/server/internal/orchestrator/swarm/manifest_loader_test.go new file mode 100644 index 00000000..36260f03 --- /dev/null +++ b/server/internal/orchestrator/swarm/manifest_loader_test.go @@ -0,0 +1,695 @@ +package swarm + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/testutils" +) + +// testCfg returns a config with an isolated cache directory in t.TempDir(). +func testCfg(t *testing.T, extra ...func(*config.DockerSwarm)) (config.Config, string) { + t.Helper() + cacheDir := t.TempDir() + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + }, + } + for _, fn := range extra { + fn(&cfg.DockerSwarm) + } + return cfg, filepath.Join(cacheDir, "manifest-cache.json") +} + +// validManifest returns a well-formed manifest JSON matching the embedded one. +func validManifest(t *testing.T) []byte { + t.Helper() + data, err := json.Marshal(map[string]any{ + "schema_version": 1, + "images": map[string]any{ + "postgres": []map[string]any{ + { + "postgres_version": "17.10", + "spock_version": "5", + "image": "pgedge-postgres:17.10-spock5.0.8-standard-1", + "stability": "stable", + "default": true, + }, + }, + "mcp": []map[string]any{ + {"version": "latest", "image": "postgres-mcp:latest", "stability": "stable", "default": true}, + }, + "postgrest": []map[string]any{ + {"version": "14.5", "image": "postgrest:14.5", "stability": "stable", "default": true}, + }, + "rag": []map[string]any{ + {"version": "latest", "image": "rag-server:latest", "stability": "stable", "default": true}, + }, + }, + }) + if err != nil { + t.Fatalf("marshal test manifest: %v", err) + } + return data +} + +// TestManifestLoader_LoadFromEmbedded verifies the loader falls back to the +// embedded manifest when no URL, cache, or manifest_path is available. +func TestManifestLoader_LoadFromEmbedded(t *testing.T) { + cfg, cachePath := testCfg(t) + // Point at a server that always 500s to force URL failure. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + withEmbeddedFallback(), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + v := loader.Versions() + if v.Default() == nil { + t.Fatal("expected non-nil default version from embedded manifest") + } + if len(v.Supported()) == 0 { + t.Fatal("expected at least one supported version from embedded manifest") + } + + sv := loader.ServiceVersions() + if _, err := sv.SupportedServiceVersions("mcp"); err != nil { + t.Fatalf("expected ServiceVersions to be populated from embedded manifest: %v", err) + } +} + +// TestManifestLoader_LoadFromURL verifies the happy-path URL fetch. +func TestManifestLoader_LoadFromURL(t *testing.T) { + cfg, cachePath := testCfg(t) + manifest := validManifest(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(manifest) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + v := loader.Versions() + def := v.Default() + if def == nil { + t.Fatal("expected non-nil default version") + } + if def.PostgresVersion.String() != "17.10" { + t.Errorf("default postgres version = %s, want 17.10", def.PostgresVersion) + } + + // Cache should have been written. + if _, err := os.Stat(cachePath); err != nil { + t.Errorf("expected cache file at %s: %v", cachePath, err) + } +} + +// TestManifestLoader_LoadFromCache verifies that a stale URL causes the loader +// to fall back to the disk cache. +func TestManifestLoader_LoadFromCache(t *testing.T) { + cfg, cachePath := testCfg(t) + manifest := validManifest(t) + + // Pre-populate the cache. + if err := os.MkdirAll(filepath.Dir(cachePath), 0o700); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(cachePath, manifest, 0o600); err != nil { + t.Fatal(err) + } + + // URL always fails. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + v := loader.Versions() + if v.Default() == nil { + t.Fatal("expected non-nil default version from cache") + } +} + +// TestManifestLoader_CustomURLNoFallbackToEmbedded verifies that a custom +// manifest_url that fails does NOT fall back to the embedded manifest and +// instead returns an error. +func TestManifestLoader_CustomURLNoFallbackToEmbedded(t *testing.T) { + cfg, cachePath := testCfg(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + // Use a non-default URL to trigger the custom-URL chain. + cfg.DockerSwarm.ManifestURL = srv.URL + "/custom" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + if err == nil { + t.Fatal("expected error for custom URL with no cache and failing server") + } +} + +// TestManifestLoader_LoadFromManifestPath verifies the local file override. +func TestManifestLoader_LoadFromManifestPath(t *testing.T) { + manifest := validManifest(t) + mfFile := filepath.Join(t.TempDir(), "local-manifest.json") + if err := os.WriteFile(mfFile, manifest, 0o644); err != nil { + t.Fatal(err) + } + + _, cachePath := testCfg(t) + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + ManifestPath: mfFile, + }, + } + + loader, err := NewManifestLoader(context.Background(), cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + v := loader.Versions() + if v.Default().PostgresVersion.String() != "17.10" { + t.Errorf("default version = %s, want 17.10", v.Default().PostgresVersion) + } +} + +// TestManifestLoader_ManifestPathMissing verifies that a missing manifest_path +// returns an error (no fallback to embedded). +func TestManifestLoader_ManifestPathMissing(t *testing.T) { + _, cachePath := testCfg(t) + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + ManifestPath: "/does/not/exist/manifest.json", + }, + } + + _, err := NewManifestLoader(context.Background(), cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + ) + if err == nil { + t.Fatal("expected error when manifest_path points to a non-existent file") + } +} + +// TestManifestLoader_InvalidSchemaVersion verifies that a manifest with an +// unsupported schema_version causes URL/cache to be skipped and falls back to +// the embedded manifest (default URL chain). +func TestManifestLoader_InvalidSchemaVersion(t *testing.T) { + cfg, cachePath := testCfg(t) + badManifest := []byte(`{"schema_version":99,"images":{}}`) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write(badManifest) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + withEmbeddedFallback(), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + // Falls back to embedded — should still work. + if loader.Versions().Default() == nil { + t.Fatal("expected fallback to embedded on invalid schema_version") + } +} + +// TestManifestLoader_MalformedJSON verifies that malformed JSON causes fallback. +func TestManifestLoader_MalformedJSON(t *testing.T) { + cfg, cachePath := testCfg(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(`{not valid json`)) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + withEmbeddedFallback(), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + if loader.Versions().Default() == nil { + t.Fatal("expected fallback to embedded on malformed JSON") + } +} + +// TestManifestLoader_NoRefreshWhenManifestPathSet verifies that when +// manifest_path is set the background refresh goroutine is NOT started. +// +// Strategy: inject a pre-fired ticker via withTickerC. If the goroutine were +// started, it would immediately consume the tick, fetch the URL (which returns +// PG 16.14), and update the default version. We then assert the version is +// still 17.10 (from the local file), proving no goroutine ran. +func TestManifestLoader_NoRefreshWhenManifestPathSet(t *testing.T) { + manifest := validManifest(t) + mfFile := filepath.Join(t.TempDir(), "local-manifest.json") + if err := os.WriteFile(mfFile, manifest, 0o644); err != nil { + t.Fatal(err) + } + + // Serve a different default version at the URL so any refresh would be detectable. + updated, _ := json.Marshal(map[string]any{ + "schema_version": 1, + "images": map[string]any{ + "postgres": []map[string]any{ + { + "postgres_version": "16.14", + "spock_version": "5", + "image": "pgedge-postgres:16.14-spock5.0.8-standard-1", + "stability": "stable", + "default": true, + }, + }, + }, + }) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write(updated) + })) + defer srv.Close() + + _, cachePath := testCfg(t) + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + ManifestPath: mfFile, + ManifestURL: srv.URL, + }, + } + + // Pre-fired ticker: if the refresh goroutine were started it would consume + // this tick immediately and refresh from the URL. + immediateTick := make(chan time.Time, 1) + immediateTick <- time.Now() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + withTickerC(immediateTick), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + // Allow enough time for the goroutine (if it existed) to process the tick + // and complete an HTTP round-trip to localhost. + time.Sleep(50 * time.Millisecond) + + if got := loader.Versions().Default().PostgresVersion.String(); got != "17.10" { + t.Errorf("default version = %q; refresh goroutine must not run when manifest_path is set", got) + } +} + +// TestManifestLoader_RefreshSuccess verifies that refresh() updates in-memory +// versions when a new valid manifest is served. +func TestManifestLoader_RefreshSuccess(t *testing.T) { + cfg, cachePath := testCfg(t) + + // First serve a manifest with PG 17.10. + current := validManifest(t) + + // Prepare an updated manifest with PG 16.14 as default. + updated, err := json.Marshal(map[string]any{ + "schema_version": 1, + "images": map[string]any{ + "postgres": []map[string]any{ + { + "postgres_version": "16.14", + "spock_version": "5", + "image": "pgedge-postgres:16.14-spock5.0.8-standard-1", + "stability": "stable", + "default": true, + }, + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + serve := current + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write(serve) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + if loader.Versions().Default().PostgresVersion.String() != "17.10" { + t.Fatalf("expected initial default 17.10, got %s", loader.Versions().Default().PostgresVersion) + } + + // Swap what the server returns, then trigger a refresh. + serve = updated + loader.refresh(context.Background()) + + if loader.Versions().Default().PostgresVersion.String() != "16.14" { + t.Errorf("expected refreshed default 16.14, got %s", loader.Versions().Default().PostgresVersion) + } +} + +// TestManifestLoader_RefreshFailure verifies that a failing refresh leaves +// in-memory versions unchanged. +func TestManifestLoader_RefreshFailure(t *testing.T) { + cfg, cachePath := testCfg(t) + manifest := validManifest(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write(manifest) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + origDefault := loader.Versions().Default().PostgresVersion.String() + + // Now make the server return garbage. + manifest = []byte(`{invalid}`) + loader.refresh(context.Background()) + + if loader.Versions().Default().PostgresVersion.String() != origDefault { + t.Error("versions changed after a failed refresh") + } +} + +// TestBuildVersions_MatchesNewVersions verifies that buildVersions produces +// the same set of supported versions as the hardcoded NewVersions function +// when given the embedded manifest. +func TestBuildVersions_MatchesNewVersions(t *testing.T) { + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + }, + } + + var mf versionManifest + if err := json.Unmarshal(embeddedManifest, &mf); err != nil { + t.Fatalf("unmarshal embedded manifest: %v", err) + } + + got, err := buildVersions(cfg, &mf) + if err != nil { + t.Fatalf("buildVersions: %v", err) + } + + want := NewVersions(cfg) + + if len(got.Supported()) != len(want.Supported()) { + t.Errorf("Supported() len = %d, want %d", len(got.Supported()), len(want.Supported())) + } + + for _, wv := range want.Supported() { + imgs, err := got.GetImages(wv) + if err != nil { + t.Errorf("GetImages(%s) not found in manifest-built Versions: %v", wv, err) + continue + } + wantImgs, _ := want.GetImages(wv) + if imgs.PgEdgeImage != wantImgs.PgEdgeImage { + t.Errorf("GetImages(%s).PgEdgeImage = %q, want %q", wv, imgs.PgEdgeImage, wantImgs.PgEdgeImage) + } + } + + if got.Default().PostgresVersion.String() != want.Default().PostgresVersion.String() { + t.Errorf("Default() = %s, want %s", got.Default().PostgresVersion, want.Default().PostgresVersion) + } +} + +// TestBuildServiceVersions_MatchesNewServiceVersions verifies that +// buildServiceVersions produces the same registrations as NewServiceVersions +// for the embedded manifest. +func TestBuildServiceVersions_MatchesNewServiceVersions(t *testing.T) { + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + }, + } + + var mf versionManifest + if err := json.Unmarshal(embeddedManifest, &mf); err != nil { + t.Fatalf("unmarshal embedded manifest: %v", err) + } + + got, err := buildServiceVersions(cfg, &mf) + if err != nil { + t.Fatalf("buildServiceVersions: %v", err) + } + + want := NewServiceVersions(cfg) + + serviceTypes := []string{"mcp", "postgrest", "rag"} + for _, svc := range serviceTypes { + gotVers, err := got.SupportedServiceVersions(svc) + if err != nil { + t.Errorf("SupportedServiceVersions(%q) error: %v", svc, err) + continue + } + wantVers, _ := want.SupportedServiceVersions(svc) + if len(gotVers) != len(wantVers) { + t.Errorf("SupportedServiceVersions(%q) len = %d, want %d", svc, len(gotVers), len(wantVers)) + } + + for _, ver := range wantVers { + gotImg, err := got.GetServiceImage(svc, ver) + if err != nil { + t.Errorf("GetServiceImage(%q, %q) not found: %v", svc, ver, err) + continue + } + wantImg, _ := want.GetServiceImage(svc, ver) + if gotImg.Tag != wantImg.Tag { + t.Errorf("GetServiceImage(%q, %q).Tag = %q, want %q", svc, ver, gotImg.Tag, wantImg.Tag) + } + } + } +} + +// TestManifestLoader_RealURL exercises the loader against a real HTTP server. +// Run with a local file server already serving version-manifest.json: +// +// cd server/internal/orchestrator/swarm && python3 -m http.server 9090 +// MANIFEST_TEST_URL=http://localhost:9090/version-manifest.json \ +// go test -v -run TestManifestLoader_RealURL ./server/internal/orchestrator/swarm/... +func TestManifestLoader_RealURL(t *testing.T) { + url := os.Getenv("MANIFEST_TEST_URL") + if url == "" { + t.Skip("MANIFEST_TEST_URL not set; skipping real-URL integration test") + } + + _, cachePath := testCfg(t) + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + ManifestURL: url, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + v := loader.Versions() + if v.Default() == nil { + t.Fatal("expected non-nil default version from real URL") + } + t.Logf("loaded %d supported versions; default=%s", len(v.Supported()), v.Default().PostgresVersion) + + // Cache file must have been written. + if _, err := os.Stat(cachePath); err != nil { + t.Errorf("expected cache file written at %s: %v", cachePath, err) + } + + // Force a refresh and confirm versions are still intact. + loader.refresh(context.Background()) + if loader.Versions().Default().PostgresVersion.String() != v.Default().PostgresVersion.String() { + t.Error("default version changed unexpectedly after refresh") + } + t.Logf("refresh OK; default still %s", loader.Versions().Default().PostgresVersion) +} + +// TestEmbeddedManifestValid fully parses the version-manifest.json embedded in +// the binary. A failure here means NewManifestLoader would panic at startup — +// catching it in CI is much better than catching it in production. +func TestEmbeddedManifestValid(t *testing.T) { + m := &ManifestLoader{logger: testutils.Logger(t)} + v, sv, err := m.parseManifestData(embeddedManifest) + if err != nil { + t.Fatalf("embedded manifest cannot be parsed: %v", err) + } + if v.Default() == nil { + t.Fatal("embedded manifest has no default version") + } + if len(v.Supported()) == 0 { + t.Fatal("embedded manifest has no supported versions") + } + if _, err := sv.SupportedServiceVersions("mcp"); err != nil { + t.Errorf("embedded manifest missing mcp service versions: %v", err) + } +} + +// TestValidateManifest covers schema_version and JSON validation. +func TestValidateManifest(t *testing.T) { + m := &ManifestLoader{logger: testutils.Logger(t)} + + if err := m.validateManifest(embeddedManifest); err != nil { + t.Errorf("embedded manifest should be valid: %v", err) + } + + if err := m.validateManifest([]byte(`{"schema_version":2,"images":{}}`)); err == nil { + t.Error("expected error for schema_version 2") + } + + if err := m.validateManifest([]byte(`not json`)); err == nil { + t.Error("expected error for non-JSON input") + } +} + +// TestManifestLoader_ImageTagsHaveRegistryPrefix verifies that all image tags +// returned by Versions and ServiceVersions include the configured registry +// host. +func TestManifestLoader_ImageTagsHaveRegistryPrefix(t *testing.T) { + cfg, cachePath := testCfg(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + withEmbeddedFallback(), + ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } + + for _, pv := range loader.Versions().Supported() { + imgs, err := loader.Versions().GetImages(pv) + if err != nil { + t.Errorf("GetImages(%s): %v", pv, err) + continue + } + if !strings.HasPrefix(imgs.PgEdgeImage, "ghcr.io/pgedge/") { + t.Errorf("PgEdgeImage %q missing registry prefix", imgs.PgEdgeImage) + } + } + + for _, svc := range []string{"mcp", "postgrest", "rag"} { + vers, _ := loader.ServiceVersions().SupportedServiceVersions(svc) + for _, ver := range vers { + img, err := loader.ServiceVersions().GetServiceImage(svc, ver) + if err != nil { + t.Errorf("GetServiceImage(%q, %q): %v", svc, ver, err) + continue + } + if !strings.HasPrefix(img.Tag, "ghcr.io/pgedge/") { + t.Errorf("service %q version %q Tag %q missing registry prefix", svc, ver, img.Tag) + } + } + } +} diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 5f3c4eea..ed032411 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -181,10 +181,59 @@ func ServiceInstanceName(databaseID, serviceID, hostID string) string { return fmt.Sprintf("%s-%s-%s", databaseID, serviceID, base36[:8]) } +// resolveInstanceImages returns an Images for the instance using the precedence: +// 1. SwarmOpts.Image (user override) — manifest lookup skipped entirely +// 2. SwarmOpts.ResolvedImage (CP-managed, already stored) +// 3. Manifest lookup — result written to SwarmOpts.ResolvedImage (lazy backfill) +func (o *Orchestrator) resolveInstanceImages(spec *database.InstanceSpec) (*Images, error) { + var swarmOpts *database.SwarmOpts + if spec.OrchestratorOpts != nil { + swarmOpts = spec.OrchestratorOpts.Swarm + } + + switch { + case swarmOpts != nil && swarmOpts.Image != "": + return &Images{PgEdgeImage: swarmOpts.Image}, nil + case swarmOpts != nil && swarmOpts.ResolvedImage != "": + return &Images{PgEdgeImage: swarmOpts.ResolvedImage}, nil + default: + manifested, err := o.versions.GetImages(spec.PgEdgeVersion) + if err != nil { + return nil, fmt.Errorf("failed to get images: %w", err) + } + if spec.OrchestratorOpts == nil { + spec.OrchestratorOpts = &database.OrchestratorOpts{} + } + if spec.OrchestratorOpts.Swarm == nil { + spec.OrchestratorOpts.Swarm = &database.SwarmOpts{} + } + spec.OrchestratorOpts.Swarm.ResolvedImage = manifested.PgEdgeImage + return &Images{PgEdgeImage: manifested.PgEdgeImage}, nil + } +} + +// ReconcileInstanceSpec resolves the container image for the new spec and +// clears a stale ResolvedImage if PgEdgeVersion changed since the last +// reconciliation. +func (o *Orchestrator) ReconcileInstanceSpec(old, new *database.InstanceSpec) error { + // If the Postgres version changed, the previously resolved image no longer + // matches — clear it so resolveInstanceImages fetches the correct one from + // the manifest. + if old != nil && old.PgEdgeVersion != nil && new.PgEdgeVersion != nil { + if !old.PgEdgeVersion.Equals(new.PgEdgeVersion) { + if new.OrchestratorOpts != nil && new.OrchestratorOpts.Swarm != nil { + new.OrchestratorOpts.Swarm.ResolvedImage = "" + } + } + } + _, err := o.resolveInstanceImages(new) + return err +} + func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResource, []resource.Resource, []resource.Resource, error) { - images, err := o.versions.GetImages(spec.PgEdgeVersion) + images, err := o.resolveInstanceImages(spec) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to get images: %w", err) + return nil, nil, nil, err } instanceHostname := fmt.Sprintf("postgres-%s", spec.InstanceID) diff --git a/server/internal/orchestrator/swarm/resolve_instance_images_test.go b/server/internal/orchestrator/swarm/resolve_instance_images_test.go new file mode 100644 index 00000000..1eabe184 --- /dev/null +++ b/server/internal/orchestrator/swarm/resolve_instance_images_test.go @@ -0,0 +1,154 @@ +package swarm + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/ds" +) + +func TestResolveInstanceImages(t *testing.T) { + o := &Orchestrator{ + versions: NewVersions(config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "registry.example.com/pgedge", + }, + }), + } + + knownVersion := ds.MustParsePgEdgeVersion("17.9", "5") + unknownVersion := ds.MustParsePgEdgeVersion("99.99", "5") + + t.Run("Image override used directly, manifest not consulted", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ + Image: "my-registry/pgedge:dev-build", + }, + }, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.Equal(t, "my-registry/pgedge:dev-build", images.PgEdgeImage) + // ResolvedImage must not be written when Image is set + assert.Empty(t, spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("Image override works for unknown version (bypasses manifest)", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: unknownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ + Image: "my-registry/pgedge:dev-build", + }, + }, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.Equal(t, "my-registry/pgedge:dev-build", images.PgEdgeImage) + }) + + t.Run("Image takes precedence over ResolvedImage", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ + Image: "custom-override:latest", + ResolvedImage: "previously-resolved:tag", + }, + }, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.Equal(t, "custom-override:latest", images.PgEdgeImage) + // ResolvedImage must not be touched when Image wins + assert.Equal(t, "previously-resolved:tag", spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("ResolvedImage used when Image is empty", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ + ResolvedImage: "registry.example.com/pgedge:previously-pinned", + }, + }, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.Equal(t, "registry.example.com/pgedge:previously-pinned", images.PgEdgeImage) + // ResolvedImage must not be modified + assert.Equal(t, "registry.example.com/pgedge:previously-pinned", spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: resolves from manifest and writes ResolvedImage", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.NotEmpty(t, images.PgEdgeImage) + require.NotNil(t, spec.OrchestratorOpts) + require.NotNil(t, spec.OrchestratorOpts.Swarm) + assert.Equal(t, images.PgEdgeImage, spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: initialises nil OrchestratorOpts", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: nil, + } + + _, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + require.NotNil(t, spec.OrchestratorOpts) + require.NotNil(t, spec.OrchestratorOpts.Swarm) + assert.NotEmpty(t, spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: initialises nil Swarm inside existing OrchestratorOpts", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{Swarm: nil}, + } + + _, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + require.NotNil(t, spec.OrchestratorOpts.Swarm) + assert.NotEmpty(t, spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: unknown version returns error", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: unknownVersion, + } + + _, err := o.resolveInstanceImages(spec) + assert.Error(t, err) + }) + + t.Run("manifest cache not mutated by lazy backfill", func(t *testing.T) { + manifestImage, err := o.versions.GetImages(knownVersion) + require.NoError(t, err) + original := manifestImage.PgEdgeImage + + spec := &database.InstanceSpec{PgEdgeVersion: knownVersion} + _, err = o.resolveInstanceImages(spec) + require.NoError(t, err) + + // Calling GetImages again must return the same value — cache untouched + after, err := o.versions.GetImages(knownVersion) + require.NoError(t, err) + assert.Equal(t, original, after.PgEdgeImage) + }) +} diff --git a/server/internal/orchestrator/swarm/service_images.go b/server/internal/orchestrator/swarm/service_images.go index 4826dbab..82fb3adc 100644 --- a/server/internal/orchestrator/swarm/service_images.go +++ b/server/internal/orchestrator/swarm/service_images.go @@ -94,7 +94,7 @@ func (sv *ServiceVersions) addServiceImage(serviceType string, version string, i } // GetServiceImage returns the full ServiceImage for the given service type and version. -func (sv *ServiceVersions) GetServiceImage(serviceType string, version string) (*ServiceImage, error) { +func (sv ServiceVersions) GetServiceImage(serviceType string, version string) (*ServiceImage, error) { versionMap, ok := sv.images[serviceType] if !ok { return nil, fmt.Errorf("unsupported service type %q", serviceType) @@ -108,7 +108,7 @@ func (sv *ServiceVersions) GetServiceImage(serviceType string, version string) ( return image, nil } -func (sv *ServiceVersions) SupportedServiceVersions(serviceType string) ([]string, error) { +func (sv ServiceVersions) SupportedServiceVersions(serviceType string) ([]string, error) { versionMap, ok := sv.images[serviceType] if !ok { return nil, fmt.Errorf("unsupported service type %q", serviceType) diff --git a/server/internal/orchestrator/swarm/version-manifest.json b/server/internal/orchestrator/swarm/version-manifest.json new file mode 100644 index 00000000..2ac84227 --- /dev/null +++ b/server/internal/orchestrator/swarm/version-manifest.json @@ -0,0 +1,127 @@ +{ + "schema_version": 1, + "images": { + "postgres": [ + { + "postgres_version": "16.10", + "spock_version": "5", + "image": "pgedge-postgres:16.10-spock5.0.4-standard-3", + "stability": "stable" + }, + { + "postgres_version": "16.11", + "spock_version": "5", + "image": "pgedge-postgres:16.11-spock5.0.4-standard-4", + "stability": "stable" + }, + { + "postgres_version": "16.12", + "spock_version": "5", + "image": "pgedge-postgres:16.12-spock5.0.5-standard-1", + "stability": "stable" + }, + { + "postgres_version": "16.13", + "spock_version": "5", + "image": "pgedge-postgres:16.13-spock5.0.6-standard-2", + "stability": "stable" + }, + { + "postgres_version": "16.14", + "spock_version": "5", + "image": "pgedge-postgres:16.14-spock5.0.9-standard-1", + "stability": "stable" + }, + { + "postgres_version": "17.6", + "spock_version": "5", + "image": "pgedge-postgres:17.6-spock5.0.4-standard-3", + "stability": "stable" + }, + { + "postgres_version": "17.7", + "spock_version": "5", + "image": "pgedge-postgres:17.7-spock5.0.4-standard-4", + "stability": "stable" + }, + { + "postgres_version": "17.8", + "spock_version": "5", + "image": "pgedge-postgres:17.8-spock5.0.5-standard-1", + "stability": "stable" + }, + { + "postgres_version": "17.9", + "spock_version": "5", + "image": "pgedge-postgres:17.9-spock5.0.6-standard-2", + "stability": "stable" + }, + { + "postgres_version": "17.10", + "spock_version": "5", + "image": "pgedge-postgres:17.10-spock5.0.9-standard-1", + "stability": "stable" + }, + { + "postgres_version": "18.0", + "spock_version": "5", + "image": "pgedge-postgres:18.0-spock5.0.4-standard-3", + "stability": "stable" + }, + { + "postgres_version": "18.1", + "spock_version": "5", + "image": "pgedge-postgres:18.1-spock5.0.4-standard-4", + "stability": "stable" + }, + { + "postgres_version": "18.2", + "spock_version": "5", + "image": "pgedge-postgres:18.2-spock5.0.5-standard-1", + "stability": "stable" + }, + { + "postgres_version": "18.3", + "spock_version": "5", + "image": "pgedge-postgres:18.3-spock5.0.6-standard-2", + "stability": "stable" + }, + { + "postgres_version": "18.4", + "spock_version": "5", + "image": "pgedge-postgres:18.4-spock5.0.9-standard-1", + "stability": "stable", + "default": true + } + ], + "postgrest": [ + { + "version": "latest", + "image": "postgrest:latest", + "stability": "stable" + }, + { + "version": "14.5", + "image": "postgrest:14.5", + "stability": "stable", + "default": true + } + ], + "mcp": [ + { + "version": "latest", + "image": "postgres-mcp:latest", + "stability": "stable", + "default": true + } + ], + "rag": [ + { + "version": "latest", + "image": "rag-server:latest", + "stability": "stable", + "default": true + } + ] + } +} diff --git a/server/internal/orchestrator/systemd/orchestrator.go b/server/internal/orchestrator/systemd/orchestrator.go index 48abb8e4..3141af6e 100644 --- a/server/internal/orchestrator/systemd/orchestrator.go +++ b/server/internal/orchestrator/systemd/orchestrator.go @@ -147,6 +147,10 @@ func (o *Orchestrator) PopulateHostStatus(ctx context.Context, h *host.HostStatu return nil } +func (o *Orchestrator) ReconcileInstanceSpec(_, _ *database.InstanceSpec) error { + return nil +} + func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResources, error) { paths, err := o.InstancePaths(spec.PgEdgeVersion.PostgresVersion, spec.InstanceID) if err != nil {