diff --git a/.gitignore b/.gitignore index 0e0965836..f2471a36d 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,5 @@ postgres/pgsql-test/output/ *.tsbuildinfo .env .env.local +graphql/server/logs/ +graphql/server/*.heapsnapshot diff --git a/graphql/server/README.md b/graphql/server/README.md index 5a39463e5..e0c43b2d0 100644 --- a/graphql/server/README.md +++ b/graphql/server/README.md @@ -66,6 +66,28 @@ Runs an Express server that wires CORS, uploads, domain parsing, auth, and PostG - File uploads via `graphql-upload` - GraphiQL and health check endpoints - Schema cache flush via `/flush` or database notifications +- Opt-in observability for memory, DB activity, and Graphile build debugging + +## Observability + +`@constructive-io/graphql-server` includes an opt-in observability mode for local debugging. + +- Master switch: `GRAPHQL_OBSERVABILITY_ENABLED=true` +- Debug routes: `GET /debug/memory`, `GET /debug/db` +- Background sampler: periodic NDJSON snapshots under `graphql/server/logs/` +- CLI helpers: + - `pnpm debug:memory:analyze` + - `pnpm debug:heap:capture` + +Observability only activates when all of the following are true: + +- `GRAPHQL_OBSERVABILITY_ENABLED=true` +- `NODE_ENV=development` +- the server is bound to a loopback host such as `localhost`, `127.0.0.1`, or `::1` + +When those conditions are not met, the debug routes are not mounted and the sampler does not start. This keeps the default runtime surface minimal and prevents the observability layer from being exposed remotely. + +For the operational workflow, sampler output, and heap snapshot usage, see [docs/memory-debugging.md](./docs/memory-debugging.md). 
## Routes @@ -74,6 +96,8 @@ Runs an Express server that wires CORS, uploads, domain parsing, auth, and PostG - `GET /graphql` / `POST /graphql` -> GraphQL endpoint - `POST /graphql` (multipart) -> file uploads - `POST /flush` -> clears cached Graphile schema for the current API +- `GET /debug/memory` -> memory/process/Graphile debug snapshot when observability is enabled +- `GET /debug/db` -> PostgreSQL activity/locks/pool debug snapshot when observability is enabled ## Meta API routing @@ -113,6 +137,10 @@ Configuration is merged from defaults, config files, and env vars via `@construc | `API_ANON_ROLE` | Anonymous role name | `administrator` | | `API_ROLE_NAME` | Authenticated role name | `administrator` | | `API_DEFAULT_DATABASE_ID` | Default database ID | `hard-coded` | +| `GRAPHQL_OBSERVABILITY_ENABLED` | Master switch for debug routes and sampler | `false` | +| `GRAPHQL_DEBUG_SAMPLER_ENABLED` | Enables periodic NDJSON sampling when observability is on | `true` | +| `GRAPHQL_DEBUG_SAMPLER_INTERVAL_MS` | Sampler interval in milliseconds | `10000` | +| `GRAPHQL_DEBUG_SAMPLER_DIR` | Override output directory for sampler logs | `graphql/server/logs` | ## Testing diff --git a/graphql/server/docs/memory-debugging.md b/graphql/server/docs/memory-debugging.md new file mode 100644 index 000000000..f52c0cbfc --- /dev/null +++ b/graphql/server/docs/memory-debugging.md @@ -0,0 +1,147 @@ +# Observability and Memory Debugging Runbook + +Use this runbook when reproducing Graphile/server memory growth locally or in staging, or when you need a quick operational view of Graphile build churn and PostgreSQL activity. + +## What observability enables + +Set `GRAPHQL_OBSERVABILITY_ENABLED=true` to turn on the graphql/server observability surfaces. 
+ +When enabled: + +- `GET /debug/memory` returns a process snapshot with Node/V8 memory, Graphile cache state, in-flight builds, and Graphile build timings +- `GET /debug/db` returns PostgreSQL pool usage, active queries, blocked sessions, lock summary, and selected database stats +- the debug sampler writes periodic NDJSON snapshots for offline analysis + +Observability only activates when all of the following are true: + +- `GRAPHQL_OBSERVABILITY_ENABLED=true` +- `NODE_ENV=development` +- the GraphQL server is bound to a loopback host such as `localhost`, `127.0.0.1`, or `::1` + +When those conditions are not met: + +- `/debug/memory` returns `404` +- `/debug/db` returns `404` +- the sampler does not start +- no debug log directory is created + +This mode is intended for local debugging only. It is not a live operational surface. + +## Start the server with profiling enabled + +```bash +cd graphql/server +NODE_ENV=development \ +GRAPHQL_OBSERVABILITY_ENABLED=true \ +NODE_OPTIONS="--heapsnapshot-signal=SIGUSR2 --expose-gc" \ +GRAPHQL_DEBUG_SAMPLER_ENABLED=true \ +GRAPHQL_DEBUG_SAMPLER_INTERVAL_MS=10000 \ +pnpm dev +``` + +`GRAPHQL_OBSERVABILITY_ENABLED` is the master switch. When it is not `true`, the server does not mount `/debug/memory`, does not mount `/debug/db`, and does not start the sampler. +`NODE_ENV` must also be `development`, and the server host must be loopback-only. + +Optional knobs: + +- `GRAPHQL_DEBUG_SAMPLER_ENABLED=false` disables the sampler while leaving the debug routes available +- `GRAPHQL_DEBUG_SAMPLER_INTERVAL_MS=` changes the sampling interval +- `GRAPHQL_DEBUG_SAMPLER_DIR=/abs/path` writes the sampler output somewhere other than `graphql/server/logs` + +The debug sampler writes one run directory per server process under `graphql/server/logs/` by default. 
+ +Expected files per sampler session: + +- `debug-memory.ndjson` +- `debug-db.ndjson` +- `debug-sampler-errors.ndjson` + +## Debug routes + +Use these routes for live inspection while a server is running: + +- `GET /debug/memory` + - process memory usage + - V8 heap statistics and heap spaces + - Graphile cache state + - in-flight handler creation count + - Graphile build timing aggregates + - PostGIS codec telemetry +- `GET /debug/db` + - PG pool totals, idle count, and waiters + - active queries and blocked sessions + - lock summary + - selected `pg_stat_database` counters + - `pg_notification_queue_usage()` + +## Analyze the latest sampler run + +```bash +cd graphql/server +pnpm debug:memory:analyze +``` + +To analyze a specific run directory: + +```bash +cd graphql/server +pnpm debug:memory:analyze -- --dir ./logs/run-2026-03-09T12-00-00-000Z-pid12345 +``` + +## Capture a heap snapshot + +```bash +cd graphql/server +pnpm debug:heap:capture -- --pid +``` + +If your server writes snapshots somewhere else, pass `--dir`. + +## Tooling reference + +- `pnpm debug:memory:analyze` + - reads the latest sampler directory by default + - summarizes heap/RSS range, Graphile build stats, DB waiters, and blocked sessions +- `pnpm debug:heap:capture -- --pid ` + - sends `SIGUSR2` to the server process + - requires `NODE_OPTIONS="--heapsnapshot-signal=SIGUSR2 --expose-gc"` + - prints the created `.heapsnapshot` path + +## Recommended incident workflow + +1. Start the server with `GRAPHQL_OBSERVABILITY_ENABLED=true` and `NODE_ENV=development` on a loopback host. +2. Reproduce the issue. +3. Inspect `/debug/memory` and `/debug/db` live if you need immediate feedback. +4. Run `pnpm debug:memory:analyze` against the generated logs. +5. If retained heap is still unclear, capture one or more heap snapshots. +6. Disable observability again when you are done. + +## How to read the snapshots + +Focus on a few high-signal sections first. 
+ +- Memory and V8 + - `heapUsedBytes`, `rssBytes`, and the V8 heap space breakdown tell you whether pressure is in old space, new space, or large object space +- Graphile cache and builds + - `graphileCache` shows how many cached handlers are live + - `graphileBuilds` shows how often handlers are being rebuilt and how expensive the builds are +- PostgreSQL activity + - `pool.waitingCount`, `blockedActivity`, and `lockSummary` are the fastest indicators of DB contention + - `activeActivity` highlights long-running queries and transaction age +## What to watch for + +- `heapUsedMb.max` and `rssMb.max` relative to baseline +- last 6 sampler samples still trending upward in heap or RSS after load stops +- repeated Graphile builds with high `averageBuildMs` or `maxBuildMs` +- blocked DB sessions or `pool.waitingCount > 0` +- active queries with long `xact_age` or `query_age` + +## Current acceptance bar + +- no blocked DB sessions +- no PG pool waiters +- last 6 idle samples do not trend upward by more than 5% for heap or RSS + +## Operational note + +The observability routes and sampler are designed for engineering use on a local machine. Keep them disabled by default and do not treat them as a staging or production feature. diff --git a/graphql/server/package.json b/graphql/server/package.json index 63b64a71e..9b51a833e 100644 --- a/graphql/server/package.json +++ b/graphql/server/package.json @@ -26,6 +26,8 @@ "build:dev": "makage build --dev", "dev": "ts-node src/run.ts", "dev:watch": "nodemon --watch src --ext ts --exec ts-node src/run.ts", + "debug:memory:analyze": "node scripts/analyze-debug-logs.mjs", + "debug:heap:capture": "node scripts/capture-heap-snapshot.mjs", "lint": "eslint . 
--fix", "test": "jest --passWithNoTests", "test:watch": "jest --watch", diff --git a/graphql/server/scripts/analyze-debug-logs.mjs b/graphql/server/scripts/analyze-debug-logs.mjs new file mode 100644 index 000000000..64c27b8f0 --- /dev/null +++ b/graphql/server/scripts/analyze-debug-logs.mjs @@ -0,0 +1,267 @@ +#!/usr/bin/env node + +import fs from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; + +const scriptDir = path.dirname(fileURLToPath(import.meta.url)); +const packageDir = path.resolve(scriptDir, '..'); +const args = process.argv.slice(2); + +const getArgValue = (flag, fallback = null) => { + const index = args.indexOf(flag); + if (index === -1 || index === args.length - 1) { + return fallback; + } + return args[index + 1]; +}; + +const requestedLogDir = path.resolve(getArgValue('--dir', path.join(packageDir, 'logs'))); +const jsonMode = args.includes('--json'); + +const pathExists = async (targetPath) => { + try { + await fs.access(targetPath); + return true; + } catch { + return false; + } +}; + +const resolveLogDir = async (baseDir) => { + const entries = await fs.readdir(baseDir, { withFileTypes: true }); + const candidates = await Promise.all( + entries + .filter((entry) => entry.isDirectory()) + .map(async (entry) => { + const candidateDir = path.join(baseDir, entry.name); + const hasMemory = await pathExists(path.join(candidateDir, 'debug-memory.ndjson')); + const hasDb = await pathExists(path.join(candidateDir, 'debug-db.ndjson')); + if (!hasMemory && !hasDb) { + return null; + } + + const stat = await fs.stat(candidateDir); + return { dir: candidateDir, mtimeMs: stat.mtimeMs }; + }), + ); + + const latest = candidates.filter(Boolean).sort((a, b) => b.mtimeMs - a.mtimeMs)[0]; + return latest?.dir ?? 
baseDir; +}; + +const readNdjson = async (filePath) => { + if (!(await pathExists(filePath))) { + return []; + } + + const content = await fs.readFile(filePath, 'utf8'); + return content + .split('\n') + .map((line) => line.trim()) + .filter(Boolean) + .map((line) => { try { return JSON.parse(line); } catch { return null; } }) + .filter(Boolean); +}; + +const getLastSessionEntries = (entries) => { + let lastStartIndex = -1; + for (let i = 0; i < entries.length; i += 1) { + if (entries[i]?.event === 'sampler_started') { + lastStartIndex = i; + } + } + + return lastStartIndex === -1 + ? entries.filter((entry) => entry?.timestamp) + : entries.slice(lastStartIndex + 1).filter((entry) => entry?.timestamp); +}; + +const first = (entries) => (entries.length > 0 ? entries[0] : null); +const last = (entries) => (entries.length > 0 ? entries[entries.length - 1] : null); +const maxBy = (entries, getter, fallback = 0) => + entries.reduce((max, entry) => Math.max(max, getter(entry) ?? fallback), fallback); +const toMB = (bytes) => Number((bytes / 1024 / 1024).toFixed(1)); +const percentDelta = (start, end) => { + if (!Number.isFinite(start) || start <= 0 || !Number.isFinite(end)) { + return null; + } + return Number((((end - start) / start) * 100).toFixed(1)); +}; + +const summarize = async () => { + const logDir = await resolveLogDir(requestedLogDir); + const memoryPath = path.join(logDir, 'debug-memory.ndjson'); + const dbPath = path.join(logDir, 'debug-db.ndjson'); + + const [memoryLines, dbLines] = await Promise.all([readNdjson(memoryPath), readNdjson(dbPath)]); + const memoryEntries = getLastSessionEntries(memoryLines).filter((entry) => entry.memory); + const dbEntries = getLastSessionEntries(dbLines).filter((entry) => entry.database !== undefined); + + if (memoryEntries.length === 0 && dbEntries.length === 0) { + throw new Error(`No debug snapshot entries found in ${logDir}`); + } + + const memoryStart = first(memoryEntries); + const memoryEnd = last(memoryEntries); + 
const dbStart = first(dbEntries); + const dbEnd = last(dbEntries); + const startedAt = new Date((memoryStart ?? dbStart).timestamp); + const endedAt = new Date((memoryEnd ?? dbEnd).timestamp); + const idleTail = memoryEntries.slice(-6); + const idleTailStart = first(idleTail); + const idleTailEnd = last(idleTail); + + const summary = { + logDir, + startedAt: startedAt.toISOString(), + endedAt: endedAt.toISOString(), + durationSeconds: Number(((endedAt.getTime() - startedAt.getTime()) / 1000).toFixed(1)), + samples: { + memory: memoryEntries.length, + db: dbEntries.length, + }, + memory: memoryEntries.length > 0 ? { + heapUsedMb: { + start: toMB(memoryStart.memory.heapUsedBytes), + max: toMB(maxBy(memoryEntries, (entry) => entry.memory.heapUsedBytes)), + end: toMB(memoryEnd.memory.heapUsedBytes), + delta: toMB(memoryEnd.memory.heapUsedBytes - memoryStart.memory.heapUsedBytes), + }, + rssMb: { + start: toMB(memoryStart.memory.rssBytes), + max: toMB(maxBy(memoryEntries, (entry) => entry.memory.rssBytes)), + end: toMB(memoryEnd.memory.rssBytes), + delta: toMB(memoryEnd.memory.rssBytes - memoryStart.memory.rssBytes), + }, + idleTail: idleTail.length > 1 ? { + samples: idleTail.length, + heapTrendPct: percentDelta(idleTailStart.memory.heapUsedBytes, idleTailEnd.memory.heapUsedBytes), + rssTrendPct: percentDelta(idleTailStart.memory.rssBytes, idleTailEnd.memory.rssBytes), + } : null, + graphileCache: { + maxSize: maxBy(memoryEntries, (entry) => entry.graphileCache?.size ?? 0), + endSize: memoryEnd.graphileCache?.size ?? 0, + endKeys: memoryEnd.graphileCache?.keys ?? [], + }, + svcCache: memoryEnd.svcCache ?? null, + inFlight: { + maxCount: maxBy(memoryEntries, (entry) => entry.inFlight?.count ?? 0), + endCount: memoryEnd.inFlight?.count ?? 0, + }, + graphileBuilds: { + started: memoryEnd.graphileBuilds?.started ?? 0, + succeeded: memoryEnd.graphileBuilds?.succeeded ?? 0, + failed: memoryEnd.graphileBuilds?.failed ?? 0, + maxBuildMs: memoryEnd.graphileBuilds?.maxMs ?? 
0, + averageBuildMs: memoryEnd.graphileBuilds?.averageMs ?? 0, + }, + } : null, + database: dbEntries.length > 0 ? { + pool: { + maxTotalCount: maxBy(dbEntries, (entry) => entry.pool?.totalCount ?? 0), + maxIdleCount: maxBy(dbEntries, (entry) => entry.pool?.idleCount ?? 0), + maxWaitingCount: maxBy(dbEntries, (entry) => entry.pool?.waitingCount ?? 0), + }, + activity: { + maxActiveRows: maxBy(dbEntries, (entry) => entry.activeActivity?.length ?? 0), + maxBlockedRows: maxBy(dbEntries, (entry) => entry.blockedActivity?.length ?? 0), + blockedSamples: dbEntries.filter((entry) => (entry.blockedActivity?.length ?? 0) > 0).length, + }, + stats: { + maxNumBackends: maxBy(dbEntries, (entry) => Number(entry.databaseStats?.numbackends ?? 0)), + maxTempBytes: maxBy(dbEntries, (entry) => Number(entry.databaseStats?.temp_bytes ?? 0)), + maxDeadlocks: maxBy(dbEntries, (entry) => Number(entry.databaseStats?.deadlocks ?? 0)), + }, + notificationQueueUsage: { + max: maxBy(dbEntries, (entry) => Number(entry.notificationQueueUsage ?? 0)), + end: Number(dbEnd?.notificationQueueUsage ?? 
0), + }, + } : null, + }; + + const verdicts = []; + if (summary.memory) { + if (summary.memory.heapUsedMb.max >= 1500 || summary.memory.rssMb.max >= 1800) { + verdicts.push('memory pressure stayed very high'); + } + if (summary.memory.idleTail?.heapTrendPct != null && summary.memory.idleTail.heapTrendPct > 5) { + verdicts.push('heap still trended upward during the last 6 samples'); + } + if (summary.memory.idleTail?.rssTrendPct != null && summary.memory.idleTail.rssTrendPct > 5) { + verdicts.push('rss still trended upward during the last 6 samples'); + } + if (summary.memory.graphileBuilds.failed === 0) { + verdicts.push('no graphile build failures'); + } + } + if (summary.database) { + if (summary.database.activity.blockedSamples === 0) { + verdicts.push('no blocked DB sessions captured'); + } + if (summary.database.pool.maxWaitingCount === 0) { + verdicts.push('pg pool never queued waiters'); + } + if (summary.database.notificationQueueUsage.max === 0) { + verdicts.push('Postgres notify queue stayed empty'); + } + } + + return { ...summary, verdicts }; +}; + +const main = async () => { + const summary = await summarize(); + if (jsonMode) { + console.log(JSON.stringify(summary, null, 2)); + return; + } + + console.log(`Log dir: ${summary.logDir}`); + console.log(`Window: ${summary.startedAt} -> ${summary.endedAt} (${summary.durationSeconds}s)`); + console.log(`Samples: memory=${summary.samples.memory}, db=${summary.samples.db}`); + + if (summary.memory) { + console.log( + `Heap MB: start=${summary.memory.heapUsedMb.start} max=${summary.memory.heapUsedMb.max} end=${summary.memory.heapUsedMb.end} delta=${summary.memory.heapUsedMb.delta}`, + ); + console.log( + `RSS MB: start=${summary.memory.rssMb.start} max=${summary.memory.rssMb.max} end=${summary.memory.rssMb.end} delta=${summary.memory.rssMb.delta}`, + ); + if (summary.memory.idleTail) { + console.log( + `Idle tail (last ${summary.memory.idleTail.samples}): heapTrend=${summary.memory.idleTail.heapTrendPct}% 
rssTrend=${summary.memory.idleTail.rssTrendPct}%`, + ); + } + console.log( + `Graphile cache: max=${summary.memory.graphileCache.maxSize} end=${summary.memory.graphileCache.endSize} keys=${summary.memory.graphileCache.endKeys.join(',') || 'none'}`, + ); + console.log( + `Graphile builds: started=${summary.memory.graphileBuilds.started} failed=${summary.memory.graphileBuilds.failed} maxBuildMs=${summary.memory.graphileBuilds.maxBuildMs} avgBuildMs=${summary.memory.graphileBuilds.averageBuildMs}`, + ); + } + + if (summary.database) { + console.log( + `DB pool: maxTotal=${summary.database.pool.maxTotalCount} maxIdle=${summary.database.pool.maxIdleCount} maxWaiting=${summary.database.pool.maxWaitingCount}`, + ); + console.log( + `DB activity: maxActiveRows=${summary.database.activity.maxActiveRows} maxBlockedRows=${summary.database.activity.maxBlockedRows} blockedSamples=${summary.database.activity.blockedSamples}`, + ); + console.log( + `DB stats: maxNumBackends=${summary.database.stats.maxNumBackends} maxTempBytes=${summary.database.stats.maxTempBytes} maxDeadlocks=${summary.database.stats.maxDeadlocks}`, + ); + console.log( + `Notify queue usage: end=${summary.database.notificationQueueUsage.end} max=${summary.database.notificationQueueUsage.max}`, + ); + } + + if (summary.verdicts.length > 0) { + console.log(`Verdicts: ${summary.verdicts.join('; ')}`); + } +}; + +main().catch((error) => { + console.error(error instanceof Error ? 
error.stack : String(error)); + process.exitCode = 1; +}); diff --git a/graphql/server/scripts/capture-heap-snapshot.mjs b/graphql/server/scripts/capture-heap-snapshot.mjs new file mode 100644 index 000000000..e1f404d85 --- /dev/null +++ b/graphql/server/scripts/capture-heap-snapshot.mjs @@ -0,0 +1,75 @@ +#!/usr/bin/env node + +import fs from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; + +const args = process.argv.slice(2); +const scriptDir = path.dirname(fileURLToPath(import.meta.url)); +const packageDir = path.resolve(scriptDir, '..'); + +const getArgValue = (flag, fallback = null) => { + const index = args.indexOf(flag); + if (index === -1 || index === args.length - 1) { + return fallback; + } + return args[index + 1]; +}; + +const parseIntArg = (value, fallback) => { + const parsed = Number.parseInt(value ?? '', 10); + return Number.isFinite(parsed) ? parsed : fallback; +}; + +const pid = parseIntArg(getArgValue('--pid'), NaN); +if (!Number.isFinite(pid)) { + console.error('Usage: capture-heap-snapshot.mjs --pid [--dir ] [--timeout-ms ]'); + process.exit(1); +} + +const snapshotDir = path.resolve(getArgValue('--dir', packageDir)); +const timeoutMs = parseIntArg(getArgValue('--timeout-ms'), 30_000); +const pollIntervalMs = 500; + +const listHeapSnapshots = async () => { + try { + const entries = await fs.readdir(snapshotDir, { withFileTypes: true }); + return entries + .filter((entry) => entry.isFile() && entry.name.endsWith('.heapsnapshot')) + .map((entry) => entry.name) + .sort(); + } catch (error) { + if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') { + return []; + } + throw error; + } +}; + +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +const main = async () => { + const before = new Set(await listHeapSnapshots()); + process.kill(pid, 'SIGUSR2'); + + const startedAt = Date.now(); + while (Date.now() - startedAt < timeoutMs) { + await 
sleep(pollIntervalMs); + const after = await listHeapSnapshots(); + const created = after.find((fileName) => !before.has(fileName)); + if (created) { + console.log(path.join(snapshotDir, created)); + return; + } + } + + throw new Error( + `Timed out waiting for a new heap snapshot in ${snapshotDir}. ` + + 'Start the server with NODE_OPTIONS="--heapsnapshot-signal=SIGUSR2 --expose-gc".', + ); +}; + +main().catch((error) => { + console.error(error instanceof Error ? error.message : String(error)); + process.exit(1); +}); diff --git a/graphql/server/src/diagnostics/__tests__/observability.test.ts b/graphql/server/src/diagnostics/__tests__/observability.test.ts new file mode 100644 index 000000000..507584a5c --- /dev/null +++ b/graphql/server/src/diagnostics/__tests__/observability.test.ts @@ -0,0 +1,47 @@ +import { + isDevelopmentObservabilityMode, + isGraphqlDebugSamplerEnabled, + isGraphqlObservabilityEnabled, + isLoopbackAddress, + isLoopbackHost, +} from '../observability'; + +describe('observability helpers', () => { + const originalEnv = { ...process.env }; + + afterEach(() => { + process.env = { ...originalEnv }; + }); + + it('recognizes loopback hosts and addresses', () => { + expect(isLoopbackHost('localhost')).toBe(true); + expect(isLoopbackHost('127.0.0.1:3000')).toBe(true); + expect(isLoopbackHost('[::1]:3000')).toBe(true); + expect(isLoopbackHost('0.0.0.0')).toBe(false); + + expect(isLoopbackAddress('127.0.0.1')).toBe(true); + expect(isLoopbackAddress('::ffff:127.0.0.1')).toBe(true); + expect(isLoopbackAddress('::1')).toBe(true); + expect(isLoopbackAddress('10.0.0.1')).toBe(false); + }); + + it('enables observability only in development on a loopback host', () => { + process.env.NODE_ENV = 'development'; + process.env.GRAPHQL_OBSERVABILITY_ENABLED = 'true'; + + expect(isDevelopmentObservabilityMode()).toBe(true); + expect(isGraphqlObservabilityEnabled('localhost')).toBe(true); + expect(isGraphqlDebugSamplerEnabled('localhost')).toBe(true); + 
expect(isGraphqlObservabilityEnabled('0.0.0.0')).toBe(false); + expect(isGraphqlDebugSamplerEnabled('0.0.0.0')).toBe(false); + }); + + it('disables observability outside development even when requested', () => { + process.env.NODE_ENV = 'production'; + process.env.GRAPHQL_OBSERVABILITY_ENABLED = 'true'; + + expect(isDevelopmentObservabilityMode()).toBe(false); + expect(isGraphqlObservabilityEnabled('localhost')).toBe(false); + expect(isGraphqlDebugSamplerEnabled('localhost')).toBe(false); + }); +}); diff --git a/graphql/server/src/diagnostics/debug-db-snapshot.ts b/graphql/server/src/diagnostics/debug-db-snapshot.ts new file mode 100644 index 000000000..218868ef1 --- /dev/null +++ b/graphql/server/src/diagnostics/debug-db-snapshot.ts @@ -0,0 +1,239 @@ +import type { ConstructiveOptions } from '@constructive-io/graphql-types'; +import { buildConnectionString, getPgPool } from 'pg-cache'; +import { getPgEnvOptions } from 'pg-env'; +import { Pool, type PoolClient } from 'pg'; + +const ACTIVE_ACTIVITY_SQL = ` + select + pid, + usename, + application_name, + state, + wait_event_type, + wait_event, + age(now(), xact_start) as xact_age, + age(now(), query_start) as query_age, + left(query, 500) as query + from pg_stat_activity + where datname = current_database() + and pid <> pg_backend_pid() + and ( + xact_start is not null + or wait_event_type is not null + or state <> 'idle' + ) + order by xact_start asc nulls last, query_start asc nulls last + limit 50 +`; + +const BLOCKED_ACTIVITY_SQL = ` + with blocked as ( + select + a.pid as blocked_pid, + a.usename as blocked_user, + a.application_name as blocked_application, + a.state as blocked_state, + a.wait_event_type, + a.wait_event, + age(now(), a.query_start) as blocked_for, + left(a.query, 500) as blocked_query, + pg_blocking_pids(a.pid) as blocker_pids + from pg_stat_activity a + where a.datname = current_database() + and cardinality(pg_blocking_pids(a.pid)) > 0 + ) + select + b.blocked_pid, + b.blocked_user, + 
b.blocked_application, + b.blocked_state, + b.wait_event_type, + b.wait_event, + b.blocked_for, + b.blocked_query, + blocker.pid as blocker_pid, + blocker.usename as blocker_user, + blocker.application_name as blocker_application, + blocker.state as blocker_state, + left(blocker.query, 500) as blocker_query + from blocked b + left join lateral unnest(b.blocker_pids) blocker_pid on true + left join pg_stat_activity blocker on blocker.pid = blocker_pid + order by b.blocked_for desc +`; + +const LOCK_SUMMARY_SQL = ` + select + locktype, + mode, + granted, + count(*)::int as count + from pg_locks + group by locktype, mode, granted + order by granted asc, count desc, locktype asc, mode asc +`; + +const DATABASE_STATS_SQL = ` + select + numbackends, + xact_commit, + xact_rollback, + blks_read, + blks_hit, + tup_returned, + tup_fetched, + tup_inserted, + tup_updated, + tup_deleted, + temp_files, + temp_bytes, + deadlocks, + checksum_failures, + stats_reset + from pg_stat_database + where datname = current_database() +`; + +const SETTINGS_SQL = ` + select + name, + setting, + unit + from pg_settings + where name = any(array[ + 'max_connections', + 'shared_buffers', + 'work_mem', + 'maintenance_work_mem', + 'effective_cache_size', + 'statement_timeout', + 'lock_timeout', + 'idle_in_transaction_session_timeout' + ]) + order by name asc +`; + +const NOTIFY_QUEUE_SQL = ` + select pg_notification_queue_usage() as queue_usage +`; + +const DIAGNOSTICS_STATEMENT_TIMEOUT_MS = 3_000; +const DIAGNOSTICS_LOCK_TIMEOUT_MS = 500; +const diagnosticsPools = new Map(); + +const buildDiagnosticsConnectionString = (opts: ConstructiveOptions): string => { + const pgConfig = getPgEnvOptions(opts.pg); + return buildConnectionString( + pgConfig.user, + pgConfig.password, + pgConfig.host, + pgConfig.port, + pgConfig.database, + ); +}; + +const getDiagnosticsPool = (opts: ConstructiveOptions): Pool => { + const connectionString = buildDiagnosticsConnectionString(opts); + const existing = 
diagnosticsPools.get(connectionString); + if (existing) { + return existing; + } + + const pool = new Pool({ + connectionString, + max: 1, + idleTimeoutMillis: 10_000, + connectionTimeoutMillis: 1_500, + allowExitOnIdle: true, + application_name: 'constructive-debug-snapshot', + }); + diagnosticsPools.set(connectionString, pool); + return pool; +}; + +const withDiagnosticsClient = async ( + opts: ConstructiveOptions, + fn: (client: PoolClient) => Promise, +): Promise => { + const diagnosticsPool = getDiagnosticsPool(opts); + const client = await diagnosticsPool.connect(); + + try { + await client.query('BEGIN'); + await client.query(`SET LOCAL statement_timeout = '${DIAGNOSTICS_STATEMENT_TIMEOUT_MS}ms'`); + await client.query(`SET LOCAL lock_timeout = '${DIAGNOSTICS_LOCK_TIMEOUT_MS}ms'`); + const result = await fn(client); + await client.query('COMMIT'); + return result; + } catch (error) { + try { + await client.query('ROLLBACK'); + } catch { + // Best-effort rollback for diagnostics-only transactions. 
+ } + throw error; + } finally { + client.release(); + } +}; + +export const closeDebugDatabasePools = async (): Promise => { + const pools = [...diagnosticsPools.values()]; + diagnosticsPools.clear(); + await Promise.allSettled(pools.map((pool) => pool.end())); +}; + +export interface DebugDatabaseSnapshot { + database: string | null | undefined; + pool: { + max: number | null; + totalCount: number; + idleCount: number; + waitingCount: number; + }; + activeActivity: unknown[]; + blockedActivity: unknown[]; + lockSummary: unknown[]; + databaseStats: Record | null; + settings: unknown[]; + notificationQueueUsage: number | null; + timestamp: string; +} + +export const getDebugDatabaseSnapshot = async ( + opts: ConstructiveOptions, +): Promise => { + const appPool = getPgPool(opts.pg); + const { + activity, + blocked, + lockSummary, + databaseStats, + settings, + notifyQueue, + } = await withDiagnosticsClient(opts, async (client) => ({ + activity: await client.query(ACTIVE_ACTIVITY_SQL), + blocked: await client.query(BLOCKED_ACTIVITY_SQL), + lockSummary: await client.query(LOCK_SUMMARY_SQL), + databaseStats: await client.query(DATABASE_STATS_SQL), + settings: await client.query(SETTINGS_SQL), + notifyQueue: await client.query(NOTIFY_QUEUE_SQL), + })); + + return { + database: opts.pg?.database ?? null, + pool: { + max: (appPool as { options?: { max?: number } }).options?.max ?? null, + totalCount: appPool.totalCount, + idleCount: appPool.idleCount, + waitingCount: appPool.waitingCount, + }, + activeActivity: activity.rows, + blockedActivity: blocked.rows, + lockSummary: lockSummary.rows, + databaseStats: databaseStats.rows[0] ?? null, + settings: settings.rows, + notificationQueueUsage: (notifyQueue.rows[0]?.queue_usage as number | null | undefined) ?? 
null, + timestamp: new Date().toISOString(), + }; +}; diff --git a/graphql/server/src/diagnostics/debug-memory-snapshot.ts b/graphql/server/src/diagnostics/debug-memory-snapshot.ts new file mode 100644 index 000000000..b3491ee13 --- /dev/null +++ b/graphql/server/src/diagnostics/debug-memory-snapshot.ts @@ -0,0 +1,123 @@ +import os from 'node:os'; +import v8 from 'node:v8'; +import { svcCache, SVC_CACHE_TTL_MS } from '@pgpmjs/server-utils'; +import { getCacheStats } from 'graphile-cache'; +import { getInFlightCount, getInFlightKeys } from '../middleware/graphile'; +import { getGraphileBuildStats } from '../middleware/observability/graphile-build-stats'; + +const toMB = (bytes: number): string => `${(bytes / 1024 / 1024).toFixed(1)} MB`; + +export interface DebugMemorySnapshot { + pid: number; + nodeEnv: string | undefined; + memory: { + heapUsedBytes: number; + heapTotalBytes: number; + rssBytes: number; + externalBytes: number; + arrayBuffersBytes: number; + heapUsed: string; + heapTotal: string; + rss: string; + external: string; + arrayBuffers: string; + }; + cpuUsageMicros: NodeJS.CpuUsage; + resourceUsage: NodeJS.ResourceUsage; + system: { + loadAverage: number[]; + freeMemoryBytes: number; + totalMemoryBytes: number; + uptimeSeconds: number; + }; + v8: { + heapStatistics: ReturnType; + heapSpaces: Array<{ + spaceName: string; + spaceSizeBytes: number; + spaceUsedBytes: number; + spaceAvailableBytes: number; + physicalSpaceSizeBytes: number; + }>; + }; + graphileCache: ReturnType; + svcCache: { + size: number; + max: number; + ttlMs: number; + oldestKeyAgeMs: number | null; + keys: string[]; + }; + inFlight: { + count: number; + keys: string[]; + }; + graphileBuilds: ReturnType; + uptimeMinutes: number; + timestamp: string; +} + +export const getDebugMemorySnapshot = (): DebugMemorySnapshot => { + const mem = process.memoryUsage(); + const heapSpaces = v8.getHeapSpaceStatistics().map((space) => ({ + spaceName: space.space_name, + spaceSizeBytes: 
space.space_size, + spaceUsedBytes: space.space_used_size, + spaceAvailableBytes: space.space_available_size, + physicalSpaceSizeBytes: space.physical_space_size, + })); + + return { + pid: process.pid, + nodeEnv: process.env.NODE_ENV, + memory: { + heapUsedBytes: mem.heapUsed, + heapTotalBytes: mem.heapTotal, + rssBytes: mem.rss, + externalBytes: mem.external, + arrayBuffersBytes: mem.arrayBuffers, + heapUsed: toMB(mem.heapUsed), + heapTotal: toMB(mem.heapTotal), + rss: toMB(mem.rss), + external: toMB(mem.external), + arrayBuffers: toMB(mem.arrayBuffers), + }, + cpuUsageMicros: process.cpuUsage(), + resourceUsage: process.resourceUsage(), + system: { + loadAverage: os.loadavg(), + freeMemoryBytes: os.freemem(), + totalMemoryBytes: os.totalmem(), + uptimeSeconds: os.uptime(), + }, + v8: { + heapStatistics: v8.getHeapStatistics(), + heapSpaces, + }, + graphileCache: getCacheStats(), + svcCache: { + size: svcCache.size, + max: svcCache.max, + ttlMs: SVC_CACHE_TTL_MS, + // Note: with updateAgeOnGet: true, this is "time since last access" not "time since creation" + oldestKeyAgeMs: (() => { + let minRemaining = Infinity; + for (const key of svcCache.keys()) { + const remaining = svcCache.getRemainingTTL(key); + if (remaining < minRemaining) { + minRemaining = remaining; + } + } + return Number.isFinite(minRemaining) ? 
SVC_CACHE_TTL_MS - minRemaining : null; + })(), + keys: [...svcCache.keys()].slice(0, 200), + }, + inFlight: { + count: getInFlightCount(), + keys: getInFlightKeys(), + }, + graphileBuilds: getGraphileBuildStats(), + uptimeMinutes: process.uptime() / 60, + timestamp: new Date().toISOString(), + }; +}; diff --git a/graphql/server/src/diagnostics/debug-sampler.ts b/graphql/server/src/diagnostics/debug-sampler.ts new file mode 100644 index 000000000..76ae11de8 --- /dev/null +++ b/graphql/server/src/diagnostics/debug-sampler.ts @@ -0,0 +1,224 @@ +import fs from 'node:fs/promises'; +import path from 'node:path'; +import type { ConstructiveOptions } from '@constructive-io/graphql-types'; +import { Logger } from '@pgpmjs/logger'; +import { getDebugDatabaseSnapshot } from './debug-db-snapshot'; +import { getDebugMemorySnapshot } from './debug-memory-snapshot'; +import { isGraphqlDebugSamplerEnabled } from './observability'; + +const log = new Logger('debug-sampler'); + +const MAX_TOTAL_BYTES = 1024 * 1024 * 1024; // 1 GB + +const getSamplerIntervalMs = (): number => { + const raw = process.env.GRAPHQL_DEBUG_SAMPLER_INTERVAL_MS; + const parsed = raw ? Number.parseInt(raw, 10) : 10_000; + return Number.isFinite(parsed) && parsed >= 1_000 ? parsed : 10_000; +}; + +const getSamplerRootDir = (): string => { + if (process.env.GRAPHQL_DEBUG_SAMPLER_DIR) { + return path.resolve(process.env.GRAPHQL_DEBUG_SAMPLER_DIR); + } + + return path.resolve(__dirname, '../..', 'logs'); +}; + +const createSessionLogDir = (): string => { + const rootDir = getSamplerRootDir(); + const sessionName = `run-${new Date().toISOString().replace(/[:.]/g, '-')}-pid${process.pid}`; + return process.env.GRAPHQL_DEBUG_SAMPLER_DIR + ? 
rootDir
+    : path.join(rootDir, sessionName);
+};
+
+const appendJsonLine = async (filePath: string, payload: unknown): Promise<void> => {
+  await fs.mkdir(path.dirname(filePath), { recursive: true });
+  await fs.appendFile(filePath, `${JSON.stringify(payload)}\n`, 'utf8');
+};
+
+const getDirSize = async (dirPath: string): Promise<number> => {
+  let total = 0;
+  const entries = await fs.readdir(dirPath, { withFileTypes: true });
+  for (const entry of entries) {
+    const fullPath = path.join(dirPath, entry.name);
+    if (entry.isDirectory()) {
+      total += await getDirSize(fullPath);
+    } else {
+      const stat = await fs.stat(fullPath);
+      total += stat.size;
+    }
+  }
+  return total;
+};
+
+const enforceMaxSize = async (rootDir: string, currentSessionDir: string): Promise<void> => {
+  try {
+    const totalSize = await getDirSize(rootDir);
+    if (totalSize <= MAX_TOTAL_BYTES) {
+      return;
+    }
+
+    const entries = await fs.readdir(rootDir, { withFileTypes: true });
+    const sessionDirs = await Promise.all(
+      entries
+        .filter((e) => e.isDirectory())
+        .map(async (e) => {
+          const fullPath = path.join(rootDir, e.name);
+          const stat = await fs.stat(fullPath);
+          return { path: fullPath, mtimeMs: stat.mtimeMs };
+        }),
+    );
+
+    sessionDirs.sort((a, b) => a.mtimeMs - b.mtimeMs);
+
+    for (const dir of sessionDirs) {
+      if (dir.path === currentSessionDir) {
+        continue;
+      }
+      log.info(`Rolling cleanup: removing old session dir ${dir.path}`);
+      await fs.rm(dir.path, { recursive: true, force: true });
+      const newSize = await getDirSize(rootDir);
+      if (newSize <= MAX_TOTAL_BYTES) {
+        break;
+      }
+    }
+  } catch (error) {
+    log.error('Failed to enforce max log size', error);
+  }
+};
+
+export interface DebugSamplerHandle {
+  stop(): Promise<void>;
+}
+
+export const startDebugSampler = (opts: ConstructiveOptions): DebugSamplerHandle | null => {
+  if (!isGraphqlDebugSamplerEnabled(opts.server?.host)) {
+    return null;
+  }
+
+  const intervalMs = getSamplerIntervalMs();
+  const logDir = createSessionLogDir();
+  const memoryLogPath
= path.join(logDir, 'debug-memory.ndjson');
+  const dbLogPath = path.join(logDir, 'debug-db.ndjson');
+  const errorLogPath = path.join(logDir, 'debug-sampler-errors.ndjson');
+
+  let timer: NodeJS.Timeout | null = null;
+  let stopped = false;
+  let inFlight: Promise<void> | null = null;
+  let writeFailureLogged = false;
+
+  const runBackgroundWrite = (promise: Promise<void>, scope: string): void => {
+    promise.catch((error) => {
+      // Avoid recursive attempts to write additional error files when the
+      // underlying storage path is broken or unavailable.
+      if (!writeFailureLogged) {
+        writeFailureLogged = true;
+        log.error(`Debug sampler background write failed (${scope})`, error);
+      }
+    });
+  };
+
+  const recordError = async (scope: 'memory' | 'db' | 'sampler', error: unknown): Promise<void> => {
+    const payload = {
+      scope,
+      timestamp: new Date().toISOString(),
+      pid: process.pid,
+      error: error instanceof Error ? {
+        name: error.name,
+        message: error.message,
+        stack: error.stack,
+      } : {
+        message: String(error),
+      },
+    };
+    await appendJsonLine(errorLogPath, payload);
+  };
+
+  const sampleOnce = async (): Promise<void> => {
+    if (stopped) {
+      return;
+    }
+
+    try {
+      await appendJsonLine(memoryLogPath, getDebugMemorySnapshot());
+    } catch (error) {
+      log.error('Failed to capture debug memory snapshot', error);
+      await recordError('memory', error);
+    }
+
+    try {
+      await appendJsonLine(dbLogPath, await getDebugDatabaseSnapshot(opts));
+    } catch (error) {
+      log.error('Failed to capture debug DB snapshot', error);
+      await recordError('db', error);
+    }
+
+    await enforceMaxSize(getSamplerRootDir(), logDir);
+  };
+
+  const tick = (): void => {
+    if (stopped) {
+      return;
+    }
+
+    if (inFlight) {
+      runBackgroundWrite(
+        recordError('sampler', new Error('Skipped debug sample because previous sample is still running')),
+        'record-skip',
+      );
+      return;
+    }
+
+    inFlight = sampleOnce()
+      .catch(async (error) => {
+        log.error('Debug sampler tick failed', error);
+        await recordError('sampler', error);
+      })
+      .finally(() => {
+        inFlight = null;
+      });
+  };
+
+  const lifecyclePayload = {
+    event: 'sampler_started',
+    intervalMs,
+    logDir,
+    pid: process.pid,
+    timestamp: new Date().toISOString(),
+  };
+  runBackgroundWrite(appendJsonLine(memoryLogPath, lifecyclePayload), 'lifecycle-memory-start');
+  runBackgroundWrite(appendJsonLine(dbLogPath, lifecyclePayload), 'lifecycle-db-start');
+
+  log.info(`Debug sampler writing snapshots every ${intervalMs}ms to ${logDir}`);
+  tick();
+  timer = setInterval(tick, intervalMs);
+  timer.unref();
+
+  return {
+    async stop(): Promise<void> {
+      stopped = true;
+      if (timer) {
+        clearInterval(timer);
+        timer = null;
+      }
+
+      if (inFlight) {
+        await inFlight;
+      }
+
+      const payload = {
+        event: 'sampler_stopped',
+        intervalMs,
+        logDir,
+        pid: process.pid,
+        timestamp: new Date().toISOString(),
+      };
+
+      await Promise.allSettled([
+        appendJsonLine(memoryLogPath, payload),
+        appendJsonLine(dbLogPath, payload),
+      ]);
+    },
+  };
+};
diff --git a/graphql/server/src/diagnostics/observability.ts b/graphql/server/src/diagnostics/observability.ts
new file mode 100644
index 000000000..bf0e8d466
--- /dev/null
+++ b/graphql/server/src/diagnostics/observability.ts
@@ -0,0 +1,69 @@
+const LOOPBACK_HOSTS = new Set(['localhost', '127.0.0.1', '::1', '[::1]']);
+const LOOPBACK_ADDRESSES = new Set(['127.0.0.1', '::1']);
+
+const parseBooleanEnv = (value: string | undefined, fallback: boolean): boolean => {
+  if (value == null) {
+    return fallback;
+  }
+
+  const normalized = value.trim().toLowerCase();
+  if (['1', 'true', 'yes', 'on'].includes(normalized)) return true;
+  if (['0', 'false', 'no', 'off'].includes(normalized)) return false;
+  return fallback;
+};
+
+const normalizeHost = (value: string | null | undefined): string | null => {
+  if (!value) {
+    return null;
+  }
+
+  const trimmed = value.trim().toLowerCase();
+  if (!trimmed) {
+    return null;
+  }
+
+  if (trimmed.startsWith('[')) {
+    const closingIndex = trimmed.indexOf(']');
+    return closingIndex ===
-1 ? trimmed : trimmed.slice(0, closingIndex + 1); + } + + const colonCount = trimmed.split(':').length - 1; + if (colonCount === 1) { + return trimmed.split(':')[0] || null; + } + + return trimmed; +}; + +const normalizeAddress = (value: string | null | undefined): string | null => { + const normalized = normalizeHost(value); + if (!normalized) { + return null; + } + + return normalized.startsWith('::ffff:') ? normalized.slice(7) : normalized; +}; + +export const isDevelopmentObservabilityMode = (): boolean => process.env.NODE_ENV === 'development'; + +export const isLoopbackHost = (value: string | null | undefined): boolean => { + const normalized = normalizeHost(value); + return normalized != null && LOOPBACK_HOSTS.has(normalized); +}; + +export const isLoopbackAddress = (value: string | null | undefined): boolean => { + const normalized = normalizeAddress(value); + return normalized != null && LOOPBACK_ADDRESSES.has(normalized); +}; + +export const isGraphqlObservabilityRequested = (): boolean => + parseBooleanEnv(process.env.GRAPHQL_OBSERVABILITY_ENABLED, false); + +export const isGraphqlObservabilityEnabled = (serverHost?: string | null): boolean => + isDevelopmentObservabilityMode() && + isGraphqlObservabilityRequested() && + isLoopbackHost(serverHost); + +export const isGraphqlDebugSamplerEnabled = (serverHost?: string | null): boolean => + isGraphqlObservabilityEnabled(serverHost) && + parseBooleanEnv(process.env.GRAPHQL_DEBUG_SAMPLER_ENABLED, true); diff --git a/graphql/server/src/middleware/debug-memory.ts b/graphql/server/src/middleware/debug-memory.ts deleted file mode 100644 index 2fc524680..000000000 --- a/graphql/server/src/middleware/debug-memory.ts +++ /dev/null @@ -1,61 +0,0 @@ -import { getNodeEnv } from '@pgpmjs/env'; -import { Logger } from '@pgpmjs/logger'; -import { svcCache } from '@pgpmjs/server-utils'; -import type { RequestHandler } from 'express'; -import { getCacheStats } from 'graphile-cache'; -import { getInFlightCount, 
getInFlightKeys } from './graphile'; - -const log = new Logger('debug-memory'); - -const toMB = (bytes: number): string => `${(bytes / 1024 / 1024).toFixed(1)} MB`; - -/** - * Development-only debug endpoint for monitoring memory usage and cache state. - * - * Mounts GET /debug/memory which returns: - * - Node.js process memory (heap, RSS, external, array buffers) - * - Graphile cache stats (size, max, TTL, keys with ages) - * - Service cache size - * - In-flight handler creation count - * - Process uptime - * - * This endpoint is only available when NODE_ENV=development. - * In production, it returns 404. - */ -export const debugMemory: RequestHandler = (_req, res) => { - if (getNodeEnv() !== 'development') { - res.status(404).send('Not found'); - return; - } - - const mem = process.memoryUsage(); - const cacheStats = getCacheStats(); - - const response = { - memory: { - heapUsed: toMB(mem.heapUsed), - heapTotal: toMB(mem.heapTotal), - rss: toMB(mem.rss), - external: toMB(mem.external), - arrayBuffers: toMB(mem.arrayBuffers), - }, - graphileCache: { - size: cacheStats.size, - max: cacheStats.max, - ttl: `${(cacheStats.ttl / 1000 / 60).toFixed(0)} min`, - keys: cacheStats.keys, - }, - svcCache: { - size: svcCache.size, - }, - inFlight: { - count: getInFlightCount(), - keys: getInFlightKeys(), - }, - uptime: `${(process.uptime() / 60).toFixed(1)} min`, - timestamp: new Date().toISOString(), - }; - - log.debug('Memory snapshot:', response); - res.json(response); -}; diff --git a/graphql/server/src/middleware/graphile.ts b/graphql/server/src/middleware/graphile.ts index 7de8ec9d7..ef1618c2f 100644 --- a/graphql/server/src/middleware/graphile.ts +++ b/graphql/server/src/middleware/graphile.ts @@ -10,7 +10,9 @@ import { ConstructivePreset, makePgService } from 'graphile-settings'; import { buildConnectionString } from 'pg-cache'; import { getPgEnvOptions } from 'pg-env'; import './types'; // for Request type +import { isGraphqlObservabilityEnabled } from 
'../diagnostics/observability'; import { HandlerCreationError } from '../errors/api-errors'; +import { observeGraphileBuild } from './observability/graphile-build-stats'; const maskErrorLog = new Logger('graphile:maskError'); @@ -188,6 +190,8 @@ const buildPreset = ( }); export const graphile = (opts: ConstructiveOptions): RequestHandler => { + const observabilityEnabled = isGraphqlObservabilityEnabled(opts.server?.host); + return async (req: Request, res: Response, next: NextFunction) => { const label = reqLabel(req); try { @@ -267,7 +271,15 @@ export const graphile = (opts: ConstructiveOptions): RequestHandler => { // Create promise and store in in-flight map BEFORE try block const preset = buildPreset(connectionString, schema || [], anonRole, roleName); - const creationPromise = createGraphileInstance({ preset, cacheKey: key }); + const creationPromise = observeGraphileBuild( + { + cacheKey: key, + serviceKey: key, + databaseId: api.databaseId ?? null, + }, + () => createGraphileInstance({ preset, cacheKey: key }), + { enabled: observabilityEnabled }, + ); creating.set(key, creationPromise); try { diff --git a/graphql/server/src/middleware/observability/__tests__/graphile-build-stats.test.ts b/graphql/server/src/middleware/observability/__tests__/graphile-build-stats.test.ts new file mode 100644 index 000000000..e1735f221 --- /dev/null +++ b/graphql/server/src/middleware/observability/__tests__/graphile-build-stats.test.ts @@ -0,0 +1,43 @@ +import { getGraphileBuildStats, observeGraphileBuild, resetGraphileBuildStats } from '../graphile-build-stats'; + +describe('graphile build stats', () => { + beforeEach(() => { + resetGraphileBuildStats(); + }); + + it('does not record events when disabled', async () => { + await observeGraphileBuild( + { + cacheKey: 'svc:a', + serviceKey: 'svc:a', + databaseId: 'db1', + }, + async () => 'ok', + { enabled: false }, + ); + + const stats = getGraphileBuildStats(); + expect(stats.started).toBe(0); + 
expect(stats.succeeded).toBe(0); + expect(stats.recentEvents).toHaveLength(0); + }); + + it('caps aggregate maps to avoid unbounded key growth', async () => { + for (let i = 0; i < 130; i += 1) { + await observeGraphileBuild( + { + cacheKey: `svc:${i}`, + serviceKey: `svc:${i}`, + databaseId: `db:${i}`, + }, + async () => 'ok', + { enabled: true }, + ); + } + + const stats = getGraphileBuildStats(); + expect(Object.keys(stats.byServiceKey)).toHaveLength(100); + expect(stats.byServiceKey['svc:0']).toBeUndefined(); + expect(stats.byServiceKey['svc:129']).toBeDefined(); + }); +}); diff --git a/graphql/server/src/middleware/observability/__tests__/guard.test.ts b/graphql/server/src/middleware/observability/__tests__/guard.test.ts new file mode 100644 index 000000000..72edba049 --- /dev/null +++ b/graphql/server/src/middleware/observability/__tests__/guard.test.ts @@ -0,0 +1,47 @@ +import type { NextFunction, Request, Response } from 'express'; +import { localObservabilityOnly } from '../guard'; + +function makeReq(input: { remoteAddress?: string | null; host?: string } = {}): Request { + return { + socket: { + remoteAddress: input.remoteAddress ?? '::1', + }, + headers: input.host ? 
{ host: input.host } : {}, + } as unknown as Request; +} + +function makeRes(): Response { + return { + status: jest.fn().mockReturnThis(), + send: jest.fn().mockReturnThis(), + } as unknown as Response; +} + +function makeNext(): NextFunction { + return jest.fn() as unknown as NextFunction; +} + +describe('localObservabilityOnly', () => { + it('allows loopback requests', () => { + const req = makeReq({ remoteAddress: '::ffff:127.0.0.1', host: 'localhost:3000' }); + const res = makeRes(); + const next = makeNext(); + + localObservabilityOnly(req, res, next); + + expect(next).toHaveBeenCalledTimes(1); + expect(res.status).not.toHaveBeenCalled(); + }); + + it('returns 404 for non-local requests', () => { + const req = makeReq({ remoteAddress: '10.0.0.5', host: 'localhost:3000' }); + const res = makeRes(); + const next = makeNext(); + + localObservabilityOnly(req, res, next); + + expect(next).not.toHaveBeenCalled(); + expect(res.status).toHaveBeenCalledWith(404); + expect(res.send).toHaveBeenCalledWith('Not found'); + }); +}); diff --git a/graphql/server/src/middleware/observability/debug-db.ts b/graphql/server/src/middleware/observability/debug-db.ts new file mode 100644 index 000000000..f012ffe3b --- /dev/null +++ b/graphql/server/src/middleware/observability/debug-db.ts @@ -0,0 +1,28 @@ +import type { ConstructiveOptions } from '@constructive-io/graphql-types'; +import { Logger } from '@pgpmjs/logger'; +import type { RequestHandler } from 'express'; +import { getDebugDatabaseSnapshot } from '../../diagnostics/debug-db-snapshot'; + +const log = new Logger('debug-db'); + +export const createDebugDatabaseMiddleware = (opts: ConstructiveOptions): RequestHandler => { + return async (_req, res) => { + try { + const response = await getDebugDatabaseSnapshot(opts); + + log.debug('Database debug snapshot', { + activeActivity: response.activeActivity.length, + blockedActivity: response.blockedActivity.length, + lockSummary: response.lockSummary.length, + }); + + 
res.json(response); + } catch (error) { + log.error('Failed to fetch debug DB snapshot', error); + res.status(500).json({ + error: 'Failed to fetch database debug snapshot', + message: error instanceof Error ? error.message : String(error), + }); + } + }; +}; diff --git a/graphql/server/src/middleware/observability/debug-memory.ts b/graphql/server/src/middleware/observability/debug-memory.ts new file mode 100644 index 000000000..81372f035 --- /dev/null +++ b/graphql/server/src/middleware/observability/debug-memory.ts @@ -0,0 +1,12 @@ +import { Logger } from '@pgpmjs/logger'; +import type { RequestHandler } from 'express'; +import { getDebugMemorySnapshot } from '../../diagnostics/debug-memory-snapshot'; + +const log = new Logger('debug-memory'); + +export const debugMemory: RequestHandler = (_req, res) => { + const response = getDebugMemorySnapshot(); + + log.debug('Memory snapshot:', response); + res.json(response); +}; diff --git a/graphql/server/src/middleware/observability/graphile-build-stats.ts b/graphql/server/src/middleware/observability/graphile-build-stats.ts new file mode 100644 index 000000000..9c14b0893 --- /dev/null +++ b/graphql/server/src/middleware/observability/graphile-build-stats.ts @@ -0,0 +1,240 @@ +export interface GraphileBuildEvent { + type: 'start' | 'success' | 'failure'; + cacheKey: string; + serviceKey: string | null; + databaseId: string | null; + durationMs?: number; + error?: string; + timestamp: string; +} + +interface GraphileBuildAggregate { + count: number; + totalMs: number; + maxMs: number; + lastMs: number; + lastAt: string | null; +} + +interface GraphileBuildContext { + cacheKey: string; + serviceKey: string | null; + databaseId: string | null; +} + +const MAX_BUILD_EVENTS = 100; +const MAX_AGGREGATE_KEYS = 100; + +const buildStats = { + started: 0, + succeeded: 0, + failed: 0, + totalMs: 0, + maxMs: 0, + lastMs: 0, + lastKey: null as string | null, + lastStartedAt: null as string | null, + lastFinishedAt: null as string | 
null,
+  lastError: null as string | null,
+  lastServiceKey: null as string | null,
+  lastDatabaseId: null as string | null,
+  recentEvents: [] as GraphileBuildEvent[],
+  byServiceKey: new Map<string, GraphileBuildAggregate>(),
+};
+
+const pushBuildEvent = (event: GraphileBuildEvent): void => {
+  buildStats.recentEvents.push(event);
+  if (buildStats.recentEvents.length > MAX_BUILD_EVENTS) {
+    buildStats.recentEvents.splice(0, buildStats.recentEvents.length - MAX_BUILD_EVENTS);
+  }
+};
+
+const updateAggregate = (
+  map: Map<string, GraphileBuildAggregate>,
+  key: string,
+  durationMs: number,
+): void => {
+  const hasKey = map.has(key);
+  if (!hasKey && map.size >= MAX_AGGREGATE_KEYS) {
+    const oldestKey = map.keys().next().value;
+    if (oldestKey) {
+      map.delete(oldestKey);
+    }
+  }
+
+  const current = map.get(key) ?? {
+    count: 0,
+    totalMs: 0,
+    maxMs: 0,
+    lastMs: 0,
+    lastAt: null,
+  };
+
+  current.count += 1;
+  current.totalMs += durationMs;
+  current.maxMs = Math.max(current.maxMs, durationMs);
+  current.lastMs = durationMs;
+  current.lastAt = new Date().toISOString();
+
+  // Keep LRU order by reinserting after each update.
+ if (hasKey) { + map.delete(key); + } + map.set(key, current); +}; + +const recordBuildStart = (context: GraphileBuildContext, startedAt: number): void => { + buildStats.started += 1; + buildStats.lastKey = context.cacheKey; + buildStats.lastStartedAt = new Date(startedAt).toISOString(); + buildStats.lastServiceKey = context.serviceKey; + buildStats.lastDatabaseId = context.databaseId; + + pushBuildEvent({ + type: 'start', + cacheKey: context.cacheKey, + serviceKey: context.serviceKey, + databaseId: context.databaseId, + timestamp: new Date(startedAt).toISOString(), + }); +}; + +const recordBuildSuccess = (context: GraphileBuildContext, startedAt: number): void => { + const durationMs = Date.now() - startedAt; + + buildStats.succeeded += 1; + buildStats.totalMs += durationMs; + buildStats.maxMs = Math.max(buildStats.maxMs, durationMs); + buildStats.lastMs = durationMs; + buildStats.lastFinishedAt = new Date().toISOString(); + buildStats.lastError = null; + buildStats.lastServiceKey = context.serviceKey; + buildStats.lastDatabaseId = context.databaseId; + + if (context.serviceKey) { + updateAggregate(buildStats.byServiceKey, context.serviceKey, durationMs); + } + + pushBuildEvent({ + type: 'success', + cacheKey: context.cacheKey, + serviceKey: context.serviceKey, + databaseId: context.databaseId, + durationMs, + timestamp: new Date().toISOString(), + }); +}; + +const recordBuildFailure = ( + context: GraphileBuildContext, + startedAt: number, + error: unknown, +): void => { + const durationMs = Date.now() - startedAt; + + buildStats.failed += 1; + buildStats.totalMs += durationMs; + buildStats.maxMs = Math.max(buildStats.maxMs, durationMs); + buildStats.lastMs = durationMs; + buildStats.lastFinishedAt = new Date().toISOString(); + buildStats.lastError = error instanceof Error ? 
error.message : String(error);
+  buildStats.lastServiceKey = context.serviceKey;
+  buildStats.lastDatabaseId = context.databaseId;
+
+  pushBuildEvent({
+    type: 'failure',
+    cacheKey: context.cacheKey,
+    serviceKey: context.serviceKey,
+    databaseId: context.databaseId,
+    durationMs,
+    error: error instanceof Error ? error.message : String(error),
+    timestamp: new Date().toISOString(),
+  });
+};
+
+export const observeGraphileBuild = async <T>(
+  context: GraphileBuildContext,
+  fn: () => Promise<T>,
+  opts: { enabled: boolean },
+): Promise<T> => {
+  if (!opts.enabled) {
+    return fn();
+  }
+
+  const startedAt = Date.now();
+  recordBuildStart(context, startedAt);
+
+  try {
+    const result = await fn();
+    recordBuildSuccess(context, startedAt);
+    return result;
+  } catch (error) {
+    recordBuildFailure(context, startedAt, error);
+    throw error;
+  }
+};
+
+export const resetGraphileBuildStats = (): void => {
+  buildStats.started = 0;
+  buildStats.succeeded = 0;
+  buildStats.failed = 0;
+  buildStats.totalMs = 0;
+  buildStats.maxMs = 0;
+  buildStats.lastMs = 0;
+  buildStats.lastKey = null;
+  buildStats.lastStartedAt = null;
+  buildStats.lastFinishedAt = null;
+  buildStats.lastError = null;
+  buildStats.lastServiceKey = null;
+  buildStats.lastDatabaseId = null;
+  buildStats.recentEvents = [];
+  buildStats.byServiceKey.clear();
+};
+
+export function getGraphileBuildStats(): {
+  started: number;
+  succeeded: number;
+  failed: number;
+  totalMs: number;
+  maxMs: number;
+  lastMs: number;
+  averageMs: number;
+  lastKey: string | null;
+  lastStartedAt: string | null;
+  lastFinishedAt: string | null;
+  lastError: string | null;
+  lastServiceKey: string | null;
+  lastDatabaseId: string | null;
+  recentEvents: GraphileBuildEvent[];
+  byServiceKey: Record<string, GraphileBuildAggregate & { averageMs: number }>;
+} {
+  const completed = buildStats.succeeded + buildStats.failed;
+  const withAverages = (map: Map<string, GraphileBuildAggregate>) =>
+    Object.fromEntries(
+      [...map.entries()].map(([key, value]) => [
+        key,
+        {
+          ...value,
+          averageMs: value.count > 0 ?
value.totalMs / value.count : 0, + }, + ]), + ); + + return { + started: buildStats.started, + succeeded: buildStats.succeeded, + failed: buildStats.failed, + totalMs: buildStats.totalMs, + maxMs: buildStats.maxMs, + lastMs: buildStats.lastMs, + averageMs: completed > 0 ? buildStats.totalMs / completed : 0, + lastKey: buildStats.lastKey, + lastStartedAt: buildStats.lastStartedAt, + lastFinishedAt: buildStats.lastFinishedAt, + lastError: buildStats.lastError, + lastServiceKey: buildStats.lastServiceKey, + lastDatabaseId: buildStats.lastDatabaseId, + recentEvents: [...buildStats.recentEvents], + byServiceKey: withAverages(buildStats.byServiceKey), + }; +} diff --git a/graphql/server/src/middleware/observability/guard.ts b/graphql/server/src/middleware/observability/guard.ts new file mode 100644 index 000000000..04d5477ab --- /dev/null +++ b/graphql/server/src/middleware/observability/guard.ts @@ -0,0 +1,18 @@ +import type { RequestHandler } from 'express'; +import { isLoopbackAddress, isLoopbackHost } from '../../diagnostics/observability'; + +export const localObservabilityOnly: RequestHandler = (req, res, next) => { + const remoteAddress = req.socket.remoteAddress; + if (isLoopbackAddress(remoteAddress)) { + next(); + return; + } + + const hostHeader = req.headers.host; + if (!remoteAddress && isLoopbackHost(hostHeader)) { + next(); + return; + } + + res.status(404).send('Not found'); +}; diff --git a/graphql/server/src/middleware/observability/request-logger.ts b/graphql/server/src/middleware/observability/request-logger.ts new file mode 100644 index 000000000..cc5835ae4 --- /dev/null +++ b/graphql/server/src/middleware/observability/request-logger.ts @@ -0,0 +1,64 @@ +import { randomUUID } from 'crypto'; +import { Logger } from '@pgpmjs/logger'; +import type { RequestHandler } from 'express'; + +const log = new Logger('server'); +const SAFE_REQUEST_ID = /^[a-zA-Z0-9\-_]{1,128}$/; + +interface RequestLoggerOptions { + observabilityEnabled: boolean; +} + +export 
const createRequestLogger = ({ observabilityEnabled }: RequestLoggerOptions): RequestHandler => { + return (req, res, next) => { + const headerRequestId = req.header('x-request-id'); + const reqId = (headerRequestId && SAFE_REQUEST_ID.test(headerRequestId)) + ? headerRequestId + : randomUUID(); + const start = process.hrtime.bigint(); + let finished = false; + + req.requestId = reqId; + + const host = req.hostname || req.headers.host || 'unknown'; + const ip = req.clientIp ?? req.ip ?? 'unknown'; + + log.debug(`[${reqId}] -> ${req.method} ${req.originalUrl} host=${host} ip=${ip}`); + + res.on('finish', () => { + finished = true; + const durationMs = Number(process.hrtime.bigint() - start) / 1e6; + const apiInfo = req.api + ? `db=${req.api.dbname} schemas=${req.api.schema?.join(',') || 'none'}` + : 'api=unresolved'; + const authInfo = req.token ? 'auth=token' : 'auth=anon'; + const svcInfo = req.svc_key ? `svc=${req.svc_key}` : 'svc=unset'; + + log.debug( + `[${reqId}] <- ${res.statusCode} ${req.method} ${req.originalUrl} (${durationMs.toFixed( + 1, + )} ms) ${apiInfo} ${svcInfo} ${authInfo}`, + ); + }); + + if (observabilityEnabled) { + res.on('close', () => { + if (finished) { + return; + } + + const durationMs = Number(process.hrtime.bigint() - start) / 1e6; + const apiInfo = req.api + ? 
`db=${req.api.dbname} schemas=${req.api.schema?.join(',') || 'none'}` + : 'api=unresolved'; + + log.warn( + `[${reqId}] connection closed before response completed ` + + `${req.method} ${req.originalUrl} (${durationMs.toFixed(1)} ms) ${apiInfo}`, + ); + }); + } + + next(); + }; +}; diff --git a/graphql/server/src/server.ts b/graphql/server/src/server.ts index a202186a9..1ae4759db 100644 --- a/graphql/server/src/server.ts +++ b/graphql/server/src/server.ts @@ -4,7 +4,6 @@ import { Logger } from '@pgpmjs/logger'; import { healthz, poweredBy, svcCache, trustProxy } from '@pgpmjs/server-utils'; import { PgpmOptions } from '@pgpmjs/types'; import { middleware as parseDomains } from '@constructive-io/url-domains'; -import { randomUUID } from 'crypto'; import express, { Express, RequestHandler } from 'express'; import type { Server as HttpServer } from 'http'; import graphqlUpload from 'graphql-upload'; @@ -13,6 +12,14 @@ import { graphileCache, closeAllCaches } from 'graphile-cache'; import { getPgPool } from 'pg-cache'; import requestIp from 'request-ip'; +import type { DebugSamplerHandle } from './diagnostics/debug-sampler'; +import { closeDebugDatabasePools } from './diagnostics/debug-db-snapshot'; +import { + isDevelopmentObservabilityMode, + isGraphqlObservabilityEnabled, + isGraphqlObservabilityRequested, + isLoopbackHost, +} from './diagnostics/observability'; import { createApiMiddleware } from './middleware/api'; import { createAuthenticateMiddleware } from './middleware/auth'; import { cors } from './middleware/cors'; @@ -21,8 +28,12 @@ import { favicon } from './middleware/favicon'; import { flush, flushService } from './middleware/flush'; import { graphile } from './middleware/graphile'; import { multipartBridge } from './middleware/multipart-bridge'; +import { createDebugDatabaseMiddleware } from './middleware/observability/debug-db'; +import { debugMemory } from './middleware/observability/debug-memory'; +import { localObservabilityOnly } from 
'./middleware/observability/guard'; +import { createRequestLogger } from './middleware/observability/request-logger'; import { createUploadAuthenticateMiddleware, uploadRoute } from './middleware/upload'; -import { debugMemory } from './middleware/debug-memory'; +import { startDebugSampler } from './diagnostics/debug-sampler'; const log = new Logger('server'); @@ -62,47 +73,19 @@ class Server { private shuttingDown = false; private closed = false; private httpServer: HttpServer | null = null; + private debugSampler: DebugSamplerHandle | null = null; constructor(opts: ConstructiveOptions) { this.opts = getEnvOptions(opts); const effectiveOpts = this.opts; + const observabilityRequested = isGraphqlObservabilityRequested(); + const observabilityEnabled = isGraphqlObservabilityEnabled(effectiveOpts.server?.host); const app = express(); const api = createApiMiddleware(effectiveOpts); const authenticate = createAuthenticateMiddleware(effectiveOpts); const uploadAuthenticate = createUploadAuthenticateMiddleware(effectiveOpts); - const SAFE_REQUEST_ID = /^[a-zA-Z0-9\-_]{1,128}$/; - const requestLogger: RequestHandler = (req, res, next) => { - const headerRequestId = req.header('x-request-id'); - const reqId = (headerRequestId && SAFE_REQUEST_ID.test(headerRequestId)) - ? headerRequestId - : randomUUID(); - const start = process.hrtime.bigint(); - - req.requestId = reqId; - - const host = req.hostname || req.headers.host || 'unknown'; - const ip = req.clientIp || req.ip; - - log.debug(`[${reqId}] -> ${req.method} ${req.originalUrl} host=${host} ip=${ip}`); - - res.on('finish', () => { - const durationMs = Number(process.hrtime.bigint() - start) / 1e6; - const apiInfo = req.api - ? `db=${req.api.dbname} schemas=${req.api.schema?.join(',') || 'none'}` - : 'api=unresolved'; - const authInfo = req.token ? 'auth=token' : 'auth=anon'; - const svcInfo = req.svc_key ? 
`svc=${req.svc_key}` : 'svc=unset'; - - log.debug( - `[${reqId}] <- ${res.statusCode} ${req.method} ${req.originalUrl} (${durationMs.toFixed( - 1, - )} ms) ${apiInfo} ${svcInfo} ${authInfo}`, - ); - }); - - next(); - }; + const requestLogger = createRequestLogger({ observabilityEnabled }); // Log startup configuration (non-sensitive values only) const apiOpts = (effectiveOpts as any).api || {}; @@ -118,10 +101,34 @@ class Server { exposedSchemas: apiOpts.exposedSchemas?.join(',') || 'none', anonRole: apiOpts.anonRole, roleName: apiOpts.roleName, + observabilityEnabled, }); + if (observabilityRequested && !observabilityEnabled) { + const reasons = []; + if (!isDevelopmentObservabilityMode()) { + reasons.push('NODE_ENV must be development'); + } + if (!isLoopbackHost(effectiveOpts.server?.host)) { + reasons.push('server host must be localhost, 127.0.0.1, or ::1'); + } + + log.warn( + `GRAPHQL_OBSERVABILITY_ENABLED was requested but observability remains disabled${ + reasons.length > 0 ? `: ${reasons.join('; ')}` : '' + }`, + ); + } + healthz(app); - app.get('/debug/memory', debugMemory); + if (observabilityEnabled) { + app.get('/debug/memory', localObservabilityOnly, debugMemory); + app.get('/debug/db', localObservabilityOnly, createDebugDatabaseMiddleware(effectiveOpts)); + } else { + app.use('/debug', (_req, res) => { + res.status(404).send('Not found'); + }); + } app.use(favicon); trustProxy(app, effectiveOpts.server.trustProxy); // Warn if a global CORS override is set in production @@ -159,6 +166,7 @@ class Server { app.use(errorHandler); // Catches all thrown errors this.app = app; + this.debugSampler = observabilityEnabled ? 
startDebugSampler(effectiveOpts) : null; } listen(): HttpServer { @@ -266,9 +274,14 @@ class Server { this.closed = true; this.shuttingDown = true; await this.removeEventListener(); + if (this.debugSampler) { + await this.debugSampler.stop(); + this.debugSampler = null; + } if (this.httpServer?.listening) { await new Promise((resolve) => this.httpServer!.close(() => resolve())); } + await closeDebugDatabasePools(); if (closeCaches) { await Server.closeCaches({ closePools: true }); } diff --git a/packages/server-utils/src/lru.ts b/packages/server-utils/src/lru.ts index 8088f473e..4ac40abcf 100644 --- a/packages/server-utils/src/lru.ts +++ b/packages/server-utils/src/lru.ts @@ -7,10 +7,12 @@ const ONE_HOUR_IN_MS = 1000 * 60 * 60; const ONE_DAY = ONE_HOUR_IN_MS * 24; const ONE_YEAR = ONE_DAY * 366; +export const SVC_CACHE_TTL_MS = ONE_YEAR; + // --- Service Cache --- export const svcCache = new LRUCache({ max: 25, - ttl: ONE_YEAR, + ttl: SVC_CACHE_TTL_MS, updateAgeOnGet: true, dispose: (_, key) => { log.debug(`Disposing service[${key}]`); diff --git a/postgres/pg-cache/src/__tests__/lru.test.ts b/postgres/pg-cache/src/__tests__/lru.test.ts new file mode 100644 index 000000000..f7f50aee5 --- /dev/null +++ b/postgres/pg-cache/src/__tests__/lru.test.ts @@ -0,0 +1,117 @@ +// Guards against the pg-cache close() resource leak fixed in feat/observability. +// +// Previously, close() reset this.closed = false after shutdown, allowing +// set() to silently accept new pools that were never cleaned up. The module- +// level closePromise also reset to null, enabling double-shutdown. +// +// These tests lock the fix: close() is final, set() rejects, and repeated +// close() calls are idempotent. See pg-cache-close-leak.md for full details. 
+ +import pg from 'pg'; +import { PgPoolCacheManager } from '../lru'; + +// Minimal mock — we only need pool.end() and pool.ended +const createMockPool = (): pg.Pool => { + let ended = false; + return { + get ended() { return ended; }, + end: jest.fn(async () => { ended = true; }), + } as unknown as pg.Pool; +}; + +describe('PgPoolCacheManager', () => { + let cache: PgPoolCacheManager; + + beforeEach(() => { + cache = new PgPoolCacheManager(); + }); + + afterEach(async () => { + // Ensure all pools are cleaned up even if a test fails mid-way + try { await cache.close(); } catch { /* already closed */ } + }); + + it('stores and retrieves a pool', () => { + const pool = createMockPool(); + cache.set('key1', pool); + expect(cache.get('key1')).toBe(pool); + expect(cache.has('key1')).toBe(true); + }); + + it('returns undefined for missing keys', () => { + expect(cache.get('missing')).toBeUndefined(); + expect(cache.has('missing')).toBe(false); + }); + + describe('close() lifecycle', () => { + it('set() after close() throws', async () => { + const pool1 = createMockPool(); + cache.set('key1', pool1); + + await cache.close(); + + const pool2 = createMockPool(); + expect(() => cache.set('key2', pool2)).toThrow( + 'Cannot add to cache after it has been closed (key: key2)', + ); + }); + + it('get() after close() returns undefined with warning', async () => { + const pool = createMockPool(); + cache.set('key1', pool); + + await cache.close(); + + expect(cache.get('key1')).toBeUndefined(); + }); + + it('double close() is idempotent', async () => { + const pool = createMockPool(); + cache.set('key1', pool); + + await cache.close(); + await cache.close(); // should not throw + + expect(pool.end).toHaveBeenCalledTimes(1); + }); + + it('close() disposes all pools', async () => { + const pool1 = createMockPool(); + const pool2 = createMockPool(); + cache.set('key1', pool1); + cache.set('key2', pool2); + + await cache.close(); + + expect(pool1.end).toHaveBeenCalledTimes(1); + 
expect(pool2.end).toHaveBeenCalledTimes(1); + }); + }); + + describe('cleanup callbacks', () => { + it('fires callback on close()', async () => { + const pool = createMockPool(); + cache.set('key1', pool); + + const callback = jest.fn(); + cache.registerCleanupCallback(callback); + + await cache.close(); + + expect(callback).toHaveBeenCalledWith('key1'); + }); + + it('unregister prevents callback from firing', async () => { + const pool = createMockPool(); + cache.set('key1', pool); + + const callback = jest.fn(); + const unregister = cache.registerCleanupCallback(callback); + unregister(); + + await cache.close(); + + expect(callback).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/postgres/pg-cache/src/lru.ts b/postgres/pg-cache/src/lru.ts index 119fb37af..5ec244284 100644 --- a/postgres/pg-cache/src/lru.ts +++ b/postgres/pg-cache/src/lru.ts @@ -70,6 +70,10 @@ export class PgPoolCacheManager { } get(key: PgPoolKey): pg.Pool | undefined { + if (this.closed) { + log.warn(`Cache is closed, ignoring get(${key})`); + return undefined; + } return this.pgCache.get(key)?.pool; }