diff --git a/.changeset/distributed-adaptor-install-lock.md b/.changeset/distributed-adaptor-install-lock.md new file mode 100644 index 000000000..8bf07317c --- /dev/null +++ b/.changeset/distributed-adaptor-install-lock.md @@ -0,0 +1,6 @@ +--- +'@openfn/engine-multi': minor +'@openfn/ws-worker': minor +--- + +Add a filesystem-based lock so multiple workers can safely share a single adaptor repo directory (e.g. an NFS mount or k8s PVC). The lock lives inside engine-multi's autoinstall and serialises installs across processes via a per-adaptor lockfile. It is enabled by default and can be disabled with `WORKER_NO_REPO_LOCK=true` (or `--no-repo-lock`) on ws-worker. The engine's `isInstalled` check has also been tightened to verify the adaptor's `node_modules` entry exists on disk, not just the repo `package.json` dependency entry — this catches half-installed adaptors left behind by a crashed worker. diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 1e4a1f6c4..5a5883a5b 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -21,11 +21,14 @@ "@openfn/logger": "workspace:*", "@openfn/runtime": "workspace:*", "fast-safe-stringify": "^2.1.1", - "json-stream-stringify": "^3.1.6" + "json-stream-stringify": "^3.1.6", + "proper-lockfile": "^4.1.2" }, "devDependencies": { "@types/node": "^18.19.130", + "@types/proper-lockfile": "^4.1.4", "ava": "5.3.1", + "esmock": "^2.7.5", "tslib": "^2.8.1", "tsm": "^2.3.0", "tsup": "^8.5.1", diff --git a/packages/engine-multi/src/api/autoinstall.ts b/packages/engine-multi/src/api/autoinstall.ts index 6d351d9f2..4d186b5e7 100644 --- a/packages/engine-multi/src/api/autoinstall.ts +++ b/packages/engine-multi/src/api/autoinstall.ts @@ -1,3 +1,5 @@ +import path from 'node:path'; +import { stat } from 'node:fs/promises'; import { ensureRepo, getAliasedName, @@ -13,98 +15,102 @@ import type { Logger } from '@openfn/logger'; import { AUTOINSTALL_COMPLETE, AUTOINSTALL_ERROR } from '../events'; import { AutoinstallError } from '../errors'; import ExecutionContext from '../classes/ExecutionContext'; +import { withInstallLock } from '../util/repo-lock'; // none of these options should be on the plan actually export type AutoinstallOptions = { skipRepoValidation?: boolean; - handleInstall?(fn: string, repoDir: string, logger: Logger): Promise; - handleIsInstalled?( - fn: string, - repoDir: string, - logger: Logger - ): Promise; + lockRepo?: boolean; versionLookup?: (specifier: string) => Promise; }; -const pending: Record> = {}; +// Per-entry options are pinned at enqueue time so the worker that drains the +// queue doesn't accidentally use the *first* caller's closure. See PR #1416. +type QueueEntry = { + adaptors: string[]; + callback: (err?: any) => void; + context: ExecutionContext; + repoDir: string; + logger: Logger; + useLock: boolean; +}; let busy = false; +const queue: QueueEntry[] = []; + +const processQueue = async () => { + const next = queue.shift(); + if (next) { + busy = true; + await doAutoinstall(next); + processQueue(); + } else { + busy = false; + } +}; -const queue: Array<{ adaptors: string[]; callback: (err?: any) => void }> = []; - -const enqueue = (adaptors: string[]) => - new Promise((resolve) => { - queue.push({ adaptors, callback: resolve }); - }); +const doAutoinstall = async (entry: QueueEntry) => { + const { adaptors, callback, context, repoDir, logger, useLock } = entry; -// Install any modules for an Execution Plan that are not already installed -// This will enforce a queue ensuring only one module is installed at a time -// This fixes https://github.com/OpenFn/kit/issues/503 -const autoinstall = async (context: ExecutionContext): Promise => { - // TODO not a huge fan of these functions in the closure, but it's ok for now - const processQueue = async () => { - const next = queue.shift(); - if (next) { - busy = true; - const { adaptors, callback } = next; - await doAutoinstall(adaptors, callback); - processQueue(); - } else { - // do nothing - busy = false; + for (const a of adaptors) { + const { name, version } = getNameAndVersion(a); + if (await isInstalled(a, repoDir, logger)) { + continue; } - }; - - // This will actually do the autoinstall for an run (all adaptors) - const doAutoinstall = async ( - adaptors: string[], - onComplete: (err?: any) => void - ) => { - // Check whether we still need to do any work - for (const a of adaptors) { - const { name, version } = getNameAndVersion(a); - if (await isInstalledFn(a, repoDir, logger)) { - continue; - } - - const startTime = Date.now(); - try { - await installFn(a, repoDir, logger); - const duration = Date.now() - startTime; - logger.success(`autoinstalled ${a} in ${duration / 1000}s`); - context.emit(AUTOINSTALL_COMPLETE, { - module: name, - version: version!, - duration, - }); - } catch (e: any) { - delete pending[a]; - - logger.error(`ERROR autoinstalling ${a}: ${e.message}`); - logger.error(e); - const duration = Date.now() - startTime; - context.emit(AUTOINSTALL_ERROR, { - module: name, - version: version!, - duration, - message: e.message || e.toString(), + const startTime = Date.now(); + try { + if (useLock) { + // Re-check inside the lock so we skip work a peer worker + // completed while we were waiting. + await withInstallLock(repoDir, getAliasedName(a), logger, async () => { + if (await isInstalled(a, repoDir, logger)) { + logger.debug( + `another worker installed ${a} while waiting for lock; skipping` + ); + return; + } + await install(a, repoDir, logger); }); - - // Abort on the first error - return onComplete(new AutoinstallError(a, e)); + } else { + await install(a, repoDir, logger); } + + const duration = Date.now() - startTime; + logger.success(`autoinstalled ${a} in ${duration / 1000}s`); + context.emit(AUTOINSTALL_COMPLETE, { + module: name, + version: version!, + duration, + }); + } catch (e: any) { + logger.error(`ERROR autoinstalling ${a}: ${e.message}`); + logger.error(e); + const duration = Date.now() - startTime; + context.emit(AUTOINSTALL_ERROR, { + module: name, + version: version!, + duration, + message: e.message || e.toString(), + }); + + // Abort on the first error + return callback(new AutoinstallError(a, e)); } - onComplete(); - }; + } + callback(); +}; +// Install any modules for an Execution Plan that are not already installed +// This will enforce a queue ensuring only one module is installed at a time +// This fixes https://github.com/OpenFn/kit/issues/503 +const autoinstall = async (context: ExecutionContext): Promise => { const { logger, state, options } = context; const { plan } = state; const { repoDir, whitelist } = options; const autoinstallOptions = options.autoinstall || {}; + const useLock = autoinstallOptions.lockRepo !== false; - const installFn = autoinstallOptions?.handleInstall || install; - const isInstalledFn = autoinstallOptions?.handleIsInstalled || isInstalled; const versionlookup = autoinstallOptions?.versionLookup || getLatestVersion; let didValidateRepo = false; @@ -124,7 +130,7 @@ const autoinstall = async (context: ExecutionContext): Promise => { const adaptors = Array.from(identifyAdaptors(plan)); const paths: ModulePaths = {}; - const adaptorsToLoad = []; + const adaptorsToLoad: string[] = []; for (const a of adaptors) { // Ensure that this is not blacklisted if (whitelist && !whitelist.find((r) => r.exec(a))) { @@ -166,7 +172,7 @@ const autoinstall = async (context: ExecutionContext): Promise => { version: v, }; - if (!(await isInstalledFn(resolvedAdaptorName, repoDir, logger))) { + if (!(await isInstalled(resolvedAdaptorName, repoDir, logger))) { adaptorsToLoad.push(resolvedAdaptorName); } } @@ -185,8 +191,16 @@ const autoinstall = async (context: ExecutionContext): Promise => { } if (adaptorsToLoad.length) { - // Add this to the queue - const p = enqueue(adaptorsToLoad); + const p = new Promise((resolve) => { + queue.push({ + adaptors: adaptorsToLoad, + callback: resolve, + context, + repoDir, + logger, + useLock, + }); + }); if (!busy) { processQueue(); @@ -210,9 +224,17 @@ export default autoinstall; const install = (specifier: string, repoDir: string, logger: Logger) => runtimeInstall([specifier], repoDir, logger); -// The actual isInstalled function is not unit tested -// TODO this should probably all be handled (and tested) in @openfn/runtime -const isInstalled = async ( +const fileExists = async (p: string) => { + try { + await stat(p); + return true; + } catch { + return false; + } +}; + +// Exported for unit testing +export const isInstalled = async ( specifier: string, repoDir: string, logger: Logger @@ -232,7 +254,12 @@ const isInstalled = async ( const pkg = await loadRepoPkg(repoDir); if (pkg) { const { dependencies } = pkg; - return dependencies.hasOwnProperty(alias); + if (!dependencies.hasOwnProperty(alias)) { + return false; + } + return fileExists( + path.join(repoDir, 'node_modules', alias, 'package.json') + ); } }; diff --git a/packages/engine-multi/src/util/repo-lock.ts b/packages/engine-multi/src/util/repo-lock.ts new file mode 100644 index 000000000..527fef193 --- /dev/null +++ b/packages/engine-multi/src/util/repo-lock.ts @@ -0,0 +1,88 @@ +import path from 'node:path'; +import { mkdir, writeFile } from 'node:fs/promises'; +import lockfile from 'proper-lockfile'; +import type { Logger } from '@openfn/logger'; + +// k8s CPU throttling can delay the heartbeat setTimeout; 5 min avoids false-stale on tight clusters +const STALE_MS = 5 * 60_000; +// Retry ceiling exceeds STALE_MS so a dead lock-holder's stale window expires before we give up +const MAX_LOCK_WAIT_MS = STALE_MS + 60_000; +const LOCK_INTERVAL_MS = 2_000; +const UPDATE_MS = 5_000; + +const LOCK_RETRY_OPTIONS = { + retries: MAX_LOCK_WAIT_MS / LOCK_INTERVAL_MS, + factor: 1, + minTimeout: LOCK_INTERVAL_MS, + maxTimeout: LOCK_INTERVAL_MS, +}; + +const ensureLockTarget = async (target: string) => { + await mkdir(path.dirname(target), { recursive: true }); + try { + await writeFile(target, '', { flag: 'wx' }); + } catch (e: any) { + if (e.code !== 'EEXIST') throw e; + } +}; + +export const withInstallLock = async ( + repoDir: string, + alias: string, + logger: Logger, + fn: () => Promise +): Promise => { + // Defence-in-depth: refuse aliases that could escape the .locks directory. + // Upstream whitelist filtering should make this unreachable. + if (alias.split(/[/\\]/).includes('..') || path.isAbsolute(alias)) { + throw new Error(`Invalid alias for install lock: ${alias}`); + } + + const locksDir = path.join(repoDir, '.locks'); + const target = path.join(locksDir, `${alias}.lock`); + + await ensureLockTarget(target); + + logger.debug(`acquiring install lock for ${alias}`); + let release: () => Promise; + + const lockOpts = { stale: STALE_MS, update: UPDATE_MS, realpath: false }; + + try { + release = await lockfile.lock(target, { ...lockOpts, retries: 0 }); + } catch (e: any) { + if (e.code !== 'ELOCKED') throw e; + + logger.info( + `waiting for install lock on \`${alias}\` (another worker is installing)` + ); + + try { + release = await lockfile.lock(target, { + ...lockOpts, + retries: LOCK_RETRY_OPTIONS, + }); + } catch (e2: any) { + if (e2.code === 'ELOCKED') { + throw new Error( + `Lock acquisition timed out after ${ + MAX_LOCK_WAIT_MS / 1000 + }s waiting for ${alias}; another worker likely still installing (lock: ${target})` + ); + } + throw e2; + } + } + + logger.debug(`acquired install lock for ${alias}`); + + try { + await fn(); + } finally { + try { + await release(); + } catch (e) { + logger.warn(`failed to release install lock for ${alias}:`, e); + } + } +}; diff --git a/packages/engine-multi/test/api/autoinstall.test.ts b/packages/engine-multi/test/api/autoinstall.test.ts index 36d550164..dfaab7218 100644 --- a/packages/engine-multi/test/api/autoinstall.test.ts +++ b/packages/engine-multi/test/api/autoinstall.test.ts @@ -2,26 +2,24 @@ import test from 'ava'; import { createMockLogger } from '@openfn/logger'; import type { ExecutionPlan, Job } from '@openfn/runtime'; -import autoinstall, { +import path from 'node:path'; +import { mkdir, writeFile, rm } from 'node:fs/promises'; + +import { AutoinstallOptions, identifyAdaptors, + isInstalled, } from '../../src/api/autoinstall'; import { AUTOINSTALL_COMPLETE, AUTOINSTALL_ERROR } from '../../src/events'; import ExecutionContext from '../../src/classes/ExecutionContext'; import whitelist from '../../src/whitelist'; - -type PackageJson = { - name: string; - [x: string]: any; -}; - -const mockIsInstalled = (pkg: PackageJson) => async (specifier: string) => { - const alias = specifier.split('@').join('_'); - return pkg.dependencies.hasOwnProperty(alias); -}; - -const mockHandleInstall = async (_specifier: string): Promise => - new Promise((r) => r()).then(); +import { + createTmpRepo, + removeTmpRepo, + markInstalled, + loadAutoinstall, + createInstallStub, +} from '../util/autoinstall-helpers'; const logger = createMockLogger(); @@ -30,13 +28,11 @@ const wait = (duration = 10) => setTimeout(resolve, duration); }); -const mockAutoinstallOpts = { - handleInstall: mockHandleInstall, - handleIsInstalled: mockIsInstalled, - versionLookup: async () => '2.0.0', -}; - +// Build an ExecutionContext for a freshly-loaded autoinstall module. repoDir +// must already exist on disk because the real isInstalled / install code paths +// read and write to it. const createContext = ( + repoDir: string, autoinstallOpts: AutoinstallOptions = {}, jobs?: Partial[], customWhitelist?: RegExp[] @@ -61,11 +57,12 @@ const createContext = ( options: { logger, whitelist: customWhitelist || whitelist, - repoDir: 'tmp/repo', - + repoDir, // @ts-ignore autoinstall: { - ...mockAutoinstallOpts, + skipRepoValidation: true, + lockRepo: false, + versionLookup: async () => '2.0.0', ...autoinstallOpts, }, }, @@ -75,50 +72,24 @@ test.afterEach(() => { logger._reset(); }); -test('Autoinstall basically works', async (t) => { - const autoinstallOpts = { - handleInstall: mockHandleInstall, - handleIsInstalled: async () => false, - }; - const context = createContext(autoinstallOpts); - - const paths = await autoinstall(context); - t.log(paths); - t.deepEqual(paths, { - '@openfn/language-common@1.0.0': { - path: 'tmp/repo/node_modules/@openfn/language-common_1.0.0', - version: '1.0.0', - }, - }); -}); - -test('mock is installed: should be installed', async (t) => { - const isInstalled = mockIsInstalled({ - name: 'repo', - dependencies: { - 'x_1.0.0': 'path', - }, - }); - - const result = await isInstalled('x@1.0.0'); - t.true(result); -}); - -test('mock is installed: should not be installed', async (t) => { - const isInstalled = mockIsInstalled({ - name: 'repo', - dependencies: { - 'x_1.0.0': 'path', - }, - }); - - const result = await isInstalled('x@1.0.1'); - t.false(result); -}); - -test('mock install: should return async', async (t) => { - await mockHandleInstall('x@1.0.0'); - t.true(true); +test.serial('Autoinstall basically works', async (t) => { + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); + const context = createContext(repoDir); + + const paths = await mod.default(context); + t.deepEqual(paths, { + '@openfn/language-common@1.0.0': { + path: `${repoDir}/node_modules/@openfn/language-common_1.0.0`, + version: '1.0.0', + }, + }); + t.is(stub.calls.length, 1); + } finally { + await removeTmpRepo(repoDir); + } }); test('identifyAdaptors: pick out adaptors and remove duplicates', (t) => { @@ -148,492 +119,582 @@ test('identifyAdaptors: pick out adaptors and remove duplicates', (t) => { }); test.serial('autoinstall: handle @latest', async (t) => { - const jobs = [ - { - adaptors: ['x@latest'], - }, - ]; - - const context = createContext({}, jobs, [/x/]); + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); + const jobs = [{ adaptors: ['x@latest'] }]; + const context = createContext(repoDir, {}, jobs, [/x/]); - const result = await autoinstall(context); + const result = await mod.default(context); - t.deepEqual(result, { - 'x@latest': { - path: 'tmp/repo/node_modules/x_2.0.0', - version: '2.0.0', - }, - }); + t.deepEqual(result, { + 'x@latest': { + path: `${repoDir}/node_modules/x_2.0.0`, + version: '2.0.0', + }, + }); + } finally { + await removeTmpRepo(repoDir); + } }); test.serial('autoinstall: handle @next', async (t) => { - const jobs = [ - { - adaptors: ['x@next'], - }, - ]; - - const context = createContext({}, jobs, [/x/]); + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); + const jobs = [{ adaptors: ['x@next'] }]; + const context = createContext(repoDir, {}, jobs, [/x/]); - const result = await autoinstall(context); + const result = await mod.default(context); - t.deepEqual(result, { - 'x@next': { - path: 'tmp/repo/node_modules/x_2.0.0', - version: '2.0.0', - }, - }); + t.deepEqual(result, { + 'x@next': { + path: `${repoDir}/node_modules/x_2.0.0`, + version: '2.0.0', + }, + }); + } finally { + await removeTmpRepo(repoDir); + } }); -// This doesn't do anything except check that the mocks are installed -test.serial('autoinstall: should call both mock functions', async (t) => { - let didCallIsInstalled = false; - let didCallInstall = true; - - const mockIsInstalled = async () => { - didCallIsInstalled = true; - return false; - }; - const mockInstall = async () => { - didCallInstall = true; - return; - }; - - const autoinstallOpts = { - handleInstall: mockInstall, - handleIsInstalled: mockIsInstalled, - }; - const context = createContext(autoinstallOpts); +test.serial('autoinstall: install is invoked exactly once per adaptor', async (t) => { + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); + const context = createContext(repoDir); - await autoinstall(context); + await mod.default(context); - t.true(didCallIsInstalled); - t.true(didCallInstall); + t.is(stub.calls.length, 1); + t.is(stub.calls[0], '@openfn/language-common@1.0.0'); + } finally { + await removeTmpRepo(repoDir); + } }); -// TODO -// error handling -// Queue for multiple installs -// Queue for multiple installs of the same version -// don't autoinstall if it's already there - test.serial( 'autoinstall: only call install once if there are two concurrent install requests', async (t) => { - let callCount = 0; - - const installed: Record = {}; - - const mockInstall = (name: string) => - new Promise((resolve) => { - installed[name] = true; - callCount++; - setTimeout(() => resolve(), 20); - }); - - const options = { - skipRepoValidation: true, - handleInstall: mockInstall, - handleIsInstalled: async (name: string) => name in installed, - }; - - const context = createContext(options); - - await Promise.all([autoinstall(context), autoinstall(context)]); - - t.is(callCount, 1); + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true, delayMs: 20 }); + const mod = await loadAutoinstall(stub.fn); + const c1 = createContext(repoDir); + const c2 = createContext(repoDir); + + await Promise.all([mod.default(c1), mod.default(c2)]); + + t.is(stub.calls.length, 1); + } finally { + await removeTmpRepo(repoDir); + } } ); test.serial('autoinstall: install in sequence', async (t) => { - const installed: Record = {}; - - const states: Record = {}; - - const mockInstall = (name: string) => - new Promise((resolve) => { - // Each time install is called, - // record the time the call was made - // and the install state - states[name] = { - time: new Date().getTime(), - installed: Object.keys(installed).map((s) => s.split('common@')[1]), - }; - installed[name] = true; - setTimeout(() => resolve(), 50); + const repoDir = await createTmpRepo(); + try { + const states: Record = {}; + const installed: string[] = []; + const stub = createInstallStub({ + seedOnInstall: true, + delayMs: 50, + onCall: (name) => { + states[name] = { + time: Date.now(), + installed: [...installed].map((s) => s.split('common@')[1]), + }; + installed.push(name); + }, }); - - const options = { - skipRepoValidation: true, - handleInstall: mockInstall, - handleIsInstalled: false, - } as any; - - const c1 = createContext(options, [ - { adaptors: ['@openfn/language-common@1'] }, - ]); - const c2 = createContext(options, [ - { adaptors: ['@openfn/language-common@2'] }, - ]); - const c3 = createContext(options, [ - { adaptors: ['@openfn/language-common@3'] }, - ]); - - autoinstall(c1); - await wait(1); - autoinstall(c2); - await wait(1); - await autoinstall(c3); - - const s1 = states['@openfn/language-common@1']; - const s2 = states['@openfn/language-common@2']; - const s3 = states['@openfn/language-common@3']; - - // Check that modules are installed in sequence - t.deepEqual(s1.installed, []); - t.deepEqual(s2.installed, ['1']); - t.deepEqual(s3.installed, ['1', '2']); - - // And check for a time gap between installs - t.true(s3.time - s2.time > 40); - t.true(s2.time - s1.time > 40); + const mod = await loadAutoinstall(stub.fn); + + const c1 = createContext(repoDir, {}, [ + { adaptors: ['@openfn/language-common@1'] }, + ]); + const c2 = createContext(repoDir, {}, [ + { adaptors: ['@openfn/language-common@2'] }, + ]); + const c3 = createContext(repoDir, {}, [ + { adaptors: ['@openfn/language-common@3'] }, + ]); + + mod.default(c1); + await wait(1); + mod.default(c2); + await wait(1); + await mod.default(c3); + + const s1 = states['@openfn/language-common@1']; + const s2 = states['@openfn/language-common@2']; + const s3 = states['@openfn/language-common@3']; + + // Check that modules are installed in sequence + t.deepEqual(s1.installed, []); + t.deepEqual(s2.installed, ['1']); + t.deepEqual(s3.installed, ['1', '2']); + + // And check for a time gap between installs + t.true(s3.time - s2.time > 40); + t.true(s2.time - s1.time > 40); + } finally { + await removeTmpRepo(repoDir); + } }); test('autoinstall: handle two seperate, non-overlapping installs', async (t) => { - const options = { - handleInstall: mockHandleInstall, - handleIsInstalled: async () => false, - }; - - const c1 = createContext(options, [ - { adaptors: ['@openfn/language-dhis2@1.0.0'] }, - ]); - const c2 = createContext(options, [ - { adaptors: ['@openfn/language-http@1.0.0'] }, - ]); - - const p1 = await autoinstall(c1); - t.deepEqual(p1, { - '@openfn/language-dhis2@1.0.0': { - path: 'tmp/repo/node_modules/@openfn/language-dhis2_1.0.0', - version: '1.0.0', - }, - }); + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); + + const c1 = createContext(repoDir, {}, [ + { adaptors: ['@openfn/language-dhis2@1.0.0'] }, + ]); + const c2 = createContext(repoDir, {}, [ + { adaptors: ['@openfn/language-http@1.0.0'] }, + ]); + + const p1 = await mod.default(c1); + t.deepEqual(p1, { + '@openfn/language-dhis2@1.0.0': { + path: `${repoDir}/node_modules/@openfn/language-dhis2_1.0.0`, + version: '1.0.0', + }, + }); - const p2 = await autoinstall(c2); - t.deepEqual(p2, { - '@openfn/language-http@1.0.0': { - path: 'tmp/repo/node_modules/@openfn/language-http_1.0.0', - version: '1.0.0', - }, - }); + const p2 = await mod.default(c2); + t.deepEqual(p2, { + '@openfn/language-http@1.0.0': { + path: `${repoDir}/node_modules/@openfn/language-http_1.0.0`, + version: '1.0.0', + }, + }); + } finally { + await removeTmpRepo(repoDir); + } }); test.serial( 'autoinstall: do not try to install blacklisted modules', async (t) => { - let callCount = 0; - - const mockInstall = () => - new Promise((resolve) => { - callCount++; - setTimeout(() => resolve(), 20); - }); - - const job = [ - { - adaptors: ['lodash@1.0.0'], - }, - ]; - - const options = { - skipRepoValidation: true, - handleInstall: mockInstall, - handleIsInstalled: async () => false, - }; + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); - const context = createContext(options, job); + const job = [{ adaptors: ['lodash@1.0.0'] }]; + const context = createContext(repoDir, {}, job); - await autoinstall(context); + await mod.default(context); - t.is(callCount, 0); + t.is(stub.calls.length, 0); + } finally { + await removeTmpRepo(repoDir); + } } ); test.serial('autoinstall: return a map to modules', async (t) => { - const jobs = [ - { - adaptors: ['@openfn/language-common@1.0.0'], - }, - { - adaptors: ['@openfn/language-http@1.0.0'], - }, - ]; + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); - const autoinstallOpts = { - skipRepoValidation: true, - handleInstall: async () => {}, - handleIsInstalled: async () => false, - }; - const context = createContext(autoinstallOpts, jobs); + const jobs = [ + { adaptors: ['@openfn/language-common@1.0.0'] }, + { adaptors: ['@openfn/language-http@1.0.0'] }, + ]; + const context = createContext(repoDir, {}, jobs); - const result = await autoinstall(context); + const result = await mod.default(context); - t.deepEqual(result, { - '@openfn/language-common@1.0.0': { - path: 'tmp/repo/node_modules/@openfn/language-common_1.0.0', - version: '1.0.0', - }, - '@openfn/language-http@1.0.0': { - path: 'tmp/repo/node_modules/@openfn/language-http_1.0.0', - version: '1.0.0', - }, - }); + t.deepEqual(result, { + '@openfn/language-common@1.0.0': { + path: `${repoDir}/node_modules/@openfn/language-common_1.0.0`, + version: '1.0.0', + }, + '@openfn/language-http@1.0.0': { + path: `${repoDir}/node_modules/@openfn/language-http_1.0.0`, + version: '1.0.0', + }, + }); + } finally { + await removeTmpRepo(repoDir); + } }); test.serial('autoinstall: write linker options back to the plan', async (t) => { - const jobs = [ - { - adaptors: ['@openfn/language-common@1.0.0'], - }, - { - adaptors: [ - '@openfn/language-common@2.0.0', - '@openfn/language-collections@1.0.0', - ], - }, - { - adaptors: ['@openfn/language-http@1.0.0'], - }, - ]; + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); - const autoinstallOpts = { - skipRepoValidation: true, - handleInstall: async () => {}, - handleIsInstalled: async () => false, - }; - const context = createContext(autoinstallOpts, jobs); + const jobs = [ + { adaptors: ['@openfn/language-common@1.0.0'] }, + { + adaptors: [ + '@openfn/language-common@2.0.0', + '@openfn/language-collections@1.0.0', + ], + }, + { adaptors: ['@openfn/language-http@1.0.0'] }, + ]; + const context = createContext(repoDir, {}, jobs); - await autoinstall(context); + await mod.default(context); - const [a, b, c] = context.state.plan.workflow.steps as Job[]; - t.deepEqual(a.linker, { - '@openfn/language-common': { - path: 'tmp/repo/node_modules/@openfn/language-common_1.0.0', - version: '1.0.0', - }, - }); - t.deepEqual(b.linker, { - '@openfn/language-common': { - path: 'tmp/repo/node_modules/@openfn/language-common_2.0.0', - version: '2.0.0', - }, - '@openfn/language-collections': { - path: 'tmp/repo/node_modules/@openfn/language-collections_1.0.0', - version: '1.0.0', - }, - }); - t.deepEqual(c.linker, { - '@openfn/language-http': { - path: 'tmp/repo/node_modules/@openfn/language-http_1.0.0', - version: '1.0.0', - }, - }); + const [a, b, c] = context.state.plan.workflow.steps as Job[]; + t.deepEqual(a.linker, { + '@openfn/language-common': { + path: `${repoDir}/node_modules/@openfn/language-common_1.0.0`, + version: '1.0.0', + }, + }); + t.deepEqual(b.linker, { + '@openfn/language-common': { + path: `${repoDir}/node_modules/@openfn/language-common_2.0.0`, + version: '2.0.0', + }, + '@openfn/language-collections': { + path: `${repoDir}/node_modules/@openfn/language-collections_1.0.0`, + version: '1.0.0', + }, + }); + t.deepEqual(c.linker, { + '@openfn/language-http': { + path: `${repoDir}/node_modules/@openfn/language-http_1.0.0`, + version: '1.0.0', + }, + }); + } finally { + await removeTmpRepo(repoDir); + } }); test.serial('autoinstall: support custom whitelist', async (t) => { - const whitelist = [/^y/]; - const jobs = [ - { + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); + + const customWhitelist = [/^y/]; + const jobs = [ // will be ignored - adaptors: ['x@1.0.0'], - }, - { + { adaptors: ['x@1.0.0'] }, // will be installed - adaptors: ['y@1.0.0'], - }, - ]; - - const autoinstallOpts = { - skipRepoValidation: true, - handleInstall: async () => {}, - handleIsInstalled: async () => false, - }; - const context = createContext(autoinstallOpts, jobs, whitelist); + { adaptors: ['y@1.0.0'] }, + ]; + const context = createContext(repoDir, {}, jobs, customWhitelist); - const result = await autoinstall(context); + const result = await mod.default(context); - t.deepEqual(result, { - 'y@1.0.0': { - path: 'tmp/repo/node_modules/y_1.0.0', - version: '1.0.0', - }, - }); + t.deepEqual(result, { + 'y@1.0.0': { + path: `${repoDir}/node_modules/y_1.0.0`, + version: '1.0.0', + }, + }); + } finally { + await removeTmpRepo(repoDir); + } }); test.serial('autoinstall: emit an event on completion', async (t) => { - let event: any; - const jobs = [ - { - adaptors: ['@openfn/language-common@1.0.0'], - version: '1.0.0', - }, - ]; + const repoDir = await createTmpRepo(); + try { + let event: any; + const stub = createInstallStub({ seedOnInstall: true, delayMs: 50 }); + const mod = await loadAutoinstall(stub.fn); - const autoinstallOpts = { - skipRepoValidation: true, - handleInstall: async () => new Promise((done) => setTimeout(done, 50)), - handleIsInstalled: async () => false, - } as any; - const context = createContext(autoinstallOpts, jobs); + const jobs = [ + { adaptors: ['@openfn/language-common@1.0.0'], version: '1.0.0' }, + ]; + const context = createContext(repoDir, {}, jobs); - context.on(AUTOINSTALL_COMPLETE, (evt) => { - event = evt; - }); + context.on(AUTOINSTALL_COMPLETE, (evt) => { + event = evt; + }); - await autoinstall(context); + await mod.default(context); - t.truthy(event); - t.is(event.module, '@openfn/language-common'); - t.is(event.version, '1.0.0'); - // Duration could be anything really as timeout is only loose - but so long as it's - // more than 10ms that implies it's called handleInstall and returned a reasonable value - t.assert(event.duration >= 10); + t.truthy(event); + t.is(event.module, '@openfn/language-common'); + t.is(event.version, '1.0.0'); + t.assert(event.duration >= 10); + } finally { + await removeTmpRepo(repoDir); + } }); test.serial('autoinstall: throw on error', async (t) => { - const mockIsInstalled = async () => false; - const mockInstall = async () => { - throw new Error('err'); - }; - - const autoinstallOpts = { - handleInstall: mockInstall, - handleIsInstalled: mockIsInstalled, - }; - const context = createContext(autoinstallOpts); + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ throws: new Error('err') }); + const mod = await loadAutoinstall(stub.fn); + const context = createContext(repoDir); - await t.throwsAsync(() => autoinstall(context), { - name: 'AutoinstallError', - message: 'Error installing @openfn/language-common@1.0.0: err', - }); + await t.throwsAsync(() => mod.default(context), { + name: 'AutoinstallError', + message: 'Error installing @openfn/language-common@1.0.0: err', + }); + } finally { + await removeTmpRepo(repoDir); + } }); test.serial('autoinstall: throw on error twice if pending', async (t) => { - return new Promise((done) => { - let callCount = 0; - let errCount = 0; - const mockIsInstalled = async () => false; - const mockInstall = async () => { - callCount++; - return new Promise((_resolve, reject) => { - setTimeout(() => reject(new Error('err')), 10); + const repoDir = await createTmpRepo(); + try { + return await new Promise(async (done) => { + let callCount = 0; + let errCount = 0; + const stub = createInstallStub({ + delayMs: 10, + onCall: () => { + callCount++; + }, + throws: () => new Error('err'), }); - }; - - const autoinstallOpts = { - handleInstall: mockInstall, - handleIsInstalled: mockIsInstalled, - } as any; - const context = createContext(autoinstallOpts); - - autoinstall(context).catch(assertCatches); - - autoinstall(context).catch(assertCatches); - - function assertCatches(e: any) { - t.is(e.name, 'AutoinstallError'); - errCount += 1; - if (errCount === 2) { - t.is(callCount, 2); - t.pass('threw twice!'); - done(); + const mod = await loadAutoinstall(stub.fn); + const context = createContext(repoDir); + + mod.default(context).catch(assertCatches); + mod.default(context).catch(assertCatches); + + function assertCatches(e: any) { + t.is(e.name, 'AutoinstallError'); + errCount += 1; + if (errCount === 2) { + t.is(callCount, 2); + t.pass('threw twice!'); + done(); + } } - } - }); + }); + } finally { + await removeTmpRepo(repoDir); + } }); test.serial('autoinstall: emit on error', async (t) => { - let evt: any; - const mockIsInstalled = async () => false; - const mockInstall = async () => { - throw new Error('err'); - }; + const repoDir = await createTmpRepo(); + try { + let evt: any; + const stub = createInstallStub({ throws: new Error('err') }); + const mod = await loadAutoinstall(stub.fn); + const context = createContext(repoDir); - const autoinstallOpts = { - handleInstall: mockInstall, - handleIsInstalled: mockIsInstalled, - }; - const context = createContext(autoinstallOpts); + context.on(AUTOINSTALL_ERROR, (e) => { + evt = e; + }); - context.on(AUTOINSTALL_ERROR, (e) => { - evt = e; - }); + try { + await mod.default(context); + } catch (e) { + // do nothing + } - try { - await autoinstall(context); - } catch (e) { - // do nothing + t.is(evt.module, '@openfn/language-common'); + t.is(evt.version, '1.0.0'); + t.is(evt.message, 'err'); + t.true(!isNaN(evt.duration)); + } finally { + await removeTmpRepo(repoDir); } - - t.is(evt.module, '@openfn/language-common'); - t.is(evt.version, '1.0.0'); - t.is(evt.message, 'err'); - t.true(!isNaN(evt.duration)); }); test.serial('autoinstall: throw twice in a row', async (t) => { - let callCount = 0; + const repoDir = await createTmpRepo(); + try { + let callCount = 0; + const stub = createInstallStub({ + delayMs: 1, + onCall: () => { + callCount++; + }, + throws: () => new Error('err'), + }); + const mod = await loadAutoinstall(stub.fn); + const context = createContext(repoDir); - const mockIsInstalled = async () => false; - const mockInstall = async () => { - callCount++; - return new Promise((_resolve, reject) => { - setTimeout(() => reject(new Error('err')), 1); + await t.throwsAsync(() => mod.default(context), { + name: 'AutoinstallError', + message: 'Error installing @openfn/language-common@1.0.0: err', }); - }; + t.is(callCount, 1); - const autoinstallOpts = { - handleInstall: mockInstall, - handleIsInstalled: mockIsInstalled, - } as any; - const context = createContext(autoinstallOpts); + await t.throwsAsync(() => mod.default(context), { + name: 'AutoinstallError', + message: 'Error installing @openfn/language-common@1.0.0: err', + }); + t.is(callCount, 2); + } finally { + await removeTmpRepo(repoDir); + } +}); - await t.throwsAsync(() => autoinstall(context), { - name: 'AutoinstallError', - message: 'Error installing @openfn/language-common@1.0.0: err', - }); - t.is(callCount, 1); +test.serial('write versions to context', async (t) => { + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true }); + const mod = await loadAutoinstall(stub.fn); + const context = createContext(repoDir); - await t.throwsAsync(() => autoinstall(context), { - name: 'AutoinstallError', - message: 'Error installing @openfn/language-common@1.0.0: err', - }); - t.is(callCount, 2); + await mod.default(context); + + // @ts-ignore + t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']); + } finally { + await removeTmpRepo(repoDir); + } }); -test('write versions to context', async (t) => { - const autoinstallOpts = { - handleInstall: mockHandleInstall, - handleIsInstalled: async () => false, - }; - const context = createContext(autoinstallOpts); +test.serial( + "write versions to context even if we don't install", + async (t) => { + const repoDir = await createTmpRepo(); + try { + // Pre-seed installed state so isInstalled returns true and install is skipped. + await markInstalled(repoDir, '@openfn/language-common@1.0.0'); + const stub = createInstallStub(); + const mod = await loadAutoinstall(stub.fn); + const context = createContext(repoDir); - await autoinstall(context); + await mod.default(context); - // @ts-ignore - t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']); -}); + t.is(stub.calls.length, 0); + // @ts-ignore + t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']); + } finally { + await removeTmpRepo(repoDir); + } + } +); -test("write versions to context even if we don't install", async (t) => { - const autoinstallOpts = { - handleInstall: mockHandleInstall, - handleIsInstalled: async () => true, - }; - const context = createContext(autoinstallOpts); +// ---- strengthened isInstalled: also checks node_modules//package.json ---- - await autoinstall(context); +const SPECIFIER = '@openfn/language-http@6.5.0'; +const ALIAS = '@openfn/language-http_6.5.0'; - // @ts-ignore - t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']); -}); +const writeRepoPkg = async (dir: string, deps: Record) => { + await writeFile( + path.join(dir, 'package.json'), + JSON.stringify({ name: 'repo', dependencies: deps }) + ); +}; + +const seedNodeModule = async (dir: string, alias = ALIAS) => { + const modDir = path.join(dir, 'node_modules', alias); + await mkdir(modDir, { recursive: true }); + await writeFile(path.join(modDir, 'package.json'), '{}'); +}; + +test.serial( + 'isInstalled returns false when alias is in repo deps but node_modules pkg is missing', + async (t) => { + const dir = await createTmpRepo(); + try { + await writeRepoPkg(dir, { [ALIAS]: '6.5.0' }); + const result = await isInstalled(SPECIFIER, dir, logger); + t.false(result as boolean); + } finally { + await rm(dir, { recursive: true, force: true }); + } + } +); + +test.serial( + 'isInstalled returns true when alias is in deps AND node_modules pkg exists', + async (t) => { + const dir = await createTmpRepo(); + try { + await writeRepoPkg(dir, { [ALIAS]: '6.5.0' }); + await seedNodeModule(dir); + const result = await isInstalled(SPECIFIER, dir, logger); + t.true(result as boolean); + } finally { + await rm(dir, { recursive: true, force: true }); + } + } +); + +test.serial( + 'isInstalled returns false when alias is not in repo deps', + async (t) => { + const dir = await createTmpRepo(); + try { + await writeRepoPkg(dir, {}); + // node_modules exists but dep entry doesn't — half-installed. + await seedNodeModule(dir); + const result = await isInstalled(SPECIFIER, dir, logger); + t.false(result as boolean); + } finally { + await rm(dir, { recursive: true, force: true }); + } + } +); + +// ---- concurrent autoinstall: lock + in-lock re-check ---- + +// Drives autoinstall against a real tmp repoDir with the lock engaged. +// Spawns two concurrent install attempts; the lock + in-lock isInstalled +// re-check ensures only one actually invokes install. +test.serial( + 'autoinstall: lock + in-lock re-check skips duplicate installFn invocations', + async (t) => { + const repoDir = await createTmpRepo(); + try { + const stub = createInstallStub({ seedOnInstall: true, delayMs: 50 }); + const mod = await loadAutoinstall(stub.fn); + + // Build contexts inline so we can set lockRepo: true (default is false in + // createContext to keep unrelated tests fast). + const makeContext = () => + new ExecutionContext({ + state: { + id: 'x', + status: 'pending', + plan: { + workflow: { + steps: [ + { + adaptors: ['@openfn/language-common@1.0.0'], + expression: '.', + }, + ], + }, + options: {}, + }, + input: {}, + }, + logger, + // @ts-ignore + callWorker: async () => {}, + options: { + logger, + whitelist, + repoDir, + // @ts-ignore + autoinstall: { + skipRepoValidation: true, + // lock on by default — exercise the lock path here + }, + }, + }); + + const c1 = makeContext(); + const c2 = makeContext(); + + await Promise.all([mod.default(c1), mod.default(c2)]); + + t.is(stub.calls.length, 1, 'install should run exactly once across both contexts'); + } finally { + await removeTmpRepo(repoDir); + } + } +); diff --git a/packages/engine-multi/test/api/execute.test.ts b/packages/engine-multi/test/api/execute.test.ts index 8ec2a9ea6..95188b1e4 100644 --- a/packages/engine-multi/test/api/execute.test.ts +++ b/packages/engine-multi/test/api/execute.test.ts @@ -62,8 +62,8 @@ const plan = { const options = { autoinstall: { - handleInstall: async () => {}, - handleIsInstalled: async () => false, + // No adaptors in these plans, so autoinstall has nothing to do. + skipRepoValidation: true, }, } as Partial; diff --git a/packages/engine-multi/test/engine.test.ts b/packages/engine-multi/test/engine.test.ts index a48e13046..8a07153b2 100644 --- a/packages/engine-multi/test/engine.test.ts +++ b/packages/engine-multi/test/engine.test.ts @@ -16,7 +16,9 @@ const options = { logger, repoDir: '.', // doesn't matter for the mock autoinstall: { - handleIsInstalled: async () => true, + // No adaptors are used in these tests so the install path is never hit; + // skipping repo validation keeps autoinstall from touching the cwd. + skipRepoValidation: true, }, }; diff --git a/packages/engine-multi/test/errors.test.ts b/packages/engine-multi/test/errors.test.ts index b904939f7..d4e8e3fe0 100644 --- a/packages/engine-multi/test/errors.test.ts +++ b/packages/engine-multi/test/errors.test.ts @@ -15,8 +15,9 @@ test.before(async () => { logger, repoDir: path.resolve('./test/__repo__'), autoinstall: { - // disable autoinstall - handleIsInstalled: async () => true, + // Fixture repo has the helper adaptor pre-installed; skip repo validation + // so autoinstall never tries to rewrite the fixture's package.json. + skipRepoValidation: true, }, maxWorkers: 1, memoryLimitMb: 200, diff --git a/packages/engine-multi/test/util/autoinstall-helpers.ts b/packages/engine-multi/test/util/autoinstall-helpers.ts new file mode 100644 index 000000000..65a67c6ab --- /dev/null +++ b/packages/engine-multi/test/util/autoinstall-helpers.ts @@ -0,0 +1,89 @@ +import path from 'node:path'; +import os from 'node:os'; +import { mkdir, mkdtemp, rm, writeFile, readFile } from 'node:fs/promises'; +import esmock from 'esmock'; +import * as runtime from '@openfn/runtime'; +import { getAliasedName, getNameAndVersion } from '@openfn/runtime'; + +// Helpers for autoinstall tests. We exercise the real `isInstalled` by +// seeding a tmp repoDir on disk; the engine's autoinstall reads the repo's +// package.json and stats node_modules//package.json. + +export const createTmpRepo = async () => { + const dir = await mkdtemp(path.join(os.tmpdir(), 'engine-autoinstall-')); + await writeFile( + path.join(dir, 'package.json'), + JSON.stringify({ name: 'repo', dependencies: {} }) + ); + return dir; +}; + +export const removeTmpRepo = (dir: string) => + rm(dir, { recursive: true, force: true }); + +// Mark a specifier as installed by writing the dep into package.json and +// seeding node_modules//package.json. The alias matches @openfn/runtime's +// getAliasedName output. +export const markInstalled = async (repoDir: string, specifier: string) => { + const alias = getAliasedName(specifier); + const pkgPath = path.join(repoDir, 'package.json'); + const raw = await readFile(pkgPath, 'utf8'); + const pkg = JSON.parse(raw); + pkg.dependencies = pkg.dependencies || {}; + const { version } = getNameAndVersion(specifier); + pkg.dependencies[alias] = version || '*'; + await writeFile(pkgPath, JSON.stringify(pkg)); + const modDir = path.join(repoDir, 'node_modules', alias); + await mkdir(modDir, { recursive: true }); + await writeFile(path.join(modDir, 'package.json'), '{}'); +}; + +export type InstallImpl = ( + specifiers: string[], + repoDir: string, + logger: any +) => Promise; + +export type AutoinstallModule = typeof import('../../src/api/autoinstall'); + +// Load autoinstall with @openfn/runtime's install swapped out so tests can +// control install behaviour without touching npm. Each call returns a fresh +// module so module-level queue/busy state is isolated between tests. +export const loadAutoinstall = async ( + installImpl: InstallImpl +): Promise => + esmock('../../src/api/autoinstall.ts', { + '@openfn/runtime': { + ...runtime, + install: installImpl, + }, + }); + +// Returns a stubbed install + counters. Pass `seedOnInstall: true` to write +// node_modules//package.json + repo dep entry so the real `isInstalled` +// returns true after the stub runs. +export const createInstallStub = (opts: { + seedOnInstall?: boolean; + delayMs?: number; + onCall?: (specifier: string) => void; + throws?: Error | (() => Error); +} = {}) => { + const calls: string[] = []; + const fn: InstallImpl = async (specifiers, repoDir) => { + for (const s of specifiers) { + calls.push(s); + opts.onCall?.(s); + if (opts.delayMs) { + await new Promise((r) => setTimeout(r, opts.delayMs)); + } + if (opts.throws) { + throw typeof opts.throws === 'function' ? opts.throws() : opts.throws; + } + if (opts.seedOnInstall) { + await markInstalled(repoDir, s); + } + } + }; + return { fn, calls }; +}; + diff --git a/packages/engine-multi/test/util/repo-lock-worker.ts b/packages/engine-multi/test/util/repo-lock-worker.ts new file mode 100644 index 000000000..f05537c56 --- /dev/null +++ b/packages/engine-multi/test/util/repo-lock-worker.ts @@ -0,0 +1,152 @@ +/** + * Worker script for cross-process repo-lock tests in engine-multi. + * + * Drives `withInstallLock` directly (the new public lock surface) rather than + * any wrapping handler. The install marker is `node_modules//package.json` + * — npm's per-package extraction is atomic so its presence is sufficient proof + * of a successful install. + * + * Config (env vars): + * REPO_DIR - path to the shared tmpdir + * SPECIFIER - package specifier to install + * WORKER_MODE - one of: race | double-check | partial | default + * INSTALL_DELAY - ms to sleep inside the install fn (default 0) + * INSTALL_FAIL - if '1', the install fn throws + * INSTALL_NOOP - if '1', resolves without seeding node_modules + * + * IPC out -> parent: + * { event: 'ready' } - module loaded + * { event: 'install-start', t } - inside install fn + * { event: 'lock-acquired' } - inside lock, before work (double-check) + * { event: 'lock-attempting', t } - about to acquire lock + * { event: 'done', installCount, skipped?, wasInstalled?, installedAfter?, error? } + * + * IPC in <- parent: + * { event: 'go' } - start the lock attempt (race) + * { event: 'proceed' } - complete the install fn (double-check) + */ + +import path from 'node:path'; +import { mkdir, writeFile, stat } from 'node:fs/promises'; +import { createMockLogger } from '@openfn/logger'; +import { getAliasedName } from '@openfn/runtime'; +import { withInstallLock } from '../../src/util/repo-lock.js'; + +const repoDir = process.env.REPO_DIR!; +const specifier = process.env.SPECIFIER!; +const installDelay = parseInt(process.env.INSTALL_DELAY ?? '0', 10); +const installFail = process.env.INSTALL_FAIL === '1'; +const installNoop = process.env.INSTALL_NOOP === '1'; +const workerMode = process.env.WORKER_MODE ?? 'default'; + +const logger = createMockLogger(); +const alias = getAliasedName(specifier); + +const send = (msg: Record) => { + if (process.send) process.send(msg); +}; + +const waitForMessage = (eventName: string): Promise => + new Promise((resolve) => { + const handler = (msg: any) => { + if (msg?.event === eventName) { + process.off('message', handler); + resolve(); + } + }; + process.on('message', handler); + }); + +const seedModule = async () => { + const dir = path.join(repoDir, 'node_modules', alias); + await mkdir(dir, { recursive: true }); + await writeFile(path.join(dir, 'package.json'), '{}'); +}; + +const moduleInstalled = async () => { + try { + await stat(path.join(repoDir, 'node_modules', alias, 'package.json')); + return true; + } catch { + return false; + } +}; + +const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +async function main() { + let installCount = 0; + + let installFn: () => Promise; + + if (workerMode === 'double-check') { + // Acquire lock, signal parent, wait for 'proceed', then complete. + installFn = async () => { + installCount++; + const proceedPromise = waitForMessage('proceed'); + send({ event: 'lock-acquired' }); + await proceedPromise; + await seedModule(); + if (installDelay > 0) await sleep(installDelay); + }; + } else { + installFn = async () => { + send({ event: 'install-start', t: Date.now() }); + installCount++; + if (installFail) { + throw new Error('npm exploded'); + } + if (installDelay > 0) await sleep(installDelay); + if (!installNoop) { + await seedModule(); + } + }; + } + + // Wrap fn with the same double-check pattern autoinstall uses inside the lock: + // re-check installed state before running the install fn. + const lockedInstall = async () => { + await withInstallLock(repoDir, alias, logger, async () => { + if (await moduleInstalled()) { + return; + } + await installFn(); + }); + }; + + const goPromise = workerMode === 'race' ? waitForMessage('go') : null; + + send({ event: 'ready' }); + + if (goPromise) { + await goPromise; + } + + if (workerMode === 'partial') { + const wasInstalled = await moduleInstalled(); + send({ event: 'lock-attempting', t: Date.now() }); + try { + await lockedInstall(); + } catch (e: any) { + send({ event: 'done', wasInstalled, installCount, error: e.message }); + return; + } + const installedAfter = await moduleInstalled(); + send({ event: 'done', wasInstalled, installCount, installedAfter }); + return; + } + + send({ event: 'lock-attempting', t: Date.now() }); + + try { + await lockedInstall(); + const skipped = installCount === 0; + send({ event: 'done', installCount, skipped }); + } catch (e: any) { + send({ event: 'done', installCount, error: e.message }); + } +} + +main().catch((e) => { + send({ event: 'done', installCount: 0, error: e.message }); +}); diff --git a/packages/engine-multi/test/util/repo-lock.test.ts b/packages/engine-multi/test/util/repo-lock.test.ts new file mode 100644 index 000000000..2b54aa3e5 --- /dev/null +++ b/packages/engine-multi/test/util/repo-lock.test.ts @@ -0,0 +1,327 @@ +/** + * Cross-process integration tests for withInstallLock. + * + * Each test forks real child processes pointing at the same tmpdir to exercise + * the proper-lockfile filesystem lock path that in-process tests cannot reach. + * IPC barriers keep ordering deterministic. + */ + +import test from 'ava'; +import path from 'node:path'; +import os from 'node:os'; +import { mkdir, mkdtemp, rm, stat } from 'node:fs/promises'; +import { fork, type ChildProcess } from 'node:child_process'; +import { fileURLToPath } from 'node:url'; + +const __dirname = path.dirname(fileURLToPath(import.meta.url)); + +const WORKER_SCRIPT = path.resolve(__dirname, 'repo-lock-worker.ts'); + +const EXEC_ARGV = ['--import=@swc-node/register/esm-register', '--no-warnings']; + +const SPECIFIER_A = '@openfn/language-http@6.5.0'; +const ALIAS_A = '@openfn/language-http_6.5.0'; + +const fileExists = async (p: string) => { + try { + await stat(p); + return true; + } catch { + return false; + } +}; + +interface WorkerMessage { + event: string; + [key: string]: unknown; +} + +function spawnWorker( + env: Record +): { child: ChildProcess; messages: WorkerMessage[] } { + const messages: WorkerMessage[] = []; + + const child = fork(WORKER_SCRIPT, [], { + execArgv: EXEC_ARGV, + env: { ...process.env, ...env }, + }); + + child.on('message', (msg: any) => { + messages.push(msg as WorkerMessage); + }); + + return { child, messages }; +} + +function waitForEvent( + worker: { child: ChildProcess; messages: WorkerMessage[] }, + eventName: string, + timeoutMs = 8000 +): Promise { + const buffered = worker.messages.find((m) => m.event === eventName); + if (buffered) return Promise.resolve(buffered); + + return new Promise((resolve, reject) => { + const timer = setTimeout( + () => reject(new Error(`Timed out waiting for event '${eventName}'`)), + timeoutMs + ); + const handler = (msg: any) => { + if (msg?.event === eventName) { + clearTimeout(timer); + worker.child.off('message', handler); + resolve(msg as WorkerMessage); + } + }; + worker.child.on('message', handler); + }); +} + +function collectDone( + children: ChildProcess[], + count: number, + timeoutMs = 8000 +): Promise { + return new Promise((resolve, reject) => { + const results: WorkerMessage[] = []; + const timer = setTimeout( + () => + reject( + new Error( + `Timed out waiting for ${count} done messages (got ${results.length})` + ) + ), + timeoutMs + ); + const handler = (msg: any) => { + if (msg?.event === 'done') { + results.push(msg); + if (results.length >= count) { + clearTimeout(timer); + for (const c of children) c.off('message', handler); + resolve(results); + } + } + }; + for (const c of children) c.on('message', handler); + }); +} + +function waitAllReady(children: ChildProcess[], timeoutMs = 8000): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout( + () => reject(new Error('Timed out waiting for all workers to be ready')), + timeoutMs + ); + let remaining = children.length; + for (const c of children) { + const handler = (msg: any) => { + if (msg?.event === 'ready') { + c.off('message', handler); + remaining--; + if (remaining === 0) { + clearTimeout(timer); + resolve(); + } + } + }; + c.on('message', handler); + } + }); +} + +test.beforeEach(async (t) => { + const dir = await mkdtemp(path.join(os.tmpdir(), 'engine-multi-mp-lock-')); + t.context = { dir, children: [] as ChildProcess[] }; +}); + +test.afterEach.always(async (t) => { + const { dir, children } = t.context as { + dir: string; + children: ChildProcess[]; + }; + await Promise.all( + children.map((c) => { + if (c.exitCode !== null || c.killed) return Promise.resolve(); + return new Promise((r) => { + c.once('exit', () => r()); + c.kill(); + }); + }) + ); + await rm(dir, { recursive: true, force: true }); +}); + +function track(t: any, ...workers: ReturnType[]) { + const ctx = t.context as { children: ChildProcess[] }; + for (const w of workers) ctx.children.push(w.child); + return workers; +} + +// Scenario: race — multiple workers attempt the lock at the same time. +test.serial( + 'race: exactly one of N concurrent workers runs installFn', + async (t) => { + const { dir } = t.context as { dir: string }; + + const env = { + REPO_DIR: dir, + SPECIFIER: SPECIFIER_A, + WORKER_MODE: 'race', + INSTALL_DELAY: '500', + }; + + const workers = [spawnWorker(env), spawnWorker(env), spawnWorker(env)]; + track(t, ...workers); + + await waitAllReady(workers.map((w) => w.child)); + for (const { child } of workers) child.send({ event: 'go' }); + + const dones = await collectDone(workers.map((w) => w.child), 3); + + const totalInstalls = dones.reduce( + (s, m) => s + ((m.installCount as number) ?? 0), + 0 + ); + const installStartEvents = workers.flatMap((w) => + w.messages.filter((m) => m.event === 'install-start') + ); + + t.is(totalInstalls, 1, 'exactly one worker should have run installFn'); + t.is(installStartEvents.length, 1, 'exactly one install-start emitted'); + t.true( + await fileExists(path.join(dir, 'node_modules', ALIAS_A, 'package.json')), + 'node_modules pkg must exist' + ); + } +); + +// Scenario: double-check — A holds the lock, B waits, A finishes, B re-checks and skips. +test.serial( + 'double-check: waiting worker skips install after winner finishes', + async (t) => { + const { dir } = t.context as { dir: string }; + + const wA = spawnWorker({ + REPO_DIR: dir, + SPECIFIER: SPECIFIER_A, + WORKER_MODE: 'double-check', + }); + track(t, wA); + + await waitForEvent(wA, 'lock-acquired'); + + const wB = spawnWorker({ + REPO_DIR: dir, + SPECIFIER: SPECIFIER_A, + WORKER_MODE: 'default', + }); + track(t, wB); + + await waitForEvent(wB, 'lock-attempting'); + + wA.child.send({ event: 'proceed' }); + + await collectDone([wA.child, wB.child], 2); + + const doneA = wA.messages.find((m) => m.event === 'done')!; + const doneB = wB.messages.find((m) => m.event === 'done')!; + + t.is(doneA.installCount, 1, 'worker A should have run installFn once'); + t.true(doneB.skipped as boolean, 'worker B should have skipped (re-check)'); + t.is(doneB.installCount, 0, 'worker B installCount should be 0'); + + t.true( + await fileExists(path.join(dir, 'node_modules', ALIAS_A, 'package.json')) + ); + } +); + +// Scenario: partial — node_modules pkg absent triggers install; present afterwards. +test.serial( + 'partial: missing node_modules pkg triggers install; present afterwards', + async (t) => { + const { dir } = t.context as { dir: string }; + + await mkdir(path.join(dir, '.locks'), { recursive: true }); + + const w = spawnWorker({ + REPO_DIR: dir, + SPECIFIER: SPECIFIER_A, + WORKER_MODE: 'partial', + }); + track(t, w); + + const done = await waitForEvent(w, 'done'); + + t.false( + done.wasInstalled as boolean, + 'should not be installed before the lock runs' + ); + t.is(done.installCount, 1, 'install should have run once'); + t.true( + done.installedAfter as boolean, + 'should be installed once install fn has seeded node_modules' + ); + t.true( + await fileExists(path.join(dir, 'node_modules', ALIAS_A, 'package.json')) + ); + } +); + +// Scenario: default — single worker, no contention, just works. +test.serial('default: single worker installs cleanly', async (t) => { + const { dir } = t.context as { dir: string }; + + const w = spawnWorker({ + REPO_DIR: dir, + SPECIFIER: SPECIFIER_A, + WORKER_MODE: 'default', + }); + track(t, w); + + const done = await waitForEvent(w, 'done'); + + t.falsy(done.error, `should complete without error, got: ${done.error}`); + t.is(done.installCount, 1, 'install should have run once'); + t.true( + await fileExists(path.join(dir, 'node_modules', ALIAS_A, 'package.json')) + ); +}); + +// Sanity check: install failure releases the lockdir so retries are possible. +test.serial( + 'failure: install error releases the lock for a retry', + async (t) => { + const { dir } = t.context as { dir: string }; + + const wA = spawnWorker({ + REPO_DIR: dir, + SPECIFIER: SPECIFIER_A, + WORKER_MODE: 'default', + INSTALL_FAIL: '1', + }); + track(t, wA); + + const doneA = await waitForEvent(wA, 'done'); + t.truthy(doneA.error, 'child A should report an error'); + t.false( + await fileExists(path.join(dir, '.locks', `${ALIAS_A}.lock.lock`)), + 'proper-lockfile lockdir must be released after install throws' + ); + + const wB = spawnWorker({ + REPO_DIR: dir, + SPECIFIER: SPECIFIER_A, + WORKER_MODE: 'default', + }); + track(t, wB); + + const doneB = await waitForEvent(wB, 'done'); + t.falsy(doneB.error); + t.is(doneB.installCount, 1, 'second worker should have run install'); + t.true( + await fileExists(path.join(dir, 'node_modules', ALIAS_A, 'package.json')) + ); + } +); diff --git a/packages/engine-multi/test/util/repo-lock.unit.test.ts b/packages/engine-multi/test/util/repo-lock.unit.test.ts new file mode 100644 index 000000000..a39e4cd12 --- /dev/null +++ b/packages/engine-multi/test/util/repo-lock.unit.test.ts @@ -0,0 +1,43 @@ +/** + * In-process unit tests for withInstallLock alias validation. + * + * The sibling `repo-lock.test.ts` exercises the real cross-process filesystem + * lock via forked workers; these tests focus on the alias guard. + */ + +import test from 'ava'; +import path from 'node:path'; +import os from 'node:os'; +import { mkdtemp, rm } from 'node:fs/promises'; +import { createMockLogger } from '@openfn/logger'; + +import { withInstallLock } from '../../src/util/repo-lock'; + +const logger = createMockLogger(); + +const shouldNotRun = async () => { + throw new Error('install fn must not run when alias is rejected'); +}; + +test('rejects ".." as a path component but allows it as a substring', async (t) => { + await t.throwsAsync( + () => withInstallLock('/tmp/repo', '../evil', logger, shouldNotRun), + { message: /Invalid alias for install lock/ } + ); + + const dir = await mkdtemp(path.join(os.tmpdir(), 'engine-multi-lock-unit-')); + t.teardown(() => rm(dir, { recursive: true, force: true })); + + let ran = false; + await withInstallLock(dir, 'foo..bar', logger, async () => { + ran = true; + }); + t.true(ran); +}); + +test('rejects absolute-path alias', async (t) => { + await t.throwsAsync( + () => withInstallLock('/tmp/repo', '/etc/passwd', logger, shouldNotRun), + { message: /Invalid alias for install lock/ } + ); +}); diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 25e8aa448..17b2466b0 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -47,6 +47,7 @@ "@types/phoenix": "^1.6.7", "@types/yargs": "^17.0.35", "ava": "5.3.1", + "esmock": "^2.7.5", "nodemon": "3.1.14", "sentry-testkit": "^6.2.2", "tslib": "^2.8.1", diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index 9d901c7af..5a0f4eeb5 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -108,6 +108,13 @@ if (args.mock) { engineReady(engine); }); } else { + const lockRepo = args.repoLock !== false; + if (lockRepo && !args.repoDir) { + logger.warn( + 'WARNING: repo lock is enabled but --repo-dir is not set; lock will be a no-op' + ); + } + const engineOptions = { repoDir: args.repoDir, memoryLimitMb: args.runMemory, @@ -118,6 +125,7 @@ if (args.mock) { workerValidationRetries: args.engineValidationRetries, profile: args.profile, profilePollInterval: args.profilePollIntervalMs, + autoinstall: { lockRepo }, }; logger.debug('Creating runtime engine...'); logger.debug('Engine options:', engineOptions); diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index f8d27a126..12af50b2e 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -37,6 +37,7 @@ type Args = { profile?: boolean; profilePollIntervalMs?: number; repoDir?: string; + repoLock?: boolean; runMemory?: number; secret?: string; sentryDsn?: string; @@ -96,6 +97,7 @@ export default function parseArgs(argv: string[]): Args { WORKER_PROFILE_POLL_INTERVAL_MS, WORKER_PROFILE, WORKER_REPO_DIR, + WORKER_NO_REPO_LOCK, WORKER_SECRET, WORKER_SENTRY_DSN, WORKER_SENTRY_ENV, @@ -129,6 +131,11 @@ export default function parseArgs(argv: string[]): Args { description: 'Path to the runtime repo (where modules will be installed). Env: WORKER_REPO_DIR', }) + .option('repo-lock', { + description: + 'Enable a cross-worker filesystem lock that coordinates adaptor installs when multiple workers share a single --repo-dir. On by default; pass --no-repo-lock to disable. Env: WORKER_NO_REPO_LOCK=true also disables it.', + type: 'boolean', + }) .option('monorepo-dir', { alias: 'm', description: @@ -306,6 +313,11 @@ export default function parseArgs(argv: string[]): Args { 'ws://localhost:4000/worker' ), repoDir: setArg(args.repoDir, WORKER_REPO_DIR), + repoLock: + args.repoLock ?? + (WORKER_NO_REPO_LOCK !== undefined + ? !(WORKER_NO_REPO_LOCK === 'true' || WORKER_NO_REPO_LOCK === '1') + : true), batchLogs: setArg(args.batchLogs, WORKER_BATCH_LOGS, false), batchInterval: setArg(args.batchInterval, WORKER_BATCH_INTERVAL, 10), batchLimit: setArg(args.batchLimit, WORKER_BATCH_LIMIT, 50), diff --git a/packages/ws-worker/test/reasons.test.ts b/packages/ws-worker/test/reasons.test.ts index 605ff497a..681518445 100644 --- a/packages/ws-worker/test/reasons.test.ts +++ b/packages/ws-worker/test/reasons.test.ts @@ -1,5 +1,9 @@ import test from 'ava'; -import createRTE from '@openfn/engine-multi'; +import esmock from 'esmock'; +import * as runtime from '@openfn/runtime'; +import path from 'node:path'; +import os from 'node:os'; +import { mkdtemp, rm, writeFile } from 'node:fs/promises'; import { createMockLogger } from '@openfn/logger'; import type { ExitReason } from '@openfn/lexicon/lightning'; @@ -18,23 +22,38 @@ import { ExecutionPlan } from '@openfn/runtime'; let engine: any; let logger: any; +let repoDir: string; test.before(async () => { logger = createMockLogger(); - // logger = createLogger(null, { level: 'debug' }); - // Note: this is the REAL engine, not a mock - engine = await createRTE({ - maxWorkers: 1, - logger, - autoinstall: { - handleIsInstalled: async () => false, - handleInstall: () => + repoDir = await mkdtemp(path.join(os.tmpdir(), 'ws-worker-reasons-')); + await writeFile( + path.join(repoDir, 'package.json'), + JSON.stringify({ name: 'repo', dependencies: {} }) + ); + + // Load the real engine but swap @openfn/runtime's install with a fast-failing + // stub. This exercises the engine's autoinstall path without touching npm. + const engineModule: any = await esmock('@openfn/engine-multi', { + '@openfn/runtime': { + ...runtime, + install: () => new Promise((_resolve, reject) => { setTimeout(() => reject(new Error('not the way to amarillo')), 1); }), }, }); + + engine = await engineModule.default({ + maxWorkers: 1, + logger, + repoDir, + }); +}); + +test.after.always(async () => { + if (repoDir) await rm(repoDir, { recursive: true, force: true }); }); test.after(async () => engine.destroy()); diff --git a/packages/ws-worker/test/util/cli.test.ts b/packages/ws-worker/test/util/cli.test.ts index 698008dc8..a2edd2b98 100644 --- a/packages/ws-worker/test/util/cli.test.ts +++ b/packages/ws-worker/test/util/cli.test.ts @@ -77,6 +77,34 @@ test('cli should handle boolean options correctly', (t) => { t.is(args.mock, true); }); +test('cli should default repoLock to true', (t) => { + const args = cli('pnpm start'.split(' ')); + t.is(args.repoLock, true); +}); + +test('cli should disable repoLock when --no-repo-lock is passed', (t) => { + const args = cli('pnpm start --no-repo-lock'.split(' ')); + t.is(args.repoLock, false); +}); + +test('cli should disable repoLock when WORKER_NO_REPO_LOCK=true', (t) => { + process.env.WORKER_NO_REPO_LOCK = 'true'; + const args = cli('pnpm start'.split(' ')); + t.is(args.repoLock, false); +}); + +test('cli should keep repoLock enabled when WORKER_NO_REPO_LOCK=false', (t) => { + process.env.WORKER_NO_REPO_LOCK = 'false'; + const args = cli('pnpm start'.split(' ')); + t.is(args.repoLock, true); +}); + +test('cli flag should override WORKER_NO_REPO_LOCK env var', (t) => { + process.env.WORKER_NO_REPO_LOCK = 'true'; + const args = cli('pnpm start --repo-lock'.split(' ')); + t.is(args.repoLock, true); +}); + test('cli should configure sentry directly', (t) => { const argv = 'pnpm start --sentry-dsn abc --sentry-env local'.split(' '); const args = cli(argv); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index fa9301662..caeeb8a25 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -435,13 +435,22 @@ importers: json-stream-stringify: specifier: ^3.1.6 version: 3.1.6 + proper-lockfile: + specifier: ^4.1.2 + version: 4.1.2 devDependencies: '@types/node': specifier: ^18.19.130 version: 18.19.130 + '@types/proper-lockfile': + specifier: ^4.1.4 + version: 4.1.4 ava: specifier: 5.3.1 version: 5.3.1 + esmock: + specifier: ^2.7.5 + version: 2.7.5 tslib: specifier: ^2.8.1 version: 2.8.1 @@ -787,6 +796,9 @@ importers: ava: specifier: 5.3.1 version: 5.3.1 + esmock: + specifier: ^2.7.5 + version: 2.7.5 nodemon: specifier: 3.1.14 version: 3.1.14 @@ -2091,6 +2103,9 @@ packages: '@types/phoenix@1.6.7': resolution: {integrity: sha512-oN9ive//QSBkf19rfDv45M7eZPi0eEXylht2OLEXicu5b4KoQ1OzXIw+xDSGWxSxe1JmepRR/ZH283vsu518/Q==} + '@types/proper-lockfile@4.1.4': + resolution: {integrity: sha512-uo2ABllncSqg9F1D4nugVl9v93RmjxF6LJzQLMLDdPaXCUIDPeOJ21Gbqi43xNKzBi/WQ0Q0dICqufzQbMjipQ==} + '@types/qs@6.15.0': resolution: {integrity: sha512-JawvT8iBVWpzTrz3EGw9BTQFg3BQNmwERdKE22vlTxawwtbyUSlMppvZYKLZzB5zgACXdXxbD3m1bXaMqP/9ow==} @@ -2875,6 +2890,10 @@ packages: resolution: {integrity: sha512-U1suiZ2oDVWv4zPO56S0NcR5QriEahGtdN2OR6FiOG4WJvcjBVFB0qI4+eKoWFH483PKGuLuu6V8Z4T5g63UVA==} engines: {node: '>=6'} + esmock@2.7.5: + resolution: {integrity: sha512-jKwL7yYpVOERalCllSnPur59s9M0gV5BKijtmOKclqDMuhqdS7DT/a7cODjz6w1XusE0wAaHBTrK+zgab/ENgw==} + engines: {node: '>=14.16.0'} + esprima@1.2.5: resolution: {integrity: sha512-S9VbPDU0adFErpDai3qDkjq8+G05ONtKzcyNrPKg/ZKa+tf879nX2KexNU95b31UoTJjRLInNBHHHjFPoCd7lQ==} engines: {node: '>=0.4.0'} @@ -3770,6 +3789,9 @@ packages: process-nextick-args@2.0.1: resolution: {integrity: sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==} + proper-lockfile@4.1.2: + resolution: {integrity: sha512-TjNPblN4BwAWMXU8s9AEz4JmQxnD1NNL7bNOY/AKUzyamc379FWASUhc/K1pL2noVb+XmZKLL68cjzLsiOAMaA==} + proxy-addr@2.0.7: resolution: {integrity: sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg==} engines: {node: '>= 0.10'} @@ -3855,6 +3877,10 @@ packages: resolve-pkg-maps@1.0.0: resolution: {integrity: sha512-seS2Tj26TBVOC2NIc2rOe2y2ZO7efxITtLZcGSOnHHNOQ7CkiUBfw0Iw2ck6xkIhPwLhKNLS8BO+hEpngQlqzw==} + retry@0.12.0: + resolution: {integrity: sha512-9LkiTwjUh6rT555DtE9rTX+BKByPfrMzEAtnlEtdEwr3Nkffwiihqe2bWADg+OQRjt9gl6ICdmB/ZFDCGAtSow==} + engines: {node: '>= 4'} + retry@0.13.1: resolution: {integrity: sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==} engines: {node: '>= 4'} @@ -3940,6 +3966,9 @@ packages: resolution: {integrity: sha512-ZX99e6tRweoUXqR+VBrslhda51Nh5MTQwou5tnUDgbtyM0dBgmhEDtWGP/xbKn6hqfPRHujUNwz5fy/wbbhnpw==} engines: {node: '>= 0.4'} + signal-exit@3.0.7: + resolution: {integrity: sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==} + signal-exit@4.1.0: resolution: {integrity: sha512-bzyZ1e88w9O1iNJbKnOlvYTrWPDl46O1bG0D3XInv+9tkPrxrN8jUUTiFlDkkmKWgn1M6CfIA13SuGqOa9Korw==} engines: {node: '>=14'} @@ -5685,6 +5714,10 @@ snapshots: '@types/phoenix@1.6.7': {} + '@types/proper-lockfile@4.1.4': + dependencies: + '@types/retry': 0.12.0 + '@types/qs@6.15.0': {} '@types/range-parser@1.2.7': {} @@ -6465,6 +6498,8 @@ snapshots: esm@3.2.25: optional: true + esmock@2.7.5: {} + esprima@1.2.5: {} esprima@4.0.1: {} @@ -7337,6 +7372,12 @@ snapshots: process-nextick-args@2.0.1: {} + proper-lockfile@4.1.2: + dependencies: + graceful-fs: 4.2.11 + retry: 0.12.0 + signal-exit: 3.0.7 + proxy-addr@2.0.7: dependencies: forwarded: 0.2.0 @@ -7433,6 +7474,8 @@ snapshots: resolve-pkg-maps@1.0.0: {} + retry@0.12.0: {} + retry@0.13.1: {} reusify@1.1.0: {} @@ -7567,6 +7610,8 @@ snapshots: side-channel-map: 1.0.1 side-channel-weakmap: 1.0.2 + signal-exit@3.0.7: {} + signal-exit@4.1.0: {} simple-update-notifier@2.0.0: