From 7d94f65753d88a926da37e50f9e51553767543ef Mon Sep 17 00:00:00 2001 From: Siva Date: Mon, 25 May 2026 20:50:07 +0530 Subject: [PATCH 1/9] feat: add Image and ResolvedImage fields to SwarmOpts --- server/internal/database/spec.go | 9 +- server/internal/database/spec_test.go | 32 ++++ .../orchestrator/swarm/orchestrator.go | 35 +++- .../swarm/resolve_instance_images_test.go | 154 ++++++++++++++++++ 4 files changed, 227 insertions(+), 3 deletions(-) create mode 100644 server/internal/orchestrator/swarm/resolve_instance_images_test.go diff --git a/server/internal/database/spec.go b/server/internal/database/spec.go index e573263b..99b5b930 100644 --- a/server/internal/database/spec.go +++ b/server/internal/database/spec.go @@ -30,7 +30,12 @@ type ExtraNetworkSpec struct { type SwarmOpts struct { ExtraVolumes []ExtraVolumesSpec `json:"extra_volumes,omitempty"` ExtraNetworks []ExtraNetworkSpec `json:"extra_networks,omitempty"` - ExtraLabels map[string]string `json:"extra_labels,omitempty"` // optional, used for custom labels on the swarm service + ExtraLabels map[string]string `json:"extra_labels,omitempty"` + // Image is a user-specified override. Never written by the CP. + Image string `json:"image,omitempty"` + // ResolvedImage is the CP-managed image tag. Written at instance creation, + // upgrade application, and lazy backfill. Never set simultaneously with Image. + ResolvedImage string `json:"resolved_image,omitempty"` } type OrchestratorOpts struct { Swarm *SwarmOpts `json:"docker,omitempty"` @@ -301,6 +306,8 @@ func (d *SwarmOpts) Clone() *SwarmOpts { ExtraVolumes: clonedVolumes, ExtraNetworks: clonedNetworks, ExtraLabels: maps.Clone(d.ExtraLabels), + Image: d.Image, + ResolvedImage: d.ResolvedImage, } } diff --git a/server/internal/database/spec_test.go b/server/internal/database/spec_test.go index 9acdbf23..a8451972 100644 --- a/server/internal/database/spec_test.go +++ b/server/internal/database/spec_test.go @@ -510,6 +510,38 @@ func TestSpec(t *testing.T) { }) } +func TestSwarmOptsClone(t *testing.T) { + t.Run("copies Image and ResolvedImage", func(t *testing.T) { + orig := &database.SwarmOpts{ + Image: "custom-registry/pgedge:dev", + ResolvedImage: "registry/pgedge:17.9-spock5.0.6-standard-1", + ExtraLabels: map[string]string{"k": "v"}, + } + cloned := orig.Clone() + + assert.Equal(t, orig.Image, cloned.Image) + assert.Equal(t, orig.ResolvedImage, cloned.ResolvedImage) + }) + + t.Run("clone is independent of original", func(t *testing.T) { + orig := &database.SwarmOpts{ + Image: "original-image", + ResolvedImage: "original-resolved", + } + cloned := orig.Clone() + cloned.Image = "mutated-image" + cloned.ResolvedImage = "mutated-resolved" + + assert.Equal(t, "original-image", orig.Image) + assert.Equal(t, "original-resolved", orig.ResolvedImage) + }) + + t.Run("nil clone returns nil", func(t *testing.T) { + var s *database.SwarmOpts + assert.Nil(t, s.Clone()) + }) +} + func TestSpec_NodeInstances_DBOwner(t *testing.T) { minimalSpec := func(users []*database.User) *database.Spec { return &database.Spec{ diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 5f3c4eea..742c520b 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -181,10 +181,41 @@ func ServiceInstanceName(databaseID, serviceID, hostID string) string { return fmt.Sprintf("%s-%s-%s", databaseID, serviceID, base36[:8]) } +// resolveInstanceImages returns an Images for the instance using the precedence: +// 1. SwarmOpts.Image (user override) — manifest lookup skipped entirely +// 2. SwarmOpts.ResolvedImage (CP-managed, already stored) +// 3. Manifest lookup — result written to SwarmOpts.ResolvedImage (lazy backfill) +func (o *Orchestrator) resolveInstanceImages(spec *database.InstanceSpec) (*Images, error) { + var swarmOpts *database.SwarmOpts + if spec.OrchestratorOpts != nil { + swarmOpts = spec.OrchestratorOpts.Swarm + } + + switch { + case swarmOpts != nil && swarmOpts.Image != "": + return &Images{PgEdgeImage: swarmOpts.Image}, nil + case swarmOpts != nil && swarmOpts.ResolvedImage != "": + return &Images{PgEdgeImage: swarmOpts.ResolvedImage}, nil + default: + manifested, err := o.versions.GetImages(spec.PgEdgeVersion) + if err != nil { + return nil, fmt.Errorf("failed to get images: %w", err) + } + if spec.OrchestratorOpts == nil { + spec.OrchestratorOpts = &database.OrchestratorOpts{} + } + if spec.OrchestratorOpts.Swarm == nil { + spec.OrchestratorOpts.Swarm = &database.SwarmOpts{} + } + spec.OrchestratorOpts.Swarm.ResolvedImage = manifested.PgEdgeImage + return &Images{PgEdgeImage: manifested.PgEdgeImage}, nil + } +} + func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResource, []resource.Resource, []resource.Resource, error) { - images, err := o.versions.GetImages(spec.PgEdgeVersion) + images, err := o.resolveInstanceImages(spec) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to get images: %w", err) + return nil, nil, nil, err } instanceHostname := fmt.Sprintf("postgres-%s", spec.InstanceID) diff --git a/server/internal/orchestrator/swarm/resolve_instance_images_test.go b/server/internal/orchestrator/swarm/resolve_instance_images_test.go new file mode 100644 index 00000000..1eabe184 --- /dev/null +++ b/server/internal/orchestrator/swarm/resolve_instance_images_test.go @@ -0,0 +1,154 @@ +package swarm + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/ds" +) + +func TestResolveInstanceImages(t *testing.T) { + o := &Orchestrator{ + versions: NewVersions(config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "registry.example.com/pgedge", + }, + }), + } + + knownVersion := ds.MustParsePgEdgeVersion("17.9", "5") + unknownVersion := ds.MustParsePgEdgeVersion("99.99", "5") + + t.Run("Image override used directly, manifest not consulted", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ + Image: "my-registry/pgedge:dev-build", + }, + }, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.Equal(t, "my-registry/pgedge:dev-build", images.PgEdgeImage) + // ResolvedImage must not be written when Image is set + assert.Empty(t, spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("Image override works for unknown version (bypasses manifest)", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: unknownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ + Image: "my-registry/pgedge:dev-build", + }, + }, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.Equal(t, "my-registry/pgedge:dev-build", images.PgEdgeImage) + }) + + t.Run("Image takes precedence over ResolvedImage", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ + Image: "custom-override:latest", + ResolvedImage: "previously-resolved:tag", + }, + }, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.Equal(t, "custom-override:latest", images.PgEdgeImage) + // ResolvedImage must not be touched when Image wins + assert.Equal(t, "previously-resolved:tag", spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("ResolvedImage used when Image is empty", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{ + Swarm: &database.SwarmOpts{ + ResolvedImage: "registry.example.com/pgedge:previously-pinned", + }, + }, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.Equal(t, "registry.example.com/pgedge:previously-pinned", images.PgEdgeImage) + // ResolvedImage must not be modified + assert.Equal(t, "registry.example.com/pgedge:previously-pinned", spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: resolves from manifest and writes ResolvedImage", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + } + + images, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + assert.NotEmpty(t, images.PgEdgeImage) + require.NotNil(t, spec.OrchestratorOpts) + require.NotNil(t, spec.OrchestratorOpts.Swarm) + assert.Equal(t, images.PgEdgeImage, spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: initialises nil OrchestratorOpts", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: nil, + } + + _, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + require.NotNil(t, spec.OrchestratorOpts) + require.NotNil(t, spec.OrchestratorOpts.Swarm) + assert.NotEmpty(t, spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: initialises nil Swarm inside existing OrchestratorOpts", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: knownVersion, + OrchestratorOpts: &database.OrchestratorOpts{Swarm: nil}, + } + + _, err := o.resolveInstanceImages(spec) + require.NoError(t, err) + require.NotNil(t, spec.OrchestratorOpts.Swarm) + assert.NotEmpty(t, spec.OrchestratorOpts.Swarm.ResolvedImage) + }) + + t.Run("lazy backfill: unknown version returns error", func(t *testing.T) { + spec := &database.InstanceSpec{ + PgEdgeVersion: unknownVersion, + } + + _, err := o.resolveInstanceImages(spec) + assert.Error(t, err) + }) + + t.Run("manifest cache not mutated by lazy backfill", func(t *testing.T) { + manifestImage, err := o.versions.GetImages(knownVersion) + require.NoError(t, err) + original := manifestImage.PgEdgeImage + + spec := &database.InstanceSpec{PgEdgeVersion: knownVersion} + _, err = o.resolveInstanceImages(spec) + require.NoError(t, err) + + // Calling GetImages again must return the same value — cache untouched + after, err := o.versions.GetImages(knownVersion) + require.NoError(t, err) + assert.Equal(t, original, after.PgEdgeImage) + }) +} From 6b413d6fc79625729082c6af5430c137246ca971 Mon Sep 17 00:00:00 2001 From: Siva Date: Thu, 28 May 2026 16:44:42 +0530 Subject: [PATCH 2/9] feat: add version manifest with all current image entries (#393) --- .../orchestrator/swarm/version-manifest.json | 127 ++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 server/internal/orchestrator/swarm/version-manifest.json diff --git a/server/internal/orchestrator/swarm/version-manifest.json b/server/internal/orchestrator/swarm/version-manifest.json new file mode 100644 index 00000000..e0ab9524 --- /dev/null +++ b/server/internal/orchestrator/swarm/version-manifest.json @@ -0,0 +1,127 @@ +{ + "schema_version": 1, + "images": { + "postgres": [ + { + "postgres_version": "16.10", + "spock_version": "5", + "image": "pgedge-postgres:16.10-spock5.0.4-standard-3", + "stability": "stable" + }, + { + "postgres_version": "16.11", + "spock_version": "5", + "image": "pgedge-postgres:16.11-spock5.0.4-standard-4", + "stability": "stable" + }, + { + "postgres_version": "16.12", + "spock_version": "5", + "image": "pgedge-postgres:16.12-spock5.0.5-standard-1", + "stability": "stable" + }, + { + "postgres_version": "16.13", + "spock_version": "5", + "image": "pgedge-postgres:16.13-spock5.0.6-standard-2", + "stability": "stable" + }, + { + "postgres_version": "16.14", + "spock_version": "5", + "image": "pgedge-postgres:16.14-spock5.0.8-standard-1", + "stability": "stable" + }, + { + "postgres_version": "17.6", + "spock_version": "5", + "image": "pgedge-postgres:17.6-spock5.0.4-standard-3", + "stability": "stable" + }, + { + "postgres_version": "17.7", + "spock_version": "5", + "image": "pgedge-postgres:17.7-spock5.0.4-standard-4", + "stability": "stable" + }, + { + "postgres_version": "17.8", + "spock_version": "5", + "image": "pgedge-postgres:17.8-spock5.0.5-standard-1", + "stability": "stable" + }, + { + "postgres_version": "17.9", + "spock_version": "5", + "image": "pgedge-postgres:17.9-spock5.0.6-standard-2", + "stability": "stable" + }, + { + "postgres_version": "17.10", + "spock_version": "5", + "image": "pgedge-postgres:17.10-spock5.0.8-standard-1", + "stability": "stable" + }, + { + "postgres_version": "18.0", + "spock_version": "5", + "image": "pgedge-postgres:18.0-spock5.0.4-standard-3", + "stability": "stable" + }, + { + "postgres_version": "18.1", + "spock_version": "5", + "image": "pgedge-postgres:18.1-spock5.0.4-standard-4", + "stability": "stable" + }, + { + "postgres_version": "18.2", + "spock_version": "5", + "image": "pgedge-postgres:18.2-spock5.0.5-standard-1", + "stability": "stable" + }, + { + "postgres_version": "18.3", + "spock_version": "5", + "image": "pgedge-postgres:18.3-spock5.0.6-standard-2", + "stability": "stable" + }, + { + "postgres_version": "18.4", + "spock_version": "5", + "image": "pgedge-postgres:18.4-spock5.0.8-standard-1", + "stability": "stable", + "default": true + } + ], + "postgrest": [ + { + "version": "latest", + "image": "postgrest:latest", + "stability": "stable" + }, + { + "version": "14.5", + "image": "postgrest:14.5", + "stability": "stable", + "default": true + } + ], + "mcp": [ + { + "version": "latest", + "image": "postgres-mcp:latest", + "stability": "stable", + "default": true + } + ], + "rag": [ + { + "version": "latest", + "image": "rag-server:latest", + "stability": "stable", + "default": true + } + ] + } +} From 14a58837e6e7be8ec967819ba96e88990c385aa4 Mon Sep 17 00:00:00 2001 From: Siva Date: Fri, 29 May 2026 18:06:30 +0530 Subject: [PATCH 3/9] feat: add ManifestLoader for version manifest --- server/internal/config/config.go | 6 + .../orchestrator/swarm/manifest_loader.go | 376 ++++++++++++ .../swarm/manifest_loader_test.go | 568 ++++++++++++++++++ 3 files changed, 950 insertions(+) create mode 100644 server/internal/orchestrator/swarm/manifest_loader.go create mode 100644 server/internal/orchestrator/swarm/manifest_loader_test.go diff --git a/server/internal/config/config.go b/server/internal/config/config.go index 86ecd18d..5a05aae2 100644 --- a/server/internal/config/config.go +++ b/server/internal/config/config.go @@ -82,6 +82,12 @@ type DockerSwarm struct { BridgeNetworksSubnetBits int `koanf:"bridge_networks_subnet_bits" json:"bridge_networks_subnet_bits,omitempty"` DatabaseNetworksCIDR string `koanf:"database_networks_cidr" json:"database_networks_cidr,omitempty"` DatabaseNetworksSubnetBits int `koanf:"database_networks_subnet_bits" json:"database_networks_subnet_bits,omitempty"` + // ManifestURL is the URL from which the version manifest is fetched. + // Defaults to the pgEdge CDN URL if not set. + ManifestURL string `koanf:"manifest_url" json:"manifest_url,omitempty"` + // ManifestPath points to a local manifest file that bypasses URL fetching + // entirely. Useful for air-gapped environments or testing. + ManifestPath string `koanf:"manifest_path" json:"manifest_path,omitempty"` } func (d DockerSwarm) validate() []error { diff --git a/server/internal/orchestrator/swarm/manifest_loader.go b/server/internal/orchestrator/swarm/manifest_loader.go new file mode 100644 index 00000000..8af6d795 --- /dev/null +++ b/server/internal/orchestrator/swarm/manifest_loader.go @@ -0,0 +1,376 @@ +package swarm + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "sync" + "time" + + _ "embed" + + "github.com/rs/zerolog" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/ds" +) + +//go:embed version-manifest.json +var embeddedManifest []byte + +const ( + // DefaultManifestURL is the URL used when no manifest_url is configured. + // TODO(PLAT-598): Replace with the real URL once the hosting location is confirmed. + DefaultManifestURL = "https://download.pgedge.com/manifests/version-manifest.json" + + manifestCacheFilename = "version-manifest-cache.json" + refreshInterval = time.Hour + fetchTimeout = 10 * time.Second + supportedSchemaVersion = 1 +) + +// versionManifest is the top-level structure of version-manifest.json. +type versionManifest struct { + SchemaVersion int `json:"schema_version"` + Images map[string]json.RawMessage `json:"images"` +} + +// manifestPostgresEntry is one entry under images.postgres in the manifest. +type manifestPostgresEntry struct { + PostgresVersion string `json:"postgres_version"` + SpockVersion string `json:"spock_version"` + Image string `json:"image"` + Stability string `json:"stability"` + Default bool `json:"default"` +} + +// manifestServiceEntry is one entry under images. in the manifest. +type manifestServiceEntry struct { + Version string `json:"version"` + Image string `json:"image"` + Stability string `json:"stability"` + Default bool `json:"default"` +} + +// manifestLoaderOption is a functional option for ManifestLoader, used in +// tests to inject test doubles. +type manifestLoaderOption func(*ManifestLoader) + +func withCachePath(path string) manifestLoaderOption { + return func(m *ManifestLoader) { m.cachePath = path } +} + +func withHTTPClient(c *http.Client) manifestLoaderOption { + return func(m *ManifestLoader) { m.httpClient = c } +} + +// ManifestLoader loads the version manifest from a remote URL (with disk +// caching and hourly refresh) or from a local file, and exposes the parsed +// *Versions and *ServiceVersions it produces. +// +// Resolution order on startup: +// 1. Local file (cfg.DockerSwarm.ManifestPath), if set — no network fetch, no refresh. +// 2. Remote URL (cfg.DockerSwarm.ManifestURL, defaulting to DefaultManifestURL). +// 3. Disk cache at defaultCachePath (or the path set by withCachePath). +// 4. Embedded binary manifest (always succeeds; panics only if the embedded +// JSON is corrupt, which would indicate a broken build). +type ManifestLoader struct { + cfg config.Config + logger zerolog.Logger + cachePath string + httpClient *http.Client + + mu sync.RWMutex + versions *Versions + svcVersions *ServiceVersions +} + +// NewManifestLoader creates and starts a ManifestLoader. It loads the +// manifest synchronously before returning so callers always get a valid +// *Versions / *ServiceVersions immediately. Background refresh (if +// applicable) is started as a goroutine tied to ctx. +func NewManifestLoader(ctx context.Context, cfg config.Config, logger zerolog.Logger, opts ...manifestLoaderOption) *ManifestLoader { + m := &ManifestLoader{ + cfg: cfg, + logger: logger.With().Str("component", "manifest_loader").Logger(), + cachePath: filepath.Join(cfg.DataDir, "manifests", manifestCacheFilename), + httpClient: &http.Client{ + Timeout: fetchTimeout, + }, + } + for _, o := range opts { + o(m) + } + + m.load() + + if cfg.DockerSwarm.ManifestPath == "" { + go m.refreshLoop(ctx) + } + + return m +} + +// Versions returns the current parsed *Versions. +func (m *ManifestLoader) Versions() *Versions { + m.mu.RLock() + defer m.mu.RUnlock() + return m.versions +} + +// ServiceVersions returns the current parsed *ServiceVersions. +func (m *ManifestLoader) ServiceVersions() *ServiceVersions { + m.mu.RLock() + defer m.mu.RUnlock() + return m.svcVersions +} + +// load performs the initial synchronous load, trying each source in order. +func (m *ManifestLoader) load() { + data, src := m.resolve() + v, sv, err := m.parseManifestData(data) + if err != nil { + // Embedded manifest is malformed — this is a build-time error. + panic(fmt.Sprintf("manifest_loader: failed to parse manifest from %s: %v", src, err)) + } + m.mu.Lock() + m.versions = v + m.svcVersions = sv + m.mu.Unlock() + m.logger.Info().Str("source", src).Msg("version manifest loaded") +} + +// resolve returns the raw manifest bytes and a human-readable source label, +// falling back through the resolution order. +func (m *ManifestLoader) resolve() ([]byte, string) { + // 1. Local file override. + if p := m.cfg.DockerSwarm.ManifestPath; p != "" { + data, err := os.ReadFile(p) + if err != nil { + m.logger.Warn().Err(err).Str("path", p).Msg("failed to read manifest_path; falling back to embedded manifest") + } else if err = m.validateManifest(data); err != nil { + m.logger.Warn().Err(err).Str("path", p).Msg("manifest_path validation failed; falling back to embedded manifest") + } else { + return data, "file:" + p + } + // manifest_path was set but unusable — skip URL/cache and use embedded. + return embeddedManifest, "embedded" + } + + // 2. Remote URL. + u := m.manifestURL() + data, err := m.fetchURL(context.Background(), u) + if err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("failed to fetch manifest from URL; trying disk cache") + } else if err = m.validateManifest(data); err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("remote manifest validation failed; trying disk cache") + } else { + _ = m.writeCache(data) + return data, "url:" + u + } + + // 3. Disk cache. + if cached, err := os.ReadFile(m.cachePath); err == nil { + if err = m.validateManifest(cached); err != nil { + m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("cached manifest validation failed; falling back to embedded manifest") + } else { + m.logger.Info().Str("path", m.cachePath).Msg("using cached manifest") + return cached, "cache:" + m.cachePath + } + } + + // 4. Embedded binary manifest. + m.logger.Warn().Msg("using embedded manifest; consider checking connectivity to manifest URL") + return embeddedManifest, "embedded" +} + +// manifestURL returns the configured URL, or DefaultManifestURL if unset. +func (m *ManifestLoader) manifestURL() string { + if u := m.cfg.DockerSwarm.ManifestURL; u != "" { + return u + } + return DefaultManifestURL +} + +// fetchURL fetches bytes from the given URL using the configured http.Client. +func (m *ManifestLoader) fetchURL(ctx context.Context, url string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := m.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected HTTP status %d", resp.StatusCode) + } + return io.ReadAll(resp.Body) +} + +// writeCache persists data to the cache path, creating parent directories as +// needed. Errors are logged but not returned (caching is best-effort). +func (m *ManifestLoader) writeCache(data []byte) error { + if err := os.MkdirAll(filepath.Dir(m.cachePath), 0o755); err != nil { + m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("failed to create manifest cache directory") + return err + } + if err := os.WriteFile(m.cachePath, data, 0o644); err != nil { + m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("failed to write manifest cache") + return err + } + return nil +} + +// refreshLoop runs in the background and refreshes the manifest every hour. +// It stops when ctx is cancelled. +func (m *ManifestLoader) refreshLoop(ctx context.Context) { + ticker := time.NewTicker(refreshInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.refresh(ctx) + } + } +} + +// refresh fetches a fresh manifest from the URL, validates it, updates the +// disk cache, and swaps the in-memory versions atomically. Failures are +// logged but do not disturb the current in-memory versions. +func (m *ManifestLoader) refresh(ctx context.Context) { + u := m.manifestURL() + data, err := m.fetchURL(ctx, u) + if err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("manifest refresh: fetch failed; keeping current versions") + return + } + if err = m.validateManifest(data); err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("manifest refresh: validation failed; keeping current versions") + return + } + v, sv, err := m.parseManifestData(data) + if err != nil { + m.logger.Warn().Err(err).Str("url", u).Msg("manifest refresh: parse failed; keeping current versions") + return + } + _ = m.writeCache(data) + + m.mu.Lock() + m.versions = v + m.svcVersions = sv + m.mu.Unlock() + m.logger.Info().Str("url", u).Msg("version manifest refreshed") +} + +// validateManifest checks that data is valid JSON and has a supported +// schema_version. +func (m *ManifestLoader) validateManifest(data []byte) error { + var mf versionManifest + if err := json.Unmarshal(data, &mf); err != nil { + return fmt.Errorf("invalid JSON: %w", err) + } + if mf.SchemaVersion != supportedSchemaVersion { + return fmt.Errorf("unsupported schema_version %d (want %d)", mf.SchemaVersion, supportedSchemaVersion) + } + return nil +} + +// parseManifestData unmarshals data into *Versions and *ServiceVersions. +func (m *ManifestLoader) parseManifestData(data []byte) (*Versions, *ServiceVersions, error) { + var mf versionManifest + if err := json.Unmarshal(data, &mf); err != nil { + return nil, nil, fmt.Errorf("unmarshal manifest: %w", err) + } + + v, err := buildVersions(m.cfg, &mf) + if err != nil { + return nil, nil, fmt.Errorf("build versions: %w", err) + } + + sv, err := buildServiceVersions(m.cfg, &mf) + if err != nil { + return nil, nil, fmt.Errorf("build service versions: %w", err) + } + + return v, sv, nil +} + +// buildVersions parses the images.postgres section of the manifest and +// returns a *Versions equivalent to NewVersions(cfg) for the same data. +func buildVersions(cfg config.Config, mf *versionManifest) (*Versions, error) { + raw, ok := mf.Images["postgres"] + if !ok { + return nil, fmt.Errorf("manifest missing images.postgres") + } + + var entries []manifestPostgresEntry + if err := json.Unmarshal(raw, &entries); err != nil { + return nil, fmt.Errorf("unmarshal images.postgres: %w", err) + } + if len(entries) == 0 { + return nil, fmt.Errorf("images.postgres is empty") + } + + versions := &Versions{ + cfg: cfg, + images: make(map[string]map[string]*Images), + } + + var defaultVer *ds.PgEdgeVersion + for _, e := range entries { + pv, err := ds.ParsePgEdgeVersion(e.PostgresVersion, e.SpockVersion) + if err != nil { + return nil, fmt.Errorf("invalid version entry {postgres:%s spock:%s}: %w", + e.PostgresVersion, e.SpockVersion, err) + } + img := &Images{ + PgEdgeImage: serviceImageTag(cfg, e.Image), + } + versions.addImage(pv, img) + if e.Default { + defaultVer = pv + } + } + + if defaultVer == nil { + // Fall back to the last entry if no default is marked. + defaultVer = versions.supportedVersions[len(versions.supportedVersions)-1] + } + versions.defaultVersion = defaultVer + + return versions, nil +} + +// buildServiceVersions parses all non-postgres image sections and returns a +// *ServiceVersions equivalent to NewServiceVersions(cfg) for the same data. +func buildServiceVersions(cfg config.Config, mf *versionManifest) (*ServiceVersions, error) { + sv := &ServiceVersions{ + cfg: cfg, + images: make(map[string]map[string]*ServiceImage), + } + + for svcType, raw := range mf.Images { + if svcType == "postgres" { + continue + } + var entries []manifestServiceEntry + if err := json.Unmarshal(raw, &entries); err != nil { + return nil, fmt.Errorf("unmarshal images.%s: %w", svcType, err) + } + for _, e := range entries { + sv.addServiceImage(svcType, e.Version, &ServiceImage{ + Tag: serviceImageTag(cfg, e.Image), + }) + } + } + + return sv, nil +} diff --git a/server/internal/orchestrator/swarm/manifest_loader_test.go b/server/internal/orchestrator/swarm/manifest_loader_test.go new file mode 100644 index 00000000..26520ccc --- /dev/null +++ b/server/internal/orchestrator/swarm/manifest_loader_test.go @@ -0,0 +1,568 @@ +package swarm + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/rs/zerolog" + + "github.com/pgEdge/control-plane/server/internal/config" +) + +// testCfg returns a config with an isolated cache directory in t.TempDir(). +func testCfg(t *testing.T, extra ...func(*config.DockerSwarm)) (config.Config, string) { + t.Helper() + cacheDir := t.TempDir() + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + }, + } + for _, fn := range extra { + fn(&cfg.DockerSwarm) + } + return cfg, filepath.Join(cacheDir, "manifest-cache.json") +} + +func nopLogger() zerolog.Logger { return zerolog.Nop() } + +// validManifest returns a well-formed manifest JSON matching the embedded one. +func validManifest(t *testing.T) []byte { + t.Helper() + data, err := json.Marshal(map[string]any{ + "schema_version": 1, + "images": map[string]any{ + "postgres": []map[string]any{ + { + "postgres_version": "17.10", + "spock_version": "5", + "image": "pgedge-postgres:17.10-spock5.0.8-standard-1", + "stability": "stable", + "default": true, + }, + }, + "mcp": []map[string]any{ + {"version": "latest", "image": "postgres-mcp:latest", "stability": "stable", "default": true}, + }, + "postgrest": []map[string]any{ + {"version": "14.5", "image": "postgrest:14.5", "stability": "stable", "default": true}, + }, + "rag": []map[string]any{ + {"version": "latest", "image": "rag-server:latest", "stability": "stable", "default": true}, + }, + }, + }) + if err != nil { + t.Fatalf("marshal test manifest: %v", err) + } + return data +} + +// TestManifestLoader_LoadFromEmbedded verifies the loader falls back to the +// embedded manifest when no URL, cache, or manifest_path is available. +func TestManifestLoader_LoadFromEmbedded(t *testing.T) { + cfg, cachePath := testCfg(t) + // Point at a server that always 500s to force URL failure. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + + v := loader.Versions() + if v == nil { + t.Fatal("expected non-nil Versions from embedded manifest") + } + if v.Default() == nil { + t.Fatal("expected non-nil default version from embedded manifest") + } + if len(v.Supported()) == 0 { + t.Fatal("expected at least one supported version from embedded manifest") + } + + sv := loader.ServiceVersions() + if sv == nil { + t.Fatal("expected non-nil ServiceVersions from embedded manifest") + } +} + +// TestManifestLoader_LoadFromURL verifies the happy-path URL fetch. +func TestManifestLoader_LoadFromURL(t *testing.T) { + cfg, cachePath := testCfg(t) + manifest := validManifest(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(manifest) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + + v := loader.Versions() + if v == nil { + t.Fatal("expected non-nil Versions") + } + def := v.Default() + if def == nil { + t.Fatal("expected non-nil default version") + } + if def.PostgresVersion.String() != "17.10" { + t.Errorf("default postgres version = %s, want 17.10", def.PostgresVersion) + } + + // Cache should have been written. + if _, err := os.Stat(cachePath); err != nil { + t.Errorf("expected cache file at %s: %v", cachePath, err) + } +} + +// TestManifestLoader_LoadFromCache verifies that a stale URL causes the loader +// to fall back to the disk cache. +func TestManifestLoader_LoadFromCache(t *testing.T) { + cfg, cachePath := testCfg(t) + manifest := validManifest(t) + + // Pre-populate the cache. + if err := os.MkdirAll(filepath.Dir(cachePath), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(cachePath, manifest, 0o644); err != nil { + t.Fatal(err) + } + + // URL always fails. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + + v := loader.Versions() + if v == nil { + t.Fatal("expected Versions from cache") + } + if v.Default() == nil { + t.Fatal("expected non-nil default version from cache") + } +} + +// TestManifestLoader_LoadFromManifestPath verifies the local file override. +func TestManifestLoader_LoadFromManifestPath(t *testing.T) { + manifest := validManifest(t) + mfFile := filepath.Join(t.TempDir(), "local-manifest.json") + if err := os.WriteFile(mfFile, manifest, 0o644); err != nil { + t.Fatal(err) + } + + _, cachePath := testCfg(t) + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + ManifestPath: mfFile, + }, + } + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + ) + + v := loader.Versions() + if v == nil { + t.Fatal("expected Versions from manifest_path") + } + if v.Default().PostgresVersion.String() != "17.10" { + t.Errorf("default version = %s, want 17.10", v.Default().PostgresVersion) + } +} + +// TestManifestLoader_ManifestPathMissing verifies fallback to embedded when +// manifest_path points to a non-existent file. +func TestManifestLoader_ManifestPathMissing(t *testing.T) { + _, cachePath := testCfg(t) + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + ManifestPath: "/does/not/exist/manifest.json", + }, + } + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + ) + + // Should have fallen back to embedded. + if loader.Versions() == nil { + t.Fatal("expected non-nil Versions after fallback to embedded") + } +} + +// TestManifestLoader_InvalidSchemaVersion verifies that a manifest with an +// unsupported schema_version causes URL/cache to be skipped and falls back. +func TestManifestLoader_InvalidSchemaVersion(t *testing.T) { + cfg, cachePath := testCfg(t) + badManifest := []byte(`{"schema_version":99,"images":{}}`) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write(badManifest) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + + // Falls back to embedded — should still work. + if loader.Versions() == nil { + t.Fatal("expected fallback to embedded on invalid schema_version") + } +} + +// TestManifestLoader_MalformedJSON verifies that malformed JSON causes fallback. +func TestManifestLoader_MalformedJSON(t *testing.T) { + cfg, cachePath := testCfg(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(`{not valid json`)) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + + if loader.Versions() == nil { + t.Fatal("expected fallback to embedded on malformed JSON") + } +} + +// TestManifestLoader_NoManifestPathNoRefresh verifies that when manifest_path +// is set the background refresh goroutine is NOT started (we check indirectly +// by confirming versions don't change after a URL update). +func TestManifestLoader_NoRefreshWhenManifestPathSet(t *testing.T) { + manifest := validManifest(t) + mfFile := filepath.Join(t.TempDir(), "local-manifest.json") + if err := os.WriteFile(mfFile, manifest, 0o644); err != nil { + t.Fatal(err) + } + + _, cachePath := testCfg(t) + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + ManifestPath: mfFile, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath)) + + origDefault := loader.Versions().Default().PostgresVersion.String() + + // Even after a brief wait the versions should not change (no refresh goroutine). + time.Sleep(20 * time.Millisecond) + if loader.Versions().Default().PostgresVersion.String() != origDefault { + t.Error("versions changed unexpectedly when manifest_path is set") + } +} + +// TestManifestLoader_RefreshSuccess verifies that refresh() updates in-memory +// versions when a new valid manifest is served. +func TestManifestLoader_RefreshSuccess(t *testing.T) { + cfg, cachePath := testCfg(t) + + // First serve a manifest with PG 17.10. + current := validManifest(t) + + // Prepare an updated manifest with PG 16.14 as default. + updated, err := json.Marshal(map[string]any{ + "schema_version": 1, + "images": map[string]any{ + "postgres": []map[string]any{ + { + "postgres_version": "16.14", + "spock_version": "5", + "image": "pgedge-postgres:16.14-spock5.0.8-standard-1", + "stability": "stable", + "default": true, + }, + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + serve := current + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write(serve) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + + if loader.Versions().Default().PostgresVersion.String() != "17.10" { + t.Fatalf("expected initial default 17.10, got %s", loader.Versions().Default().PostgresVersion) + } + + // Swap what the server returns, then trigger a refresh. + serve = updated + loader.refresh(context.Background()) + + if loader.Versions().Default().PostgresVersion.String() != "16.14" { + t.Errorf("expected refreshed default 16.14, got %s", loader.Versions().Default().PostgresVersion) + } +} + +// TestManifestLoader_RefreshFailure verifies that a failing refresh leaves +// in-memory versions unchanged. +func TestManifestLoader_RefreshFailure(t *testing.T) { + cfg, cachePath := testCfg(t) + manifest := validManifest(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write(manifest) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + + origDefault := loader.Versions().Default().PostgresVersion.String() + + // Now make the server return garbage. + manifest = []byte(`{invalid}`) + loader.refresh(context.Background()) + + if loader.Versions().Default().PostgresVersion.String() != origDefault { + t.Error("versions changed after a failed refresh") + } +} + +// TestBuildVersions_MatchesNewVersions verifies that buildVersions produces +// the same set of supported versions as the hardcoded NewVersions function +// when given the embedded manifest. +func TestBuildVersions_MatchesNewVersions(t *testing.T) { + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + }, + } + + var mf versionManifest + if err := json.Unmarshal(embeddedManifest, &mf); err != nil { + t.Fatalf("unmarshal embedded manifest: %v", err) + } + + got, err := buildVersions(cfg, &mf) + if err != nil { + t.Fatalf("buildVersions: %v", err) + } + + want := NewVersions(cfg) + + if len(got.Supported()) != len(want.Supported()) { + t.Errorf("Supported() len = %d, want %d", len(got.Supported()), len(want.Supported())) + } + + for _, wv := range want.Supported() { + imgs, err := got.GetImages(wv) + if err != nil { + t.Errorf("GetImages(%s) not found in manifest-built Versions: %v", wv, err) + continue + } + wantImgs, _ := want.GetImages(wv) + if imgs.PgEdgeImage != wantImgs.PgEdgeImage { + t.Errorf("GetImages(%s).PgEdgeImage = %q, want %q", wv, imgs.PgEdgeImage, wantImgs.PgEdgeImage) + } + } + + if got.Default().PostgresVersion.String() != want.Default().PostgresVersion.String() { + t.Errorf("Default() = %s, want %s", got.Default().PostgresVersion, want.Default().PostgresVersion) + } +} + +// TestBuildServiceVersions_MatchesNewServiceVersions verifies that +// buildServiceVersions produces the same registrations as NewServiceVersions +// for the embedded manifest. +func TestBuildServiceVersions_MatchesNewServiceVersions(t *testing.T) { + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + }, + } + + var mf versionManifest + if err := json.Unmarshal(embeddedManifest, &mf); err != nil { + t.Fatalf("unmarshal embedded manifest: %v", err) + } + + got, err := buildServiceVersions(cfg, &mf) + if err != nil { + t.Fatalf("buildServiceVersions: %v", err) + } + + want := NewServiceVersions(cfg) + + serviceTypes := []string{"mcp", "postgrest", "rag"} + for _, svc := range serviceTypes { + gotVers, err := got.SupportedServiceVersions(svc) + if err != nil { + t.Errorf("SupportedServiceVersions(%q) error: %v", svc, err) + continue + } + wantVers, _ := want.SupportedServiceVersions(svc) + if len(gotVers) != len(wantVers) { + t.Errorf("SupportedServiceVersions(%q) len = %d, want %d", svc, len(gotVers), len(wantVers)) + } + + for _, ver := range wantVers { + gotImg, err := got.GetServiceImage(svc, ver) + if err != nil { + t.Errorf("GetServiceImage(%q, %q) not found: %v", svc, ver, err) + continue + } + wantImg, _ := want.GetServiceImage(svc, ver) + if gotImg.Tag != wantImg.Tag { + t.Errorf("GetServiceImage(%q, %q).Tag = %q, want %q", svc, ver, gotImg.Tag, wantImg.Tag) + } + } + } +} + +// TestManifestLoader_RealURL exercises the loader against a real HTTP server. +// Run with a local file server already serving version-manifest.json: +// +// cd server/internal/orchestrator/swarm && python3 -m http.server 9090 +// MANIFEST_TEST_URL=http://localhost:9090/version-manifest.json \ +// go test -v -run TestManifestLoader_RealURL ./server/internal/orchestrator/swarm/... +func TestManifestLoader_RealURL(t *testing.T) { + url := os.Getenv("MANIFEST_TEST_URL") + if url == "" { + t.Skip("MANIFEST_TEST_URL not set; skipping real-URL integration test") + } + + _, cachePath := testCfg(t) + cfg := config.Config{ + DockerSwarm: config.DockerSwarm{ + ImageRepositoryHost: "ghcr.io/pgedge", + ManifestURL: url, + }, + } + + loader := NewManifestLoader(context.Background(), cfg, zerolog.New(os.Stderr).With().Timestamp().Logger(), + withCachePath(cachePath), + ) + + v := loader.Versions() + if v == nil { + t.Fatal("expected non-nil Versions from real URL") + } + t.Logf("loaded %d supported versions; default=%s", len(v.Supported()), v.Default().PostgresVersion) + + // Cache file must have been written. + if _, err := os.Stat(cachePath); err != nil { + t.Errorf("expected cache file written at %s: %v", cachePath, err) + } + + // Force a refresh and confirm versions are still intact. + loader.refresh(context.Background()) + if loader.Versions().Default().PostgresVersion.String() != v.Default().PostgresVersion.String() { + t.Error("default version changed unexpectedly after refresh") + } + t.Logf("refresh OK; default still %s", loader.Versions().Default().PostgresVersion) +} + +// TestValidateManifest covers schema_version and JSON validation. +func TestValidateManifest(t *testing.T) { + m := &ManifestLoader{logger: nopLogger()} + + if err := m.validateManifest(embeddedManifest); err != nil { + t.Errorf("embedded manifest should be valid: %v", err) + } + + if err := m.validateManifest([]byte(`{"schema_version":2,"images":{}}`)); err == nil { + t.Error("expected error for schema_version 2") + } + + if err := m.validateManifest([]byte(`not json`)); err == nil { + t.Error("expected error for non-JSON input") + } +} + +// TestManifestLoader_ImageTagsHaveRegistryPrefix verifies that all image tags +// returned by Versions and ServiceVersions include the configured registry +// host. +func TestManifestLoader_ImageTagsHaveRegistryPrefix(t *testing.T) { + cfg, cachePath := testCfg(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + cfg.DockerSwarm.ManifestURL = srv.URL + + loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + + for _, pv := range loader.Versions().Supported() { + imgs, err := loader.Versions().GetImages(pv) + if err != nil { + t.Errorf("GetImages(%s): %v", pv, err) + continue + } + if !strings.HasPrefix(imgs.PgEdgeImage, "ghcr.io/pgedge/") { + t.Errorf("PgEdgeImage %q missing registry prefix", imgs.PgEdgeImage) + } + } + + for _, svc := range []string{"mcp", "postgrest", "rag"} { + vers, _ := loader.ServiceVersions().SupportedServiceVersions(svc) + for _, ver := range vers { + img, err := loader.ServiceVersions().GetServiceImage(svc, ver) + if err != nil { + t.Errorf("GetServiceImage(%q, %q): %v", svc, ver, err) + continue + } + if !strings.HasPrefix(img.Tag, "ghcr.io/pgedge/") { + t.Errorf("service %q version %q Tag %q missing registry prefix", svc, ver, img.Tag) + } + } + } +} From 7d7cdcd174822d97499b485237746ba9114e178a Mon Sep 17 00:00:00 2001 From: Siva Date: Fri, 29 May 2026 21:11:25 +0530 Subject: [PATCH 4/9] addressing AI review commits --- .../orchestrator/swarm/manifest_loader.go | 32 ++++-- .../swarm/manifest_loader_test.go | 97 +++++++++++++++---- 2 files changed, 104 insertions(+), 25 deletions(-) diff --git a/server/internal/orchestrator/swarm/manifest_loader.go b/server/internal/orchestrator/swarm/manifest_loader.go index 8af6d795..7def2e32 100644 --- a/server/internal/orchestrator/swarm/manifest_loader.go +++ b/server/internal/orchestrator/swarm/manifest_loader.go @@ -68,6 +68,10 @@ func withHTTPClient(c *http.Client) manifestLoaderOption { return func(m *ManifestLoader) { m.httpClient = c } } +func withTickerC(ch <-chan time.Time) manifestLoaderOption { + return func(m *ManifestLoader) { m.tickerC = ch } +} + // ManifestLoader loads the version manifest from a remote URL (with disk // caching and hourly refresh) or from a local file, and exposes the parsed // *Versions and *ServiceVersions it produces. @@ -83,6 +87,7 @@ type ManifestLoader struct { logger zerolog.Logger cachePath string httpClient *http.Client + tickerC <-chan time.Time // nil → use default hourly ticker; injectable for tests mu sync.RWMutex versions *Versions @@ -134,8 +139,10 @@ func (m *ManifestLoader) load() { data, src := m.resolve() v, sv, err := m.parseManifestData(data) if err != nil { - // Embedded manifest is malformed — this is a build-time error. - panic(fmt.Sprintf("manifest_loader: failed to parse manifest from %s: %v", src, err)) + // resolve() fully parse-validates every non-embedded source before + // returning it, so this branch is only reachable when the embedded + // manifest itself is corrupt — a build-time error. + panic(fmt.Sprintf("manifest_loader: failed to parse embedded manifest (%s): %v", src, err)) } m.mu.Lock() m.versions = v @@ -230,13 +237,18 @@ func (m *ManifestLoader) writeCache(data []byte) error { // refreshLoop runs in the background and refreshes the manifest every hour. // It stops when ctx is cancelled. func (m *ManifestLoader) refreshLoop(ctx context.Context) { - ticker := time.NewTicker(refreshInterval) - defer ticker.Stop() + tickC := m.tickerC + var ticker *time.Ticker + if tickC == nil { + ticker = time.NewTicker(refreshInterval) + defer ticker.Stop() + tickC = ticker.C + } for { select { case <-ctx.Done(): return - case <-ticker.C: + case <-tickC: m.refresh(ctx) } } @@ -270,8 +282,11 @@ func (m *ManifestLoader) refresh(ctx context.Context) { m.logger.Info().Str("url", u).Msg("version manifest refreshed") } -// validateManifest checks that data is valid JSON and has a supported -// schema_version. +// validateManifest checks that data is valid JSON, has a supported +// schema_version, and can be fully parsed into *Versions/*ServiceVersions. +// Performing a full parse here means resolve() only returns data that will +// succeed in parseManifestData, so load()'s panic is truly unreachable except +// for a corrupt embedded manifest. func (m *ManifestLoader) validateManifest(data []byte) error { var mf versionManifest if err := json.Unmarshal(data, &mf); err != nil { @@ -280,6 +295,9 @@ func (m *ManifestLoader) validateManifest(data []byte) error { if mf.SchemaVersion != supportedSchemaVersion { return fmt.Errorf("unsupported schema_version %d (want %d)", mf.SchemaVersion, supportedSchemaVersion) } + if _, _, err := m.parseManifestData(data); err != nil { + return fmt.Errorf("manifest not parseable: %w", err) + } return nil } diff --git a/server/internal/orchestrator/swarm/manifest_loader_test.go b/server/internal/orchestrator/swarm/manifest_loader_test.go index 26520ccc..fbe70294 100644 --- a/server/internal/orchestrator/swarm/manifest_loader_test.go +++ b/server/internal/orchestrator/swarm/manifest_loader_test.go @@ -76,7 +76,10 @@ func TestManifestLoader_LoadFromEmbedded(t *testing.T) { defer srv.Close() cfg.DockerSwarm.ManifestURL = srv.URL - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath), withHTTPClient(srv.Client()), ) @@ -110,7 +113,10 @@ func TestManifestLoader_LoadFromURL(t *testing.T) { defer srv.Close() cfg.DockerSwarm.ManifestURL = srv.URL - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath), withHTTPClient(srv.Client()), ) @@ -154,7 +160,10 @@ func TestManifestLoader_LoadFromCache(t *testing.T) { defer srv.Close() cfg.DockerSwarm.ManifestURL = srv.URL - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath), withHTTPClient(srv.Client()), ) @@ -230,7 +239,10 @@ func TestManifestLoader_InvalidSchemaVersion(t *testing.T) { defer srv.Close() cfg.DockerSwarm.ManifestURL = srv.URL - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath), withHTTPClient(srv.Client()), ) @@ -251,7 +263,10 @@ func TestManifestLoader_MalformedJSON(t *testing.T) { defer srv.Close() cfg.DockerSwarm.ManifestURL = srv.URL - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath), withHTTPClient(srv.Client()), ) @@ -261,9 +276,13 @@ func TestManifestLoader_MalformedJSON(t *testing.T) { } } -// TestManifestLoader_NoManifestPathNoRefresh verifies that when manifest_path -// is set the background refresh goroutine is NOT started (we check indirectly -// by confirming versions don't change after a URL update). +// TestManifestLoader_NoRefreshWhenManifestPathSet verifies that when +// manifest_path is set the background refresh goroutine is NOT started. +// +// Strategy: inject a pre-fired ticker via withTickerC. If the goroutine were +// started, it would immediately consume the tick, fetch the URL (which returns +// PG 16.14), and update the default version. We then assert the version is +// still 17.10 (from the local file), proving no goroutine ran. func TestManifestLoader_NoRefreshWhenManifestPathSet(t *testing.T) { manifest := validManifest(t) mfFile := filepath.Join(t.TempDir(), "local-manifest.json") @@ -271,25 +290,55 @@ func TestManifestLoader_NoRefreshWhenManifestPathSet(t *testing.T) { t.Fatal(err) } + // Serve a different default version at the URL so any refresh would be detectable. + updated, _ := json.Marshal(map[string]any{ + "schema_version": 1, + "images": map[string]any{ + "postgres": []map[string]any{ + { + "postgres_version": "16.14", + "spock_version": "5", + "image": "pgedge-postgres:16.14-spock5.0.8-standard-1", + "stability": "stable", + "default": true, + }, + }, + }, + }) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write(updated) + })) + defer srv.Close() + _, cachePath := testCfg(t) cfg := config.Config{ DockerSwarm: config.DockerSwarm{ ImageRepositoryHost: "ghcr.io/pgedge", ManifestPath: mfFile, + ManifestURL: srv.URL, }, } + // Pre-fired ticker: if the refresh goroutine were started it would consume + // this tick immediately and refresh from the URL. + immediateTick := make(chan time.Time, 1) + immediateTick <- time.Now() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath)) + loader := NewManifestLoader(ctx, cfg, nopLogger(), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + withTickerC(immediateTick), + ) - origDefault := loader.Versions().Default().PostgresVersion.String() + // Allow enough time for the goroutine (if it existed) to process the tick + // and complete an HTTP round-trip to localhost. + time.Sleep(50 * time.Millisecond) - // Even after a brief wait the versions should not change (no refresh goroutine). - time.Sleep(20 * time.Millisecond) - if loader.Versions().Default().PostgresVersion.String() != origDefault { - t.Error("versions changed unexpectedly when manifest_path is set") + if got := loader.Versions().Default().PostgresVersion.String(); got != "17.10" { + t.Errorf("default version = %q; refresh goroutine must not run when manifest_path is set", got) } } @@ -327,7 +376,10 @@ func TestManifestLoader_RefreshSuccess(t *testing.T) { defer srv.Close() cfg.DockerSwarm.ManifestURL = srv.URL - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath), withHTTPClient(srv.Client()), ) @@ -357,7 +409,10 @@ func TestManifestLoader_RefreshFailure(t *testing.T) { defer srv.Close() cfg.DockerSwarm.ManifestURL = srv.URL - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath), withHTTPClient(srv.Client()), ) @@ -484,7 +539,10 @@ func TestManifestLoader_RealURL(t *testing.T) { }, } - loader := NewManifestLoader(context.Background(), cfg, zerolog.New(os.Stderr).With().Timestamp().Logger(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, zerolog.New(os.Stderr).With().Timestamp().Logger(), withCachePath(cachePath), ) @@ -536,7 +594,10 @@ func TestManifestLoader_ImageTagsHaveRegistryPrefix(t *testing.T) { defer srv.Close() cfg.DockerSwarm.ManifestURL = srv.URL - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + loader := NewManifestLoader(ctx, cfg, nopLogger(), withCachePath(cachePath), withHTTPClient(srv.Client()), ) From 28ea7b79295dc3a6c0d99e3268c9f76d3057bf63 Mon Sep 17 00:00:00 2001 From: Siva Date: Mon, 1 Jun 2026 22:50:32 +0530 Subject: [PATCH 5/9] addressing review comments --- server/internal/config/config.go | 5 + server/internal/logging/factory.go | 1 + server/internal/orchestrator/swarm/images.go | 6 +- .../orchestrator/swarm/manifest_loader.go | 222 ++++++++++-------- .../swarm/manifest_loader_test.go | 134 +++++++---- .../orchestrator/swarm/service_images.go | 4 +- 6 files changed, 228 insertions(+), 144 deletions(-) diff --git a/server/internal/config/config.go b/server/internal/config/config.go index 5a05aae2..a9c1316b 100644 --- a/server/internal/config/config.go +++ b/server/internal/config/config.go @@ -108,6 +108,10 @@ func (d DockerSwarm) validate() []error { return errs } +// DefaultManifestURL is the pgEdge CDN URL used when no manifest_url is configured. +// TODO(PLAT-598): Replace with the real URL once the hosting location is confirmed. +const DefaultManifestURL = "https://download.pgedge.com/manifests/version-manifest.json" + var defaultDockerSwarm = DockerSwarm{ ImageRepositoryHost: "ghcr.io/pgedge", // This combination gives us 256 subnets with 16 addresses each. @@ -116,6 +120,7 @@ var defaultDockerSwarm = DockerSwarm{ // This combination gives us 256 subnets with 64 addresses each. DatabaseNetworksCIDR: "10.128.128.0/18", DatabaseNetworksSubnetBits: 26, + ManifestURL: DefaultManifestURL, } type SystemD struct { diff --git a/server/internal/logging/factory.go b/server/internal/logging/factory.go index dbe9ede5..7532c8ff 100644 --- a/server/internal/logging/factory.go +++ b/server/internal/logging/factory.go @@ -19,6 +19,7 @@ const ( ComponentDatabaseService Component = "database_service" ComponentElectionCandidate Component = "election_candidate" ComponentEmbeddedEtcd Component = "embedded_etcd" + ComponentManifestLoader Component = "manifest_loader" ComponentMigration Component = "migration" ComponentMigrationRunner Component = "migration_runner" ComponentPortsService Component = "ports_service" diff --git a/server/internal/orchestrator/swarm/images.go b/server/internal/orchestrator/swarm/images.go index 4388a727..7d61ac78 100644 --- a/server/internal/orchestrator/swarm/images.go +++ b/server/internal/orchestrator/swarm/images.go @@ -80,11 +80,11 @@ func NewVersions(cfg config.Config) *Versions { return versions } -func (v *Versions) Supported() []*ds.PgEdgeVersion { +func (v Versions) Supported() []*ds.PgEdgeVersion { return v.supportedVersions } -func (v *Versions) Default() *ds.PgEdgeVersion { +func (v Versions) Default() *ds.PgEdgeVersion { return v.defaultVersion } @@ -100,7 +100,7 @@ func (v *Versions) addImage(version *ds.PgEdgeVersion, images *Images) { v.supportedVersions = append(v.supportedVersions, version) } -func (v *Versions) GetImages(version *ds.PgEdgeVersion) (*Images, error) { +func (v Versions) GetImages(version *ds.PgEdgeVersion) (*Images, error) { pgv := version.PostgresVersion.String() sv := version.SpockVersion.String() diff --git a/server/internal/orchestrator/swarm/manifest_loader.go b/server/internal/orchestrator/swarm/manifest_loader.go index 7def2e32..23083032 100644 --- a/server/internal/orchestrator/swarm/manifest_loader.go +++ b/server/internal/orchestrator/swarm/manifest_loader.go @@ -2,6 +2,8 @@ package swarm import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" "io" @@ -17,26 +19,38 @@ import ( "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/pgEdge/control-plane/server/internal/logging" ) //go:embed version-manifest.json var embeddedManifest []byte const ( - // DefaultManifestURL is the URL used when no manifest_url is configured. - // TODO(PLAT-598): Replace with the real URL once the hosting location is confirmed. - DefaultManifestURL = "https://download.pgedge.com/manifests/version-manifest.json" - - manifestCacheFilename = "version-manifest-cache.json" refreshInterval = time.Hour fetchTimeout = 10 * time.Second supportedSchemaVersion = 1 ) +// manifestCachePath returns the cache file path for a given manifest URL. +// The filename embeds a short hash of the URL so that switching to a different +// URL automatically uses a fresh cache file instead of a stale one. +func manifestCachePath(dir, url string) string { + sum := sha256.Sum256([]byte(url)) + return filepath.Join(dir, fmt.Sprintf("version-manifest-cache-%s.json", hex.EncodeToString(sum[:8]))) +} + +// versionManifestImages is the typed images block inside version-manifest.json. +type versionManifestImages struct { + Postgres []manifestPostgresEntry `json:"postgres"` + PostgREST []manifestServiceEntry `json:"postgrest"` + MCP []manifestServiceEntry `json:"mcp"` + RAG []manifestServiceEntry `json:"rag"` +} + // versionManifest is the top-level structure of version-manifest.json. type versionManifest struct { - SchemaVersion int `json:"schema_version"` - Images map[string]json.RawMessage `json:"images"` + SchemaVersion int `json:"schema_version"` + Images versionManifestImages `json:"images"` } // manifestPostgresEntry is one entry under images.postgres in the manifest. @@ -72,22 +86,35 @@ func withTickerC(ch <-chan time.Time) manifestLoaderOption { return func(m *ManifestLoader) { m.tickerC = ch } } +// withEmbeddedFallback enables the embedded-manifest fallback regardless of +// the configured URL. Used in tests that simulate URL failure while still +// exercising the default-URL resolution chain. +func withEmbeddedFallback() manifestLoaderOption { + return func(m *ManifestLoader) { m.embeddedFallback = true } +} + // ManifestLoader loads the version manifest from a remote URL (with disk // caching and hourly refresh) or from a local file, and exposes the parsed // *Versions and *ServiceVersions it produces. // -// Resolution order on startup: -// 1. Local file (cfg.DockerSwarm.ManifestPath), if set — no network fetch, no refresh. -// 2. Remote URL (cfg.DockerSwarm.ManifestURL, defaulting to DefaultManifestURL). -// 3. Disk cache at defaultCachePath (or the path set by withCachePath). -// 4. Embedded binary manifest (always succeeds; panics only if the embedded -// JSON is corrupt, which would indicate a broken build). +// Resolution chains: +// +// 1. manifest_path set (local file): local file only — no fallback, returns +// error if the file is missing or invalid. +// +// 2. Custom manifest_url (differs from DefaultManifestURL): remote URL → +// disk cache — no embedded fallback; returns error if all sources fail. +// +// 3. Default manifest_url: remote URL → disk cache → embedded manifest +// (always succeeds; panics only if the embedded JSON is corrupt, which +// indicates a broken build). type ManifestLoader struct { - cfg config.Config - logger zerolog.Logger - cachePath string - httpClient *http.Client - tickerC <-chan time.Time // nil → use default hourly ticker; injectable for tests + cfg config.Config + logger zerolog.Logger + cachePath string + httpClient *http.Client + tickerC <-chan time.Time // nil → use default hourly ticker; injectable for tests + embeddedFallback bool // set when using the default URL; injectable for tests mu sync.RWMutex versions *Versions @@ -96,13 +123,15 @@ type ManifestLoader struct { // NewManifestLoader creates and starts a ManifestLoader. It loads the // manifest synchronously before returning so callers always get a valid -// *Versions / *ServiceVersions immediately. Background refresh (if -// applicable) is started as a goroutine tied to ctx. -func NewManifestLoader(ctx context.Context, cfg config.Config, logger zerolog.Logger, opts ...manifestLoaderOption) *ManifestLoader { +// *Versions / *ServiceVersions immediately. Returns an error when using a +// custom manifest_path or manifest_url and the manifest cannot be loaded. +// Background refresh (if applicable) is started as a goroutine tied to ctx. +func NewManifestLoader(ctx context.Context, cfg config.Config, loggerFactory *logging.Factory, opts ...manifestLoaderOption) (*ManifestLoader, error) { m := &ManifestLoader{ - cfg: cfg, - logger: logger.With().Str("component", "manifest_loader").Logger(), - cachePath: filepath.Join(cfg.DataDir, "manifests", manifestCacheFilename), + cfg: cfg, + logger: loggerFactory.Logger(logging.ComponentManifestLoader), + cachePath: manifestCachePath(filepath.Join(cfg.DataDir, "manifests"), cfg.DockerSwarm.ManifestURL), + embeddedFallback: cfg.DockerSwarm.ManifestURL == config.DefaultManifestURL, httpClient: &http.Client{ Timeout: fetchTimeout, }, @@ -111,65 +140,82 @@ func NewManifestLoader(ctx context.Context, cfg config.Config, logger zerolog.Lo o(m) } - m.load() + if err := m.load(); err != nil { + return nil, err + } if cfg.DockerSwarm.ManifestPath == "" { go m.refreshLoop(ctx) } - return m + return m, nil } -// Versions returns the current parsed *Versions. -func (m *ManifestLoader) Versions() *Versions { +// Versions returns a snapshot of the current parsed Versions. +func (m *ManifestLoader) Versions() Versions { m.mu.RLock() defer m.mu.RUnlock() - return m.versions + return *m.versions } -// ServiceVersions returns the current parsed *ServiceVersions. -func (m *ManifestLoader) ServiceVersions() *ServiceVersions { +// ServiceVersions returns a snapshot of the current parsed ServiceVersions. +func (m *ManifestLoader) ServiceVersions() ServiceVersions { m.mu.RLock() defer m.mu.RUnlock() - return m.svcVersions + return *m.svcVersions } -// load performs the initial synchronous load, trying each source in order. -func (m *ManifestLoader) load() { - data, src := m.resolve() +// load performs the initial synchronous load. +func (m *ManifestLoader) load() error { + data, src, err := m.resolve() + if err != nil { + return err + } v, sv, err := m.parseManifestData(data) if err != nil { - // resolve() fully parse-validates every non-embedded source before - // returning it, so this branch is only reachable when the embedded - // manifest itself is corrupt — a build-time error. - panic(fmt.Sprintf("manifest_loader: failed to parse embedded manifest (%s): %v", src, err)) + if src == "embedded" { + // The embedded manifest is corrupt — this is a build-time error. + panic(fmt.Sprintf("manifest_loader: corrupt embedded manifest: %v", err)) + } + return fmt.Errorf("parse manifest from %s: %w", src, err) } m.mu.Lock() m.versions = v m.svcVersions = sv m.mu.Unlock() m.logger.Info().Str("source", src).Msg("version manifest loaded") + return nil } -// resolve returns the raw manifest bytes and a human-readable source label, -// falling back through the resolution order. -func (m *ManifestLoader) resolve() ([]byte, string) { - // 1. Local file override. +// resolve selects the appropriate resolution chain and returns raw manifest +// bytes plus a human-readable source label. +func (m *ManifestLoader) resolve() ([]byte, string, error) { if p := m.cfg.DockerSwarm.ManifestPath; p != "" { - data, err := os.ReadFile(p) - if err != nil { - m.logger.Warn().Err(err).Str("path", p).Msg("failed to read manifest_path; falling back to embedded manifest") - } else if err = m.validateManifest(data); err != nil { - m.logger.Warn().Err(err).Str("path", p).Msg("manifest_path validation failed; falling back to embedded manifest") - } else { - return data, "file:" + p - } - // manifest_path was set but unusable — skip URL/cache and use embedded. - return embeddedManifest, "embedded" + return m.resolveLocalPath(p) } + return m.resolveURL() +} + +// resolveLocalPath reads and validates a manifest from a local file path. +// No fallback — returns an error if the file is missing or invalid. +func (m *ManifestLoader) resolveLocalPath(p string) ([]byte, string, error) { + data, err := os.ReadFile(p) + if err != nil { + return nil, "", fmt.Errorf("manifest_path %s: %w", p, err) + } + if err = m.validateManifest(data); err != nil { + return nil, "", fmt.Errorf("manifest_path %s validation failed: %w", p, err) + } + return data, "file:" + p, nil +} + +// resolveURL tries to load the manifest from the configured URL, falling back +// to the disk cache. For the default URL it also falls back to the embedded +// manifest. For a custom URL it returns an error if all sources fail so the +// operator knows their configuration is broken. +func (m *ManifestLoader) resolveURL() ([]byte, string, error) { + u := m.cfg.DockerSwarm.ManifestURL - // 2. Remote URL. - u := m.manifestURL() data, err := m.fetchURL(context.Background(), u) if err != nil { m.logger.Warn().Err(err).Str("url", u).Msg("failed to fetch manifest from URL; trying disk cache") @@ -177,30 +223,24 @@ func (m *ManifestLoader) resolve() ([]byte, string) { m.logger.Warn().Err(err).Str("url", u).Msg("remote manifest validation failed; trying disk cache") } else { _ = m.writeCache(data) - return data, "url:" + u + return data, "url:" + u, nil } - // 3. Disk cache. if cached, err := os.ReadFile(m.cachePath); err == nil { if err = m.validateManifest(cached); err != nil { - m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("cached manifest validation failed; falling back to embedded manifest") + m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("cached manifest validation failed; skipping cache") } else { m.logger.Info().Str("path", m.cachePath).Msg("using cached manifest") - return cached, "cache:" + m.cachePath + return cached, "cache:" + m.cachePath, nil } } - // 4. Embedded binary manifest. - m.logger.Warn().Msg("using embedded manifest; consider checking connectivity to manifest URL") - return embeddedManifest, "embedded" -} - -// manifestURL returns the configured URL, or DefaultManifestURL if unset. -func (m *ManifestLoader) manifestURL() string { - if u := m.cfg.DockerSwarm.ManifestURL; u != "" { - return u + if m.embeddedFallback { + m.logger.Warn().Msg("using embedded manifest; consider checking connectivity to manifest URL") + return embeddedManifest, "embedded", nil } - return DefaultManifestURL + + return nil, "", fmt.Errorf("failed to load manifest from %s: URL and cache both unavailable", u) } // fetchURL fetches bytes from the given URL using the configured http.Client. @@ -223,11 +263,11 @@ func (m *ManifestLoader) fetchURL(ctx context.Context, url string) ([]byte, erro // writeCache persists data to the cache path, creating parent directories as // needed. Errors are logged but not returned (caching is best-effort). func (m *ManifestLoader) writeCache(data []byte) error { - if err := os.MkdirAll(filepath.Dir(m.cachePath), 0o755); err != nil { + if err := os.MkdirAll(filepath.Dir(m.cachePath), 0o700); err != nil { m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("failed to create manifest cache directory") return err } - if err := os.WriteFile(m.cachePath, data, 0o644); err != nil { + if err := os.WriteFile(m.cachePath, data, 0o600); err != nil { m.logger.Warn().Err(err).Str("path", m.cachePath).Msg("failed to write manifest cache") return err } @@ -258,7 +298,7 @@ func (m *ManifestLoader) refreshLoop(ctx context.Context) { // disk cache, and swaps the in-memory versions atomically. Failures are // logged but do not disturb the current in-memory versions. func (m *ManifestLoader) refresh(ctx context.Context) { - u := m.manifestURL() + u := m.cfg.DockerSwarm.ManifestURL data, err := m.fetchURL(ctx, u) if err != nil { m.logger.Warn().Err(err).Str("url", u).Msg("manifest refresh: fetch failed; keeping current versions") @@ -284,9 +324,8 @@ func (m *ManifestLoader) refresh(ctx context.Context) { // validateManifest checks that data is valid JSON, has a supported // schema_version, and can be fully parsed into *Versions/*ServiceVersions. -// Performing a full parse here means resolve() only returns data that will -// succeed in parseManifestData, so load()'s panic is truly unreachable except -// for a corrupt embedded manifest. +// Performing a full parse here ensures resolve() only returns data that will +// succeed in parseManifestData. func (m *ManifestLoader) validateManifest(data []byte) error { var mf versionManifest if err := json.Unmarshal(data, &mf); err != nil { @@ -324,17 +363,9 @@ func (m *ManifestLoader) parseManifestData(data []byte) (*Versions, *ServiceVers // buildVersions parses the images.postgres section of the manifest and // returns a *Versions equivalent to NewVersions(cfg) for the same data. func buildVersions(cfg config.Config, mf *versionManifest) (*Versions, error) { - raw, ok := mf.Images["postgres"] - if !ok { - return nil, fmt.Errorf("manifest missing images.postgres") - } - - var entries []manifestPostgresEntry - if err := json.Unmarshal(raw, &entries); err != nil { - return nil, fmt.Errorf("unmarshal images.postgres: %w", err) - } + entries := mf.Images.Postgres if len(entries) == 0 { - return nil, fmt.Errorf("images.postgres is empty") + return nil, fmt.Errorf("manifest missing images.postgres") } versions := &Versions{ @@ -375,16 +406,17 @@ func buildServiceVersions(cfg config.Config, mf *versionManifest) (*ServiceVersi images: make(map[string]map[string]*ServiceImage), } - for svcType, raw := range mf.Images { - if svcType == "postgres" { - continue - } - var entries []manifestServiceEntry - if err := json.Unmarshal(raw, &entries); err != nil { - return nil, fmt.Errorf("unmarshal images.%s: %w", svcType, err) - } - for _, e := range entries { - sv.addServiceImage(svcType, e.Version, &ServiceImage{ + type svcSection struct { + name string + entries []manifestServiceEntry + } + for _, s := range []svcSection{ + {"postgrest", mf.Images.PostgREST}, + {"mcp", mf.Images.MCP}, + {"rag", mf.Images.RAG}, + } { + for _, e := range s.entries { + sv.addServiceImage(s.name, e.Version, &ServiceImage{ Tag: serviceImageTag(cfg, e.Image), }) } diff --git a/server/internal/orchestrator/swarm/manifest_loader_test.go b/server/internal/orchestrator/swarm/manifest_loader_test.go index fbe70294..ad23a249 100644 --- a/server/internal/orchestrator/swarm/manifest_loader_test.go +++ b/server/internal/orchestrator/swarm/manifest_loader_test.go @@ -11,9 +11,8 @@ import ( "testing" "time" - "github.com/rs/zerolog" - "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/testutils" ) // testCfg returns a config with an isolated cache directory in t.TempDir(). @@ -31,8 +30,6 @@ func testCfg(t *testing.T, extra ...func(*config.DockerSwarm)) (config.Config, s return cfg, filepath.Join(cacheDir, "manifest-cache.json") } -func nopLogger() zerolog.Logger { return zerolog.Nop() } - // validManifest returns a well-formed manifest JSON matching the embedded one. func validManifest(t *testing.T) []byte { t.Helper() @@ -79,15 +76,16 @@ func TestManifestLoader_LoadFromEmbedded(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), withHTTPClient(srv.Client()), + withEmbeddedFallback(), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } v := loader.Versions() - if v == nil { - t.Fatal("expected non-nil Versions from embedded manifest") - } if v.Default() == nil { t.Fatal("expected non-nil default version from embedded manifest") } @@ -96,8 +94,8 @@ func TestManifestLoader_LoadFromEmbedded(t *testing.T) { } sv := loader.ServiceVersions() - if sv == nil { - t.Fatal("expected non-nil ServiceVersions from embedded manifest") + if _, err := sv.SupportedServiceVersions("mcp"); err != nil { + t.Fatalf("expected ServiceVersions to be populated from embedded manifest: %v", err) } } @@ -116,15 +114,15 @@ func TestManifestLoader_LoadFromURL(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), withHTTPClient(srv.Client()), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } v := loader.Versions() - if v == nil { - t.Fatal("expected non-nil Versions") - } def := v.Default() if def == nil { t.Fatal("expected non-nil default version") @@ -146,10 +144,10 @@ func TestManifestLoader_LoadFromCache(t *testing.T) { manifest := validManifest(t) // Pre-populate the cache. - if err := os.MkdirAll(filepath.Dir(cachePath), 0o755); err != nil { + if err := os.MkdirAll(filepath.Dir(cachePath), 0o700); err != nil { t.Fatal(err) } - if err := os.WriteFile(cachePath, manifest, 0o644); err != nil { + if err := os.WriteFile(cachePath, manifest, 0o600); err != nil { t.Fatal(err) } @@ -163,20 +161,45 @@ func TestManifestLoader_LoadFromCache(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), withHTTPClient(srv.Client()), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } v := loader.Versions() - if v == nil { - t.Fatal("expected Versions from cache") - } if v.Default() == nil { t.Fatal("expected non-nil default version from cache") } } +// TestManifestLoader_CustomURLNoFallbackToEmbedded verifies that a custom +// manifest_url that fails does NOT fall back to the embedded manifest and +// instead returns an error. +func TestManifestLoader_CustomURLNoFallbackToEmbedded(t *testing.T) { + cfg, cachePath := testCfg(t) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + // Use a non-default URL to trigger the custom-URL chain. + cfg.DockerSwarm.ManifestURL = srv.URL + "/custom" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), + withCachePath(cachePath), + withHTTPClient(srv.Client()), + ) + if err == nil { + t.Fatal("expected error for custom URL with no cache and failing server") + } +} + // TestManifestLoader_LoadFromManifestPath verifies the local file override. func TestManifestLoader_LoadFromManifestPath(t *testing.T) { manifest := validManifest(t) @@ -193,21 +216,21 @@ func TestManifestLoader_LoadFromManifestPath(t *testing.T) { }, } - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + loader, err := NewManifestLoader(context.Background(), cfg, testutils.LoggerFactory(t), withCachePath(cachePath), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } v := loader.Versions() - if v == nil { - t.Fatal("expected Versions from manifest_path") - } if v.Default().PostgresVersion.String() != "17.10" { t.Errorf("default version = %s, want 17.10", v.Default().PostgresVersion) } } -// TestManifestLoader_ManifestPathMissing verifies fallback to embedded when -// manifest_path points to a non-existent file. +// TestManifestLoader_ManifestPathMissing verifies that a missing manifest_path +// returns an error (no fallback to embedded). func TestManifestLoader_ManifestPathMissing(t *testing.T) { _, cachePath := testCfg(t) cfg := config.Config{ @@ -217,18 +240,17 @@ func TestManifestLoader_ManifestPathMissing(t *testing.T) { }, } - loader := NewManifestLoader(context.Background(), cfg, nopLogger(), + _, err := NewManifestLoader(context.Background(), cfg, testutils.LoggerFactory(t), withCachePath(cachePath), ) - - // Should have fallen back to embedded. - if loader.Versions() == nil { - t.Fatal("expected non-nil Versions after fallback to embedded") + if err == nil { + t.Fatal("expected error when manifest_path points to a non-existent file") } } // TestManifestLoader_InvalidSchemaVersion verifies that a manifest with an -// unsupported schema_version causes URL/cache to be skipped and falls back. +// unsupported schema_version causes URL/cache to be skipped and falls back to +// the embedded manifest (default URL chain). func TestManifestLoader_InvalidSchemaVersion(t *testing.T) { cfg, cachePath := testCfg(t) badManifest := []byte(`{"schema_version":99,"images":{}}`) @@ -242,13 +264,17 @@ func TestManifestLoader_InvalidSchemaVersion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), withHTTPClient(srv.Client()), + withEmbeddedFallback(), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } // Falls back to embedded — should still work. - if loader.Versions() == nil { + if loader.Versions().Default() == nil { t.Fatal("expected fallback to embedded on invalid schema_version") } } @@ -266,12 +292,16 @@ func TestManifestLoader_MalformedJSON(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), withHTTPClient(srv.Client()), + withEmbeddedFallback(), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } - if loader.Versions() == nil { + if loader.Versions().Default() == nil { t.Fatal("expected fallback to embedded on malformed JSON") } } @@ -327,11 +357,14 @@ func TestManifestLoader_NoRefreshWhenManifestPathSet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), withHTTPClient(srv.Client()), withTickerC(immediateTick), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } // Allow enough time for the goroutine (if it existed) to process the tick // and complete an HTTP round-trip to localhost. @@ -379,10 +412,13 @@ func TestManifestLoader_RefreshSuccess(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), withHTTPClient(srv.Client()), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } if loader.Versions().Default().PostgresVersion.String() != "17.10" { t.Fatalf("expected initial default 17.10, got %s", loader.Versions().Default().PostgresVersion) @@ -412,10 +448,13 @@ func TestManifestLoader_RefreshFailure(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), withHTTPClient(srv.Client()), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } origDefault := loader.Versions().Default().PostgresVersion.String() @@ -542,13 +581,16 @@ func TestManifestLoader_RealURL(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, zerolog.New(os.Stderr).With().Timestamp().Logger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } v := loader.Versions() - if v == nil { - t.Fatal("expected non-nil Versions from real URL") + if v.Default() == nil { + t.Fatal("expected non-nil default version from real URL") } t.Logf("loaded %d supported versions; default=%s", len(v.Supported()), v.Default().PostgresVersion) @@ -567,7 +609,7 @@ func TestManifestLoader_RealURL(t *testing.T) { // TestValidateManifest covers schema_version and JSON validation. func TestValidateManifest(t *testing.T) { - m := &ManifestLoader{logger: nopLogger()} + m := &ManifestLoader{logger: testutils.Logger(t)} if err := m.validateManifest(embeddedManifest); err != nil { t.Errorf("embedded manifest should be valid: %v", err) @@ -597,10 +639,14 @@ func TestManifestLoader_ImageTagsHaveRegistryPrefix(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - loader := NewManifestLoader(ctx, cfg, nopLogger(), + loader, err := NewManifestLoader(ctx, cfg, testutils.LoggerFactory(t), withCachePath(cachePath), withHTTPClient(srv.Client()), + withEmbeddedFallback(), ) + if err != nil { + t.Fatalf("NewManifestLoader: %v", err) + } for _, pv := range loader.Versions().Supported() { imgs, err := loader.Versions().GetImages(pv) diff --git a/server/internal/orchestrator/swarm/service_images.go b/server/internal/orchestrator/swarm/service_images.go index 4826dbab..82fb3adc 100644 --- a/server/internal/orchestrator/swarm/service_images.go +++ b/server/internal/orchestrator/swarm/service_images.go @@ -94,7 +94,7 @@ func (sv *ServiceVersions) addServiceImage(serviceType string, version string, i } // GetServiceImage returns the full ServiceImage for the given service type and version. -func (sv *ServiceVersions) GetServiceImage(serviceType string, version string) (*ServiceImage, error) { +func (sv ServiceVersions) GetServiceImage(serviceType string, version string) (*ServiceImage, error) { versionMap, ok := sv.images[serviceType] if !ok { return nil, fmt.Errorf("unsupported service type %q", serviceType) @@ -108,7 +108,7 @@ func (sv *ServiceVersions) GetServiceImage(serviceType string, version string) ( return image, nil } -func (sv *ServiceVersions) SupportedServiceVersions(serviceType string) ([]string, error) { +func (sv ServiceVersions) SupportedServiceVersions(serviceType string) ([]string, error) { versionMap, ok := sv.images[serviceType] if !ok { return nil, fmt.Errorf("unsupported service type %q", serviceType) From a1880507669b70156ebc2a458b6628e997412387 Mon Sep 17 00:00:00 2001 From: Siva Date: Tue, 2 Jun 2026 21:29:00 +0530 Subject: [PATCH 6/9] addressing review comments --- .../orchestrator/swarm/manifest_loader.go | 14 ++++++------- .../swarm/manifest_loader_test.go | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/server/internal/orchestrator/swarm/manifest_loader.go b/server/internal/orchestrator/swarm/manifest_loader.go index 23083032..238a1dab 100644 --- a/server/internal/orchestrator/swarm/manifest_loader.go +++ b/server/internal/orchestrator/swarm/manifest_loader.go @@ -140,7 +140,7 @@ func NewManifestLoader(ctx context.Context, cfg config.Config, loggerFactory *lo o(m) } - if err := m.load(); err != nil { + if err := m.load(ctx); err != nil { return nil, err } @@ -166,8 +166,8 @@ func (m *ManifestLoader) ServiceVersions() ServiceVersions { } // load performs the initial synchronous load. -func (m *ManifestLoader) load() error { - data, src, err := m.resolve() +func (m *ManifestLoader) load(ctx context.Context) error { + data, src, err := m.resolve(ctx) if err != nil { return err } @@ -189,11 +189,11 @@ func (m *ManifestLoader) load() error { // resolve selects the appropriate resolution chain and returns raw manifest // bytes plus a human-readable source label. -func (m *ManifestLoader) resolve() ([]byte, string, error) { +func (m *ManifestLoader) resolve(ctx context.Context) ([]byte, string, error) { if p := m.cfg.DockerSwarm.ManifestPath; p != "" { return m.resolveLocalPath(p) } - return m.resolveURL() + return m.resolveURL(ctx) } // resolveLocalPath reads and validates a manifest from a local file path. @@ -213,10 +213,10 @@ func (m *ManifestLoader) resolveLocalPath(p string) ([]byte, string, error) { // to the disk cache. For the default URL it also falls back to the embedded // manifest. For a custom URL it returns an error if all sources fail so the // operator knows their configuration is broken. -func (m *ManifestLoader) resolveURL() ([]byte, string, error) { +func (m *ManifestLoader) resolveURL(ctx context.Context) ([]byte, string, error) { u := m.cfg.DockerSwarm.ManifestURL - data, err := m.fetchURL(context.Background(), u) + data, err := m.fetchURL(ctx, u) if err != nil { m.logger.Warn().Err(err).Str("url", u).Msg("failed to fetch manifest from URL; trying disk cache") } else if err = m.validateManifest(data); err != nil { diff --git a/server/internal/orchestrator/swarm/manifest_loader_test.go b/server/internal/orchestrator/swarm/manifest_loader_test.go index ad23a249..36260f03 100644 --- a/server/internal/orchestrator/swarm/manifest_loader_test.go +++ b/server/internal/orchestrator/swarm/manifest_loader_test.go @@ -607,6 +607,26 @@ func TestManifestLoader_RealURL(t *testing.T) { t.Logf("refresh OK; default still %s", loader.Versions().Default().PostgresVersion) } +// TestEmbeddedManifestValid fully parses the version-manifest.json embedded in +// the binary. A failure here means NewManifestLoader would panic at startup — +// catching it in CI is much better than catching it in production. +func TestEmbeddedManifestValid(t *testing.T) { + m := &ManifestLoader{logger: testutils.Logger(t)} + v, sv, err := m.parseManifestData(embeddedManifest) + if err != nil { + t.Fatalf("embedded manifest cannot be parsed: %v", err) + } + if v.Default() == nil { + t.Fatal("embedded manifest has no default version") + } + if len(v.Supported()) == 0 { + t.Fatal("embedded manifest has no supported versions") + } + if _, err := sv.SupportedServiceVersions("mcp"); err != nil { + t.Errorf("embedded manifest missing mcp service versions: %v", err) + } +} + // TestValidateManifest covers schema_version and JSON validation. func TestValidateManifest(t *testing.T) { m := &ManifestLoader{logger: testutils.Logger(t)} From 6ecb9bf0bee630633eb1100fd41b4a7c74f4ddd6 Mon Sep 17 00:00:00 2001 From: Siva Date: Wed, 3 Jun 2026 11:50:51 +0530 Subject: [PATCH 7/9] addressing review comments --- Makefile | 1 + NOTICE.txt | 3 +- NOTICE.txt.tmpl | 37 +++++++++++++++++++ server/internal/database/orchestrator.go | 5 +++ server/internal/database/service.go | 6 +++ .../orchestrator/swarm/orchestrator.go | 18 +++++++++ .../orchestrator/systemd/orchestrator.go | 4 ++ 7 files changed, 72 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 9fd51b14..c6706ff0 100644 --- a/Makefile +++ b/Makefile @@ -155,6 +155,7 @@ licenses: GOOS=linux $(go-licenses) report ./... \ --ignore github.com/pgEdge/control-plane \ --ignore github.com/eclipse/paho.golang \ + --ignore dario.cat/mergo \ --template=NOTICE.txt.tmpl > NOTICE.txt .PHONY: licenses-ci diff --git a/NOTICE.txt b/NOTICE.txt index 7d0d6c6d..e135839b 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -28,7 +28,6 @@ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ``` - ## dario.cat/mergo * Name: dario.cat/mergo @@ -64,9 +63,9 @@ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - ``` + ## github.com/alessio/shellescape * Name: github.com/alessio/shellescape diff --git a/NOTICE.txt.tmpl b/NOTICE.txt.tmpl index e75ee4c9..73f523dc 100644 --- a/NOTICE.txt.tmpl +++ b/NOTICE.txt.tmpl @@ -28,6 +28,43 @@ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ``` +## dario.cat/mergo + +* Name: dario.cat/mergo +* Version: v1.0.0 +* License: [BSD-3-Clause](https://github.com/imdario/mergo/blob/v1.0.0/LICENSE) + +``` +Copyright (c) 2013 Dario Castañé. All rights reserved. +Copyright (c) 2012 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +``` + {{ range . }} ## {{ .Name }} diff --git a/server/internal/database/orchestrator.go b/server/internal/database/orchestrator.go index 8f42a569..e101bbb2 100644 --- a/server/internal/database/orchestrator.go +++ b/server/internal/database/orchestrator.go @@ -181,4 +181,9 @@ type Orchestrator interface { StartInstance(ctx context.Context, instanceID string) error NodeDSN(ctx context.Context, rc *resource.Context, nodeName string, fromInstanceID string, dbName string) (*postgres.DSN, error) InstancePaths(pgVersion *ds.Version, instanceID string) (InstancePaths, error) + // ReconcileInstanceSpec is called during spec reconciliation to allow the + // orchestrator to update computed fields (e.g. resolved image) on the new + // spec before it is persisted. old is nil when the instance is being created + // for the first time. + ReconcileInstanceSpec(old, new *InstanceSpec) error } diff --git a/server/internal/database/service.go b/server/internal/database/service.go index d1bc1590..b2287d4c 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -669,7 +669,13 @@ func (s *Service) ReconcileInstanceSpec(ctx context.Context, spec *InstanceSpec) case err == nil: previous = stored.Spec spec.CopySettingsFrom(previous) + if err := s.orchestrator.ReconcileInstanceSpec(previous, spec); err != nil { + return nil, fmt.Errorf("failed to reconcile instance spec: %w", err) + } case errors.Is(err, storage.ErrNotFound): + if err := s.orchestrator.ReconcileInstanceSpec(nil, spec); err != nil { + return nil, fmt.Errorf("failed to reconcile instance spec: %w", err) + } stored = &StoredInstanceSpec{} default: return nil, fmt.Errorf("failed to get current spec for instance '%s': %w", spec.InstanceID, err) diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 742c520b..ed032411 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -212,6 +212,24 @@ func (o *Orchestrator) resolveInstanceImages(spec *database.InstanceSpec) (*Imag } } +// ReconcileInstanceSpec resolves the container image for the new spec and +// clears a stale ResolvedImage if PgEdgeVersion changed since the last +// reconciliation. +func (o *Orchestrator) ReconcileInstanceSpec(old, new *database.InstanceSpec) error { + // If the Postgres version changed, the previously resolved image no longer + // matches — clear it so resolveInstanceImages fetches the correct one from + // the manifest. + if old != nil && old.PgEdgeVersion != nil && new.PgEdgeVersion != nil { + if !old.PgEdgeVersion.Equals(new.PgEdgeVersion) { + if new.OrchestratorOpts != nil && new.OrchestratorOpts.Swarm != nil { + new.OrchestratorOpts.Swarm.ResolvedImage = "" + } + } + } + _, err := o.resolveInstanceImages(new) + return err +} + func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResource, []resource.Resource, []resource.Resource, error) { images, err := o.resolveInstanceImages(spec) if err != nil { diff --git a/server/internal/orchestrator/systemd/orchestrator.go b/server/internal/orchestrator/systemd/orchestrator.go index 48abb8e4..3141af6e 100644 --- a/server/internal/orchestrator/systemd/orchestrator.go +++ b/server/internal/orchestrator/systemd/orchestrator.go @@ -147,6 +147,10 @@ func (o *Orchestrator) PopulateHostStatus(ctx context.Context, h *host.HostStatu return nil } +func (o *Orchestrator) ReconcileInstanceSpec(_, _ *database.InstanceSpec) error { + return nil +} + func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResources, error) { paths, err := o.InstancePaths(spec.PgEdgeVersion.PostgresVersion, spec.InstanceID) if err != nil { From b49d1cd4326554f4d50eab5f8f37340a887844fc Mon Sep 17 00:00:00 2001 From: Siva Date: Thu, 4 Jun 2026 12:11:52 +0530 Subject: [PATCH 8/9] reverting notice changes --- Makefile | 1 - NOTICE.txt | 3 ++- NOTICE.txt.tmpl | 37 ------------------------------------- 3 files changed, 2 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index c6706ff0..9fd51b14 100644 --- a/Makefile +++ b/Makefile @@ -155,7 +155,6 @@ licenses: GOOS=linux $(go-licenses) report ./... \ --ignore github.com/pgEdge/control-plane \ --ignore github.com/eclipse/paho.golang \ - --ignore dario.cat/mergo \ --template=NOTICE.txt.tmpl > NOTICE.txt .PHONY: licenses-ci diff --git a/NOTICE.txt b/NOTICE.txt index e135839b..7d0d6c6d 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -28,6 +28,7 @@ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ``` + ## dario.cat/mergo * Name: dario.cat/mergo @@ -63,8 +64,8 @@ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -``` +``` ## github.com/alessio/shellescape diff --git a/NOTICE.txt.tmpl b/NOTICE.txt.tmpl index 73f523dc..e75ee4c9 100644 --- a/NOTICE.txt.tmpl +++ b/NOTICE.txt.tmpl @@ -28,43 +28,6 @@ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ``` -## dario.cat/mergo - -* Name: dario.cat/mergo -* Version: v1.0.0 -* License: [BSD-3-Clause](https://github.com/imdario/mergo/blob/v1.0.0/LICENSE) - -``` -Copyright (c) 2013 Dario Castañé. All rights reserved. -Copyright (c) 2012 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -``` - {{ range . }} ## {{ .Name }} From 12a66b8cbdb2ba37a5f237f90b9a07f69a2e54c6 Mon Sep 17 00:00:00 2001 From: Siva Date: Tue, 30 Jun 2026 16:35:48 +0530 Subject: [PATCH 9/9] updating version-manifest.json with latest spock versions --- server/internal/orchestrator/swarm/version-manifest.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/internal/orchestrator/swarm/version-manifest.json b/server/internal/orchestrator/swarm/version-manifest.json index e0ab9524..2ac84227 100644 --- a/server/internal/orchestrator/swarm/version-manifest.json +++ b/server/internal/orchestrator/swarm/version-manifest.json @@ -29,7 +29,7 @@ { "postgres_version": "16.14", "spock_version": "5", - "image": "pgedge-postgres:16.14-spock5.0.8-standard-1", + "image": "pgedge-postgres:16.14-spock5.0.9-standard-1", "stability": "stable" }, { @@ -59,7 +59,7 @@ { "postgres_version": "17.10", "spock_version": "5", - "image": "pgedge-postgres:17.10-spock5.0.8-standard-1", + "image": "pgedge-postgres:17.10-spock5.0.9-standard-1", "stability": "stable" }, { @@ -89,7 +89,7 @@ { "postgres_version": "18.4", "spock_version": "5", - "image": "pgedge-postgres:18.4-spock5.0.8-standard-1", + "image": "pgedge-postgres:18.4-spock5.0.9-standard-1", "stability": "stable", "default": true }