diff --git a/src/index.js b/src/index.js index 9f57c005..0fdedfcc 100644 --- a/src/index.js +++ b/src/index.js @@ -29,6 +29,7 @@ import { runDemoUrlProcessor as demoUrlProcessor } from './tasks/demo-url-proces import { runCwvDemoSuggestionsProcessor as cwvDemoSuggestionsProcessor } from './tasks/cwv-demo-suggestions-processor/handler.js'; import { runAgentExecutor as agentExecutor } from './tasks/agent-executor/handler.js'; import { runSlackNotify as slackNotify } from './tasks/slack-notify/handler.js'; +import { runUrlInspectorRefresh as urlInspectorRefresh } from './tasks/url-inspector-refresh/handler.js'; const HANDLERS = { 'opportunity-status-processor': opportunityStatusProcessor, @@ -38,6 +39,7 @@ const HANDLERS = { 'agent-executor': agentExecutor, 'slack-notify': slackNotify, 'cwv-demo-suggestions-processor': cwvDemoSuggestionsProcessor, + 'url-inspector-refresh': urlInspectorRefresh, dummy: (message) => ok(message), // for tests }; diff --git a/src/tasks/url-inspector-refresh/handler.js b/src/tasks/url-inspector-refresh/handler.js new file mode 100644 index 00000000..f8b1e108 --- /dev/null +++ b/src/tasks/url-inspector-refresh/handler.js @@ -0,0 +1,217 @@ +/* + * Copyright 2026 Adobe. All rights reserved. + * This file is licensed to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS + * OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
+ */ + +import { badRequest, internalServerError, ok } from '@adobe/spacecat-shared-http-utils'; +import { isValidUUID } from '@adobe/spacecat-shared-utils'; + +import { postgrestClientFromContext } from '../../utils/postgrest-client.js'; + +/* + * url-inspector-refresh + * + * One message in: { type: 'url-inspector-refresh', siteId: }. + * + * Per invocation: + * 1. Call rpc_url_inspector_stale_slices_for_site(siteId) — cheap, indexed, + * returns 0..N (month_start, month_end) rows that are stale. + * 2. For each stale month, call wrpc_refresh_url_inspector_domain_stats( + * siteId, month_start, month_end). The refresh RPC is idempotent + * (DELETE + INSERT under pg_advisory_xact_lock per site), so re-running + * on the same (site, month) is safe. + * + * Failure model: + * - Per-RPC retry: each PostgREST call is retried up to PER_RPC_ATTEMPTS times + * in-handler before being declared failed. + * - Per-month isolation: a failure on one month does NOT abort the rest of + * the site's months; the failure is logged + counted + skipped, and the + * loop continues. + * - We never throw to the SQS dispatcher: the task-processor jobs queue runs + * with maxReceiveCount=1, so a throw would immediately DLQ the message and + * require manual ops attention. Instead we lean on the every-30-min + * schedule + the idempotency of the refresh RPC: any month we fail to + * refresh stays "stale" in the next invocation's staleness query and gets + * retried on the next tick. + * + * Budget: + * - SQS visibility timeout for spacecat-task-processor-jobs is 900s. We cap + * wall time at PER_INVOCATION_BUDGET_MS (12 min) and defer the remainder + * to the next schedule tick. + * + * Observability: + * - Every per-month outcome is emitted as a single structured log line + * ({ event, siteId, month_start, status, durationMs, attempts }) so a + * downstream CloudWatch metric filter can turn them into counters / SLI + * gauges without a CloudWatch SDK dep on the hot path. 
+ */
+
+const TASK_TYPE = 'url-inspector-refresh';
+const PER_INVOCATION_BUDGET_MS = 12 * 60 * 1000;
+const PER_RPC_ATTEMPTS = 2;
+const RETRY_BACKOFF_MS = 250;
+
+/**
+ * Resolve after `ms` milliseconds. Replaceable through `deps.sleepFn` so tests
+ * do not have to wait for real timers.
+ */
+function sleep(ms) {
+  return new Promise((resolve) => {
+    setTimeout(resolve, ms);
+  });
+}
+
+/**
+ * Call client.rpc(fnName, params) up to `attempts` times (at least once).
+ *
+ * Returns the result of the first successful attempt, or of the last failed
+ * one, with an extra `attempts` field holding the number of calls made. Backs
+ * off by `backoffMs * attemptNumber` between attempts.
+ *
+ * Never rejects: a client that throws, or resolves to something that is not an
+ * object, is converted into `{ data: null, error: { message, status: 0 } }` and
+ * retried like any other failure. This keeps the handler's "never throw to the
+ * SQS dispatcher" guarantee independent of the client implementation.
+ *
+ * @param {object} client Object exposing `rpc(fnName, params)`
+ * @param {string} fnName RPC function name
+ * @param {object} params Named RPC arguments
+ * @param {object} [options]
+ * @param {number} [options.attempts] Max number of calls (values < 1 are treated as 1)
+ * @param {number} [options.backoffMs] Base backoff between attempts
+ * @param {object} [options.log] Logger; only `.warn` is used
+ * @param {Function} [options.sleepFn] Injectable sleep
+ * @returns {Promise<object>} `{ data, error, attempts }`
+ */
+async function withRpcRetry(client, fnName, params, {
+  attempts = PER_RPC_ATTEMPTS, backoffMs = RETRY_BACKOFF_MS, log, sleepFn = sleep,
+} = {}) {
+  // Always make at least one call: with zero iterations `last` would stay
+  // undefined and the caller would read "no .error" as success.
+  const maxAttempts = Number.isInteger(attempts) && attempts > 0 ? attempts : 1;
+  let last;
+  for (let i = 1; i <= maxAttempts; i += 1) {
+    /* eslint-disable no-await-in-loop */
+    try {
+      const result = await client.rpc(fnName, params);
+      last = result && typeof result === 'object'
+        ? result
+        : { data: null, error: { message: `${fnName} returned no result object`, status: 0 } };
+    } catch (err) {
+      last = {
+        data: null,
+        error: { message: err?.message || String(err), status: 0 },
+      };
+    }
+    if (!last.error) {
+      return { ...last, attempts: i };
+    }
+    if (i < maxAttempts) {
+      log?.warn?.(`url-inspector-refresh: ${fnName} attempt ${i}/${maxAttempts} failed (${last.error.message}); retrying in ${backoffMs * i}ms`);
+      await sleepFn(backoffMs * i);
+    }
+    /* eslint-enable no-await-in-loop */
+  }
+  return { ...last, attempts: maxAttempts };
+}
+
+/**
+ * Main handler. Returns:
+ *   - 400 (badRequest) on missing/invalid siteId
+ *   - 500 (internalServerError) on un-recoverable config issues (e.g. missing
+ *     POSTGREST_URL — surfaced so prod alarms can pick it up immediately)
+ *   - 200 (ok) with `{ siteId, refreshed, failed, deferred, totalStale }` on
+ *     every other outcome, including partial success / all-failed
+ *
+ * @param {object} message Queue message; only `siteId` is read
+ * @param {object} context Invocation context; `log` is used here and the whole
+ *   context is handed to `postgrestClientFromContext`
+ * @param {object} [deps] Test seams: `client`, `budgetMs`, `attempts`, `sleepFn`
+ */
+export async function runUrlInspectorRefresh(message, context, deps = {}) {
+  const { log } = context;
+  const { siteId } = message || {};
+
+  if (!isValidUUID(siteId)) {
+    log.error(`${TASK_TYPE}: invalid or missing siteId`);
+    return badRequest('siteId is required and must be a valid UUID');
+  }
+
+  let client;
+  try {
+    client = deps.client || postgrestClientFromContext(context);
+  } catch (err) {
+    // baseUrl/apiKey missing — config error, NOT a transient one. Surface 500 so
+    // the Lambda errors metric (and any alarm wired to it) fires.
+    log.error(`${TASK_TYPE}: postgrest client init failed for site ${siteId}: ${err.message}`);
+    return internalServerError(err.message);
+  }
+
+  const budgetMs = deps.budgetMs ?? PER_INVOCATION_BUDGET_MS;
+  const attempts = deps.attempts ?? PER_RPC_ATTEMPTS;
+  const sleepFn = deps.sleepFn ?? sleep;
+
+  // The budget is a wall-time cap for the whole invocation, so the clock starts
+  // before the staleness query: that query can itself consume
+  // attempts x request-timeout, which must count against the cap.
+  const startedAt = Date.now();
+
+  log.info(`${TASK_TYPE}: starting refresh for site ${siteId}`);
+
+  const staleResult = await withRpcRetry(
+    client,
+    'rpc_url_inspector_stale_slices_for_site',
+    { p_site_id: siteId },
+    { attempts, log, sleepFn },
+  );
+
+  if (staleResult.error) {
+    log.error(`${TASK_TYPE}: staleness query failed for site ${siteId} after ${staleResult.attempts} attempts: ${staleResult.error.message}`);
+    log.info(JSON.stringify({
+      event: `${TASK_TYPE}.staleness_failed`,
+      siteId,
+      attempts: staleResult.attempts,
+      status: staleResult.error.status,
+      message: staleResult.error.message,
+    }));
+    // Do not throw: next 30-min tick will retry. Return ok so SQS deletes the
+    // message instead of sending it to DLQ.
+    return ok({
+      siteId, refreshed: 0, failed: 0, deferred: 0, totalStale: 0, stalenessFailed: true,
+    });
+  }
+
+  // A null / non-array payload is treated the same as "nothing is stale".
+  const stale = Array.isArray(staleResult.data) ? staleResult.data : [];
+  if (stale.length === 0) {
+    log.info(`${TASK_TYPE}: site ${siteId} has no stale slices`);
+    return ok({
+      siteId, refreshed: 0, failed: 0, deferred: 0, totalStale: 0,
+    });
+  }
+
+  log.info(`${TASK_TYPE}: site ${siteId} has ${stale.length} stale slice(s)`);
+
+  let refreshed = 0;
+  let failed = 0;
+  let deferred = 0;
+
+  for (const slice of stale) {
+    const monthStart = slice.month_start;
+    const monthEnd = slice.month_end;
+    const elapsed = Date.now() - startedAt;
+
+    // Checked before each slice: everything not yet attempted is deferred.
+    if (elapsed > budgetMs) {
+      deferred = stale.length - refreshed - failed;
+      log.warn(`${TASK_TYPE}: site ${siteId} budget exhausted after ${elapsed}ms; deferring ${deferred} slice(s) to next tick`);
+      break;
+    }
+
+    const t0 = Date.now();
+    /* eslint-disable no-await-in-loop */
+    const res = await withRpcRetry(
+      client,
+      'wrpc_refresh_url_inspector_domain_stats',
+      { p_site_id: siteId, p_start_date: monthStart, p_end_date: monthEnd },
+      { attempts, log, sleepFn },
+    );
+    /* eslint-enable no-await-in-loop */
+    const durationMs = Date.now() - t0;
+
+    if (res.error) {
+      failed += 1;
+      log.error(`${TASK_TYPE}: refresh failed for site ${siteId} month ${monthStart} after ${res.attempts} attempt(s): ${res.error.message}`);
+      log.info(JSON.stringify({
+        event: `${TASK_TYPE}.refresh`,
+        siteId,
+        month_start: monthStart,
+        status: 'error',
+        attempts: res.attempts,
+        durationMs,
+        errorMessage: res.error.message,
+        errorStatus: res.error.status,
+      }));
+      // A failed month stays stale; the next 30-min tick will retry it.
+    } else {
+      refreshed += 1;
+      log.info(JSON.stringify({
+        event: `${TASK_TYPE}.refresh`,
+        siteId,
+        month_start: monthStart,
+        status: 'ok',
+        attempts: res.attempts,
+        durationMs,
+      }));
+    }
+  }
+
+  log.info(`${TASK_TYPE}: site ${siteId} complete — refreshed=${refreshed} failed=${failed} deferred=${deferred} totalStale=${stale.length}`);
+
+  return ok({
+    siteId, refreshed, failed, deferred, totalStale: stale.length,
+  });
+}
+
+export default runUrlInspectorRefresh;
diff --git a/src/utils/postgrest-client.js b/src/utils/postgrest-client.js
new file mode 100644
index 00000000..afa46c29
--- /dev/null
+++ b/src/utils/postgrest-client.js
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2026 Adobe. All rights reserved.
+ * This file is licensed to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may obtain a copy
+ * of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
+ * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
+ * OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+
+/**
+ * Minimal PostgREST RPC client.
+ *
+ * Why we don't reuse `@adobe/spacecat-shared-data-access` v3 here:
+ *   - task-processor runs in DynamoDB (v2) mode for every other handler; flipping
+ *     `DATA_SERVICE_PROVIDER=postgres` would change the global behaviour for all of
+ *     them. This client opts a single handler into PostgREST without that side
+ *     effect.
+ *   - The url-inspector-refresh handler only needs RPC calls (no model CRUD), so the
+ *     full v3 layer is overkill. A ~80-line `fetch` wrapper is easier to reason
+ *     about and to mock in tests.
+ *
+ * Surface matches the subset of supabase-js that the api-service controllers use:
+ *   `const { data, error } = await client.rpc(name, namedParams);`
+ * so future migration to v3 is a one-line swap if we ever flip the task-processor
+ * to PostgREST for everything.
+ */
+
+const DEFAULT_TIMEOUT_MS = 60_000;
+
+/**
+ * Build a PostgREST error object that mirrors supabase-js's shape:
+ *   { message, status, code?, details?, hint?, body? }
+ * `body` is the parsed JSON or raw text returned by PostgREST when available; it's
+ * exposed so callers can log full server-side context without re-parsing.
+ */
+function toError({
+  message, status, body,
+}) {
+  // code/details/hint are only read from object bodies; for raw text or null
+  // they stay undefined.
+  const { code, details, hint } = body && typeof body === 'object' ? body : {};
+  return {
+    message, status, code, details, hint, body,
+  };
+}
+
+export class PostgrestClient {
+  /**
+   * @param {object} opts
+   * @param {string} opts.baseUrl PostgREST root, e.g. https://pgrst.example/v1
+   * @param {string} opts.apiKey Writer JWT (HS256) — sent as Bearer token
+   * @param {string} [opts.schema] Postgres schema (default: 'public')
+   * @param {number} [opts.timeoutMs] Per-request timeout (default: 60s). Values that
+   *   are not a finite positive number fall back to the default.
+   * @param {Function} [opts.fetchImpl] Injectable fetch; defaults to global fetch
+   * @param {object} [opts.log] Logger; only `.debug` is used
+   * @throws {Error} when baseUrl or apiKey is missing, or no fetch is available
+   */
+  constructor({
+    baseUrl, apiKey, schema = 'public', timeoutMs = DEFAULT_TIMEOUT_MS, fetchImpl, log,
+  }) {
+    if (!baseUrl) throw new Error('PostgrestClient: baseUrl is required');
+    if (!apiKey) throw new Error('PostgrestClient: apiKey is required');
+    this.baseUrl = baseUrl.replace(/\/$/, '');
+    this.apiKey = apiKey;
+    this.schema = schema;
+    // A NaN / zero / negative delay makes setTimeout fire almost immediately,
+    // which would abort every request; use the default instead.
+    this.timeoutMs = Number.isFinite(timeoutMs) && timeoutMs > 0
+      ? timeoutMs
+      : DEFAULT_TIMEOUT_MS;
+    this.fetch = fetchImpl || globalThis.fetch;
+    this.log = log;
+    if (typeof this.fetch !== 'function') {
+      throw new Error('PostgrestClient: no fetch implementation available');
+    }
+  }
+
+  /**
+   * Call a PostgREST RPC (i.e. POST /rpc/{fnName}).
+   *
+   * @param {string} fnName Function name (without `/rpc/` prefix)
+   * @param {object} [params] Named arguments, posted as JSON body
+   * @param {object} [options]
+   * @param {AbortSignal} [options.signal] External abort signal (composed with the
+   *   per-request timeout)
+   * @returns {Promise<{ data: any, error: object|null }>}
+   *
+   * Never throws. Network errors, timeouts, failures while reading the response
+   * body, and non-2xx responses all surface as `{ data: null, error }` so handlers
+   * can `if (error) ...` without try/catch. Transport-level failures (no usable
+   * HTTP response) are reported with `status: 0`.
+   */
+  async rpc(fnName, params = {}, { signal } = {}) {
+    // Encode the name so it cannot alter the request path; plain identifiers
+    // are unchanged by encodeURIComponent.
+    const url = `${this.baseUrl}/rpc/${encodeURIComponent(fnName)}`;
+    const headers = {
+      'Content-Type': 'application/json',
+      Accept: 'application/json',
+      // Both profile headers are needed when the function lives outside the
+      // exposed schema; harmless when it doesn't.
+      'Content-Profile': this.schema,
+      'Accept-Profile': this.schema,
+      Authorization: `Bearer ${this.apiKey}`,
+    };
+
+    const controller = new AbortController();
+    const timeoutId = setTimeout(() => controller.abort(new Error(`PostgREST timeout after ${this.timeoutMs}ms`)), this.timeoutMs);
+    // Compose: if the caller's signal fires, abort our internal one too.
+    const onExternalAbort = () => controller.abort(signal.reason);
+    if (signal) {
+      if (signal.aborted) {
+        onExternalAbort();
+      } else {
+        signal.addEventListener('abort', onExternalAbort, { once: true });
+      }
+    }
+
+    let response;
+    let rawText;
+    try {
+      // Invoke detached from `this` so a platform fetch never sees the client
+      // instance as its receiver.
+      const doFetch = this.fetch;
+      response = await doFetch(url, {
+        method: 'POST',
+        headers,
+        body: JSON.stringify(params),
+        signal: controller.signal,
+      });
+      // Read the body while the timeout is still armed and inside the try:
+      // the read can reject (connection reset, abort) just like the request.
+      rawText = await response.text();
+    } catch (err) {
+      return {
+        data: null,
+        error: toError({
+          message: err?.message || 'PostgREST request failed',
+          status: 0,
+          body: null,
+        }),
+      };
+    } finally {
+      clearTimeout(timeoutId);
+      // Drop the listener so a long-lived caller signal does not accumulate one
+      // closure per request.
+      signal?.removeEventListener('abort', onExternalAbort);
+    }
+
+    // Prefer parsed JSON; fall back to the raw text when it is not JSON and to
+    // null when the body is empty.
+    let body = null;
+    if (rawText) {
+      try {
+        body = JSON.parse(rawText);
+      } catch {
+        body = rawText;
+      }
+    }
+
+    if (!response.ok) {
+      const message = body && typeof body === 'object' && body.message
+        ? body.message
+        : `PostgREST ${response.status} on /rpc/${fnName}`;
+      this.log?.debug?.(`PostgREST ${response.status} on /rpc/${fnName}: ${rawText}`);
+      return {
+        data: null,
+        error: toError({ message, status: response.status, body }),
+      };
+    }
+
+    return { data: body, error: null };
+  }
+}
+
+/**
+ * Build a PostgrestClient from a UniversalContext-shaped object.
+ *
+ * Reads:
+ *   - POSTGREST_URL (required)
+ *   - POSTGREST_API_KEY (required) — writer JWT
+ *   - POSTGREST_SCHEMA (optional, default 'public')
+ *   - POSTGREST_TIMEOUT_MS (optional)
+ *
+ * Lazy by design: the env vars are only read here, not at module load, so
+ * vault-secrets middleware has a chance to populate `context.env` first.
+ *
+ * A POSTGREST_TIMEOUT_MS that is unset, empty, non-numeric, zero or negative is
+ * ignored, so the client's default timeout applies. Keys in `overrides` win over
+ * the values derived from the environment.
+ *
+ * @param {object} context Object with optional `env` and `log`
+ * @param {object} [overrides] Extra PostgrestClient constructor options
+ * @returns {PostgrestClient}
+ * @throws {Error} from the PostgrestClient constructor when the URL or key is missing
+ */
+export function postgrestClientFromContext(context, overrides = {}) {
+  const { env, log } = context || {};
+  // `env` may be explicitly null; treat that like "no settings" so the caller
+  // gets the constructor's "baseUrl is required" error rather than a TypeError.
+  const settings = env ?? {};
+  const parsedTimeout = Number(settings.POSTGREST_TIMEOUT_MS);
+  return new PostgrestClient({
+    baseUrl: settings.POSTGREST_URL,
+    apiKey: settings.POSTGREST_API_KEY,
+    schema: settings.POSTGREST_SCHEMA || 'public',
+    timeoutMs: Number.isFinite(parsedTimeout) && parsedTimeout > 0 ? parsedTimeout : undefined,
+    log,
+    ...overrides,
+  });
+}
diff --git a/test/tasks/url-inspector-refresh/url-inspector-refresh.test.js b/test/tasks/url-inspector-refresh/url-inspector-refresh.test.js
new file mode 100644
index 00000000..98c8391f
--- /dev/null
+++ b/test/tasks/url-inspector-refresh/url-inspector-refresh.test.js
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2026 Adobe. All rights reserved.
+ * This file is licensed to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may obtain a copy
+ * of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
+ * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
+ * OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+
+import { expect, use } from 'chai';
+import sinon from 'sinon';
+import sinonChai from 'sinon-chai';
+import esmock from 'esmock';
+
+import { MockContextBuilder } from '../../shared.js';
+
+use(sinonChai);
+
+const VALID_SITE_ID = '9ae8877a-bbf3-407d-9adb-d6a72ce3c5e3';
+
+/**
+ * Build a stub client whose `.rpc(name, params)` consults a per-fn-name queue
+ * of fake responses. Each call shifts one off; if the queue is exhausted, the
+ * last response is reused. Matches the shape of PostgrestClient.
+ */
+function makeStubClient(sandbox, byName) {
+  const queues = {};
+  for (const [name, list] of Object.entries(byName)) {
+    queues[name] = Array.isArray(list) ?
[...list] : [list];
+  }
+  return {
+    rpc: sandbox.stub().callsFake((fnName, params) => {
+      const q = queues[fnName];
+      if (!q || q.length === 0) {
+        return Promise.resolve({
+          data: null,
+          error: { message: `no stub queued for ${fnName}`, status: 0 },
+        });
+      }
+      // Keep the final entry in place so it is reused once the queue runs dry.
+      const next = q.length === 1 ? q[0] : q.shift();
+      // Pass params through for assertion convenience
+      return Promise.resolve(typeof next === 'function' ? next(params) : next);
+    }),
+  };
+}
+
+describe('runUrlInspectorRefresh', () => {
+  let sandbox;
+  let context;
+  let runUrlInspectorRefresh;
+  let sleepFn;
+
+  beforeEach(async () => {
+    sandbox = sinon.createSandbox();
+
+    // Load the handler. A plain dynamic import() is served from the ESM module
+    // cache after the first call, so this is NOT per-test isolation; the one
+    // test that needs a mocked dependency goes through esmock instead. Sleep is
+    // faked via deps so retries do not actually wait.
+    const mod = await import('../../../src/tasks/url-inspector-refresh/handler.js');
+    runUrlInspectorRefresh = mod.runUrlInspectorRefresh;
+
+    sleepFn = sandbox.stub().resolves();
+
+    context = new MockContextBuilder()
+      .withSandbox(sandbox)
+      .build();
+
+    context.env = {
+      POSTGREST_URL: 'https://pgrst.example/v1',
+      POSTGREST_API_KEY: 'fake.jwt.token',
+      POSTGREST_SCHEMA: 'public',
+    };
+  });
+
+  afterEach(() => sandbox.restore());
+
+  describe('input validation', () => {
+    it('returns 400 when siteId is missing', async () => {
+      const res = await runUrlInspectorRefresh({}, context, {
+        client: makeStubClient(sandbox, {}),
+      });
+      expect(res.status).to.equal(400);
+      expect(context.log.error).to.have.been.calledWithMatch(/invalid or missing siteId/);
+    });
+
+    it('returns 400 when siteId is not a UUID', async () => {
+      const res = await runUrlInspectorRefresh({ siteId: 'not-a-uuid' }, context, {
+        client: makeStubClient(sandbox, {}),
+      });
+      expect(res.status).to.equal(400);
+    });
+
+    it('returns 400 when message itself is null', async () => {
+      const res = await runUrlInspectorRefresh(null, context, {
+        client: makeStubClient(sandbox, {}),
+      });
+      expect(res.status).to.equal(400);
+    });
+  });
+
+  describe('client init', () => {
+    it('returns 500 when no PostgrestClient can be built from context.env', async () => {
+      context.env = {}; // no POSTGREST_URL / POSTGREST_API_KEY
+      const res = await runUrlInspectorRefresh({ siteId: VALID_SITE_ID }, context);
+      expect(res.status).to.equal(500);
+      const body = await res.json();
+      expect(body.message).to.match(/baseUrl is required|apiKey is required/);
+      expect(context.log.error).to.have.been.calledWithMatch(/postgrest client init failed/);
+    });
+
+    it('builds the PostgrestClient from context.env when no override is provided', async () => {
+      // Inject context-built client by mocking postgrestClientFromContext via esmock
+      const stubClient = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: { data: [], error: null },
+      });
+      const mod = await esmock('../../../src/tasks/url-inspector-refresh/handler.js', {
+        '../../../src/utils/postgrest-client.js': {
+          postgrestClientFromContext: () => stubClient,
+        },
+      });
+
+      const res = await mod.runUrlInspectorRefresh({ siteId: VALID_SITE_ID }, context);
+      expect(res.status).to.equal(200);
+      expect(stubClient.rpc).to.have.been.calledWith(
+        'rpc_url_inspector_stale_slices_for_site',
+        { p_site_id: VALID_SITE_ID },
+      );
+    });
+  });
+
+  describe('staleness query', () => {
+    it('returns ok with zeros when no slices are stale', async () => {
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: { data: [], error: null },
+      });
+      const res = await runUrlInspectorRefresh(
+        { siteId: VALID_SITE_ID },
+        context,
+        { client, sleepFn },
+      );
+
+      expect(res.status).to.equal(200);
+      expect(await res.json()).to.deep.equal({
+        siteId: VALID_SITE_ID, refreshed: 0, failed: 0, deferred: 0, totalStale: 0,
+      });
+      expect(client.rpc).to.have.been.calledOnce;
+      expect(context.log.info).to.have.been.calledWithMatch(/no stale slices/);
+    });
+
+    it('tolerates a null .data payload by treating it as no stale slices', async () => {
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: { data: null, error: null },
+      });
+      const res = await runUrlInspectorRefresh(
+        { siteId: VALID_SITE_ID },
+        context,
+        { client, sleepFn },
+      );
+
+      expect(res.status).to.equal(200);
+      expect((await res.json()).totalStale).to.equal(0);
+    });
+
+    it('uses the real timer-based sleep when no sleepFn is injected', async () => {
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: [
+          { data: null, error: { message: 'transient', status: 503 } },
+          { data: [], error: null },
+        ],
+      });
+      // sleepFn omitted on purpose so the default timer-based `sleep` runs. The
+      // fake clock is owned by the sandbox, so afterEach restores it even when
+      // an assertion below fails; it keeps the ~250ms backoff from costing real
+      // time.
+      const clock = sandbox.useFakeTimers();
+      const promise = runUrlInspectorRefresh(
+        { siteId: VALID_SITE_ID },
+        context,
+        { client },
+      );
+      // Advance past the first retry's backoff window (RETRY_BACKOFF_MS * 1)
+      await clock.tickAsync(250);
+      const res = await promise;
+
+      expect(res.status).to.equal(200);
+      expect(client.rpc).to.have.been.calledTwice;
+    });
+
+    it('retries the staleness query then succeeds on the second attempt', async () => {
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: [
+          { data: null, error: { message: 'transient', status: 503 } },
+          { data: [], error: null },
+        ],
+      });
+      const res = await runUrlInspectorRefresh(
+        { siteId: VALID_SITE_ID },
+        context,
+        { client, sleepFn },
+      );
+
+      expect(res.status).to.equal(200);
+      expect(client.rpc).to.have.been.calledTwice;
+      expect(sleepFn).to.have.been.calledOnce;
+      expect(context.log.warn).to.have.been.calledWithMatch(/retrying in/);
+    });
+
+    it('returns ok with stalenessFailed=true after attempts are exhausted', async () => {
+      // No throw — lets the next 30-min tick retry naturally.
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: {
+          data: null,
+          error: { message: 'PGRST500', status: 500 },
+        },
+      });
+      const res = await runUrlInspectorRefresh(
+        { siteId: VALID_SITE_ID },
+        context,
+        { client, sleepFn, attempts: 2 },
+      );
+
+      expect(res.status).to.equal(200);
+      const body = await res.json();
+      expect(body).to.include({
+        siteId: VALID_SITE_ID,
+        refreshed: 0,
+        failed: 0,
+        deferred: 0,
+        totalStale: 0,
+        stalenessFailed: true,
+      });
+      expect(client.rpc).to.have.been.calledTwice;
+      expect(context.log.error).to.have.been.calledWithMatch(/staleness query failed/);
+    });
+  });
+
+  describe('refresh loop', () => {
+    const stale2 = [
+      { month_start: '2026-04-01', month_end: '2026-04-30' },
+      { month_start: '2026-05-01', month_end: '2026-05-31' },
+    ];
+
+    it('refreshes every stale month on the happy path', async () => {
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: { data: stale2, error: null },
+        wrpc_refresh_url_inspector_domain_stats: { data: null, error: null },
+      });
+      const res = await runUrlInspectorRefresh(
+        { siteId: VALID_SITE_ID },
+        context,
+        { client, sleepFn },
+      );
+
+      expect(res.status).to.equal(200);
+      expect(await res.json()).to.deep.equal({
+        siteId: VALID_SITE_ID, refreshed: 2, failed: 0, deferred: 0, totalStale: 2,
+      });
+      // 1 staleness + 2 refreshes
+      expect(client.rpc).to.have.callCount(3);
+      expect(client.rpc.secondCall).to.have.been.calledWith(
+        'wrpc_refresh_url_inspector_domain_stats',
+        { p_site_id: VALID_SITE_ID, p_start_date: '2026-04-01', p_end_date: '2026-04-30' },
+      );
+      expect(client.rpc.thirdCall).to.have.been.calledWith(
+        'wrpc_refresh_url_inspector_domain_stats',
+        { p_site_id: VALID_SITE_ID, p_start_date: '2026-05-01', p_end_date: '2026-05-31' },
+      );
+    });
+
+    it('retries one month, succeeds, then completes the other', async () => {
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: { data: stale2, error: null },
+        wrpc_refresh_url_inspector_domain_stats: [
+          { data: null, error: { message: 'deadlock', status: 503 } }, // first month, attempt 1 -> fail
+          { data: null, error: null }, // first month, attempt 2 -> ok
+          { data: null, error: null }, // second month, attempt 1 -> ok
+        ],
+      });
+
+      const res = await runUrlInspectorRefresh(
+        { siteId: VALID_SITE_ID },
+        context,
+        { client, sleepFn },
+      );
+
+      expect(res.status).to.equal(200);
+      expect(await res.json()).to.deep.equal({
+        siteId: VALID_SITE_ID, refreshed: 2, failed: 0, deferred: 0, totalStale: 2,
+      });
+      // 1 staleness + 3 refresh calls (one was retried)
+      expect(client.rpc).to.have.callCount(4);
+      // one backoff between the failed attempt and its retry
+      expect(sleepFn).to.have.been.calledOnce;
+    });
+
+    it('isolates per-month failures: month 1 keeps failing, month 2 succeeds', async () => {
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: { data: stale2, error: null },
+        wrpc_refresh_url_inspector_domain_stats: [
+          { data: null, error: { message: 'boom', status: 500 } }, // month 1, attempt 1
+          { data: null, error: { message: 'boom', status: 500 } }, // month 1, attempt 2 -> declared failed
+          { data: null, error: null }, // month 2, attempt 1 -> ok
+        ],
+      });
+
+      const res = await runUrlInspectorRefresh(
+        { siteId: VALID_SITE_ID },
+        context,
+        { client, sleepFn },
+      );
+
+      expect(res.status).to.equal(200);
+      expect(await res.json()).to.deep.equal({
+        siteId: VALID_SITE_ID, refreshed: 1, failed: 1, deferred: 0, totalStale: 2,
+      });
+      expect(client.rpc).to.have.callCount(4);
+      expect(context.log.error).to.have.been.calledWithMatch(
+        /refresh failed for site.*month 2026-04-01/,
+      );
+      // structured log line emitted
+      const errorLogLine = context.log.info.getCalls()
+        .map((c) => c.args[0])
+        .find((m) => typeof m === 'string' && m.includes('"status":"error"'));
+      expect(errorLogLine).to.exist;
+    });
+
+    it('defers remaining months when the per-invocation budget is exhausted', async () => {
+      let nowOffset = 0;
+      // Stubbed through the sandbox, so afterEach restores Date.now.
+      sandbox.stub(Date, 'now').callsFake(() => 1_000_000 + nowOffset);
+
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: { data: stale2, error: null },
+        wrpc_refresh_url_inspector_domain_stats: () => {
+          // Each successful refresh "advances" the clock past the budget
+          nowOffset += 200;
+          return { data: null, error: null };
+        },
+      });
+
+      const res = await runUrlInspectorRefresh(
+        { siteId: VALID_SITE_ID },
+        context,
+        {
+          client, sleepFn, budgetMs: 100,
+        },
+      );
+
+      expect(res.status).to.equal(200);
+      const body = await res.json();
+      expect(body).to.deep.equal({
+        siteId: VALID_SITE_ID, refreshed: 1, failed: 0, deferred: 1, totalStale: 2,
+      });
+      expect(context.log.warn).to.have.been.calledWithMatch(/budget exhausted/);
+    });
+
+    it('emits one structured success log line per refreshed month', async () => {
+      const client = makeStubClient(sandbox, {
+        rpc_url_inspector_stale_slices_for_site: { data: stale2, error: null },
+        wrpc_refresh_url_inspector_domain_stats: { data: null, error: null },
+      });
+
+      await runUrlInspectorRefresh({ siteId: VALID_SITE_ID }, context, { client, sleepFn });
+
+      const structured = context.log.info.getCalls()
+        .map((c) => c.args[0])
+        .filter((m) => typeof m === 'string' && m.includes('"event":"url-inspector-refresh.refresh"'));
+      expect(structured).to.have.lengthOf(2);
+      for (const line of structured) {
+        const parsed = JSON.parse(line);
+        expect(parsed).to.have.property('siteId', VALID_SITE_ID);
+        expect(parsed).to.have.property('status', 'ok');
+        expect(parsed).to.have.property('attempts');
+        expect(parsed).to.have.property('durationMs');
+      }
+    });
+  });
+});
diff --git a/test/utils/postgrest-client.test.js b/test/utils/postgrest-client.test.js
new file mode 100644
index 00000000..a9748b60
--- /dev/null
+++ b/test/utils/postgrest-client.test.js
@@ -0,0 +1,354 @@
+/*
+ * Copyright 2026 Adobe.
All rights reserved. + * This file is licensed to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy + * of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS + * OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ + +import { expect, use } from 'chai'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { PostgrestClient, postgrestClientFromContext } from '../../src/utils/postgrest-client.js'; + +use(sinonChai); + +const BASE_URL = 'https://pgrst.example/v1'; +const API_KEY = 'fake.jwt.token'; + +/* Test helper: builds a minimal stand-in for a fetch Response exposing ok, status, statusText and an async text() method. text() resolves to the explicit text option when one is given; otherwise to the JSON-serialised body, or to an empty string when body is null or undefined. */function makeResponse({ + ok: okStatus = true, status = 200, body = null, text, +} = {}) { + return { + ok: okStatus, + status, + statusText: okStatus ? 'OK' : 'Error', + text: () => Promise.resolve(text ?? (body == null ? 
'' : JSON.stringify(body))), + }; +} + +/* Specs for PostgrestClient, grouped into constructor, rpc success cases and rpc error cases. Before each spec a fresh sinon sandbox, a logger whose debug method is a spy, and a fetch stub are created; the sandbox is restored after each spec. */describe('PostgrestClient', () => { + let sandbox; + let log; + let fetchStub; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + log = { debug: sandbox.spy() }; + fetchStub = sandbox.stub(); + }); + + afterEach(() => sandbox.restore()); + + /* Constructor contract: baseUrl and apiKey are required, a fetch implementation must be available (globalThis.fetch is used when fetchImpl is not passed), and a trailing slash on baseUrl is removed. */describe('constructor', () => { + it('throws when baseUrl is missing', () => { + expect(() => new PostgrestClient({ apiKey: API_KEY, fetchImpl: fetchStub })) + .to.throw(/baseUrl is required/); + }); + + it('throws when apiKey is missing', () => { + expect(() => new PostgrestClient({ baseUrl: BASE_URL, fetchImpl: fetchStub })) + .to.throw(/apiKey is required/); + }); + + /* globalThis.fetch is removed only for the duration of this spec; the finally block puts the original back even if the expectation fails. */it('throws when no fetch implementation is available', () => { + const originalFetch = globalThis.fetch; + delete globalThis.fetch; + try { + expect(() => new PostgrestClient({ baseUrl: BASE_URL, apiKey: API_KEY })) + .to.throw(/no fetch implementation available/); + } finally { + globalThis.fetch = originalFetch; + } + }); + + it('uses globalThis.fetch by default when one is available', () => { + const originalFetch = globalThis.fetch; + globalThis.fetch = () => {}; + try { + const client = new PostgrestClient({ baseUrl: BASE_URL, apiKey: API_KEY }); + expect(client.fetch).to.equal(globalThis.fetch); + } finally { + globalThis.fetch = originalFetch; + } + }); + + it('strips trailing slash from baseUrl', () => { + const client = new PostgrestClient({ + baseUrl: `${BASE_URL}/`, apiKey: API_KEY, fetchImpl: fetchStub, + }); + expect(client.baseUrl).to.equal(BASE_URL); + }); + }); + + /* Successful rpc() calls: shape of the outgoing request and decoding of the response body. The client under test is rebuilt before each spec with the shared fetch stub and logger. */describe('rpc — success cases', () => { + let client; + + beforeEach(() => { + client = new PostgrestClient({ + baseUrl: BASE_URL, apiKey: API_KEY, fetchImpl: fetchStub, log, + }); + }); + + it('POSTs to /rpc/ with named params, profile headers, and writer JWT', async () => { + fetchStub.resolves(makeResponse({ body: [{ month_start: '2026-04-01', month_end: '2026-04-30' }] })); + + const { data, error } = await 
client.rpc('rpc_url_inspector_stale_slices_for_site', { + p_site_id: 'abc', + p_max_months_back: 6, + }); + + expect(error).to.be.null; + expect(data).to.deep.equal([{ month_start: '2026-04-01', month_end: '2026-04-30' }]); + expect(fetchStub).to.have.been.calledOnce; + + /* Inspect the arguments of the single fetch call: the request URL followed by the init options. */const [url, init] = fetchStub.firstCall.args; + expect(url).to.equal(`${BASE_URL}/rpc/rpc_url_inspector_stale_slices_for_site`); + expect(init.method).to.equal('POST'); + expect(init.body).to.equal(JSON.stringify({ p_site_id: 'abc', p_max_months_back: 6 })); + expect(init.headers).to.include({ + 'Content-Type': 'application/json', + Accept: 'application/json', + 'Content-Profile': 'public', + 'Accept-Profile': 'public', + Authorization: `Bearer ${API_KEY}`, + }); + expect(init.signal).to.exist; + }); + + it('uses a custom schema when configured', async () => { + const customClient = new PostgrestClient({ + baseUrl: BASE_URL, apiKey: API_KEY, schema: 'mysticat', fetchImpl: fetchStub, log, + }); + fetchStub.resolves(makeResponse({ body: [] })); + + await customClient.rpc('rpc_foo', {}); + + const [, init] = fetchStub.firstCall.args; + expect(init.headers['Content-Profile']).to.equal('mysticat'); + expect(init.headers['Accept-Profile']).to.equal('mysticat'); + }); + + it('defaults to an empty params body when none are provided', async () => { + fetchStub.resolves(makeResponse({ body: null, text: '' })); + + const { data, error } = await client.rpc('rpc_ping'); + + expect(error).to.be.null; + expect(data).to.be.null; + expect(fetchStub.firstCall.args[1].body).to.equal('{}'); + }); + + it('parses scalar responses (e.g. 
true/false from a void RPC) as JSON', async () => { + fetchStub.resolves(makeResponse({ body: true })); + + const { data, error } = await client.rpc('wrpc_refresh_foo', { p_site_id: 'abc' }); + + expect(error).to.be.null; + expect(data).to.be.true; + }); + + it('returns raw text on responses that are not JSON', async () => { + fetchStub.resolves(makeResponse({ text: 'not-json' })); + + const { data, error } = await client.rpc('rpc_foo'); + + expect(error).to.be.null; + expect(data).to.equal('not-json'); + }); + }); + + describe('rpc — error cases', () => { + let client; + + beforeEach(() => { + client = new PostgrestClient({ + baseUrl: BASE_URL, apiKey: API_KEY, fetchImpl: fetchStub, log, + }); + }); + + it('returns the parsed PostgREST error body on a 4xx', async () => { + fetchStub.resolves(makeResponse({ + ok: false, + status: 404, + body: { + message: 'Could not find function in the schema cache', + code: 'PGRST202', + details: 'missing function', + hint: 'reload schema', + }, + })); + + const { data, error } = await client.rpc('rpc_missing'); + + expect(data).to.be.null; + expect(error).to.deep.include({ + message: 'Could not find function in the schema cache', + status: 404, + code: 'PGRST202', + details: 'missing function', + hint: 'reload schema', + }); + expect(log.debug).to.have.been.called; + }); + + it('falls back to a generic message when the 5xx body has no .message', async () => { + fetchStub.resolves(makeResponse({ + ok: false, + status: 503, + body: { code: 'XX000' }, + })); + + const { data, error } = await client.rpc('rpc_overloaded'); + + expect(data).to.be.null; + expect(error.status).to.equal(503); + expect(error.message).to.equal('PostgREST 503 on /rpc/rpc_overloaded'); + expect(error.code).to.equal('XX000'); + }); + + it('returns the network error in the error object (no throw)', async () => { + fetchStub.rejects(new Error('socket hang up')); + + const { data, error } = await client.rpc('rpc_anything'); + + expect(data).to.be.null; + 
expect(error.status).to.equal(0); + expect(error.message).to.equal('socket hang up'); + }); + + it('falls back to a generic message when the thrown error has no .message', async () => { + fetchStub.rejects({}); + + const { data, error } = await client.rpc('rpc_anything'); + + expect(data).to.be.null; + expect(error.status).to.equal(0); + expect(error.message).to.equal('PostgREST request failed'); + }); + + /* The fake fetch below settles only by rejecting once the AbortSignal it was handed fires; timeoutMs is set to 5 so the client is expected to trigger that abort itself. */it('aborts the request after the configured timeout', async () => { + // never resolves; just records the AbortSignal so we can listen to it + let capturedSignal; + fetchStub.callsFake((_, init) => new Promise((_, reject) => { + capturedSignal = init.signal; + capturedSignal.addEventListener('abort', () => { + reject(Object.assign(new Error('aborted'), { name: 'AbortError' })); + }); + })); + + const tightClient = new PostgrestClient({ + baseUrl: BASE_URL, apiKey: API_KEY, fetchImpl: fetchStub, log, timeoutMs: 5, + }); + + const { data, error } = await tightClient.rpc('rpc_slow'); + + expect(data).to.be.null; + expect(error.message).to.equal('aborted'); + expect(capturedSignal.aborted).to.be.true; + }); + + /* The caller passes a signal that is already aborted; the spec checks that the signal handed to fetch is aborted at call time. */it('aborts immediately when an already-aborted signal is supplied', async () => { + const ac = new AbortController(); + ac.abort(new Error('caller cancelled')); + + let observedAbort = false; + fetchStub.callsFake((_, init) => new Promise((_, reject) => { + observedAbort = init.signal.aborted; + reject(Object.assign(new Error('aborted'), { name: 'AbortError' })); + })); + + const { error } = await client.rpc('rpc_foo', {}, { signal: ac.signal }); + + expect(error.message).to.equal('aborted'); + expect(observedAbort).to.be.true; + }); + + it('aborts when an external signal aborts mid-flight', async () => { + const ac = new AbortController(); + let capturedSignal; + fetchStub.callsFake((_, init) => new Promise((_, reject) => { + capturedSignal = init.signal; + capturedSignal.addEventListener('abort', () => { + reject(Object.assign(new Error('aborted'), { name: 
'AbortError' })); + }); + })); + + const promise = client.rpc('rpc_foo', {}, { signal: ac.signal }); + /* Abort from the caller side after rpc() has been invoked but before its promise is awaited. */ac.abort(new Error('caller cancelled')); + const { error } = await promise; + + expect(error.message).to.equal('aborted'); + expect(capturedSignal.aborted).to.be.true; + }); + }); +}); + +/* Specs for the postgrestClientFromContext factory: connection settings are taken from the POSTGREST_ prefixed entries of context.env, the schema is expected to default to public, the string timeout is expected to surface as a number, and missing required values are reported with the constructor error messages. */describe('postgrestClientFromContext', () => { + let sandbox; + let fetchStub; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + fetchStub = sandbox.stub(); + }); + + afterEach(() => sandbox.restore()); + + it('reads POSTGREST_URL/POSTGREST_API_KEY/POSTGREST_SCHEMA from context.env', () => { + const ctx = { + env: { + POSTGREST_URL: BASE_URL, + POSTGREST_API_KEY: API_KEY, + POSTGREST_SCHEMA: 'mysticat', + }, + log: { debug: sandbox.spy() }, + }; + const client = postgrestClientFromContext(ctx, { fetchImpl: fetchStub }); + expect(client.baseUrl).to.equal(BASE_URL); + expect(client.apiKey).to.equal(API_KEY); + expect(client.schema).to.equal('mysticat'); + }); + + it('defaults the schema to "public" when unset', () => { + const ctx = { + env: { POSTGREST_URL: BASE_URL, POSTGREST_API_KEY: API_KEY }, + log: { debug: sandbox.spy() }, + }; + const client = postgrestClientFromContext(ctx, { fetchImpl: fetchStub }); + expect(client.schema).to.equal('public'); + }); + + it('honours POSTGREST_TIMEOUT_MS when present', () => { + const ctx = { + env: { + POSTGREST_URL: BASE_URL, POSTGREST_API_KEY: API_KEY, POSTGREST_TIMEOUT_MS: '1234', + }, + log: { debug: sandbox.spy() }, + }; + const client = postgrestClientFromContext(ctx, { fetchImpl: fetchStub }); + expect(client.timeoutMs).to.equal(1234); + }); + + it('throws via the constructor when POSTGREST_URL is missing', () => { + const ctx = { env: { POSTGREST_API_KEY: API_KEY } }; + expect(() => postgrestClientFromContext(ctx, { fetchImpl: fetchStub })) + .to.throw(/baseUrl is required/); + }); + + it('throws via the constructor when POSTGREST_API_KEY is missing', () => { + const ctx = { env: { POSTGREST_URL: BASE_URL } }; + expect(() 
=> postgrestClientFromContext(ctx, { fetchImpl: fetchStub })) + .to.throw(/apiKey is required/); + }); + + it('tolerates an undefined context (constructor still validates required fields)', () => { + expect(() => postgrestClientFromContext(undefined, { fetchImpl: fetchStub })) + .to.throw(/baseUrl is required/); + }); +});