Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/distributed-adaptor-install-lock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/engine-multi': minor
'@openfn/ws-worker': minor
---

Add a filesystem-based lock so multiple workers can safely share a single adaptor repo directory (e.g. an NFS mount or k8s PVC). The lock lives inside engine-multi's autoinstall and serialises installs across processes via a per-adaptor lockfile. It is enabled by default and can be disabled with `WORKER_NO_REPO_LOCK=true` (or `--no-repo-lock`) on ws-worker. The engine's `isInstalled` check has also been tightened to verify the adaptor's `node_modules` entry exists on disk, not just the repo `package.json` dependency entry — this catches half-installed adaptors left behind by a crashed worker.
5 changes: 4 additions & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
"@openfn/logger": "workspace:*",
"@openfn/runtime": "workspace:*",
"fast-safe-stringify": "^2.1.1",
"json-stream-stringify": "^3.1.6"
"json-stream-stringify": "^3.1.6",
"proper-lockfile": "^4.1.2"
},
"devDependencies": {
"@types/node": "^18.19.130",
"@types/proper-lockfile": "^4.1.4",
"ava": "5.3.1",
"esmock": "^2.7.5",
"tslib": "^2.8.1",
"tsm": "^2.3.0",
"tsup": "^8.5.1",
Expand Down
183 changes: 105 additions & 78 deletions packages/engine-multi/src/api/autoinstall.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import path from 'node:path';
import { stat } from 'node:fs/promises';
import {
ensureRepo,
getAliasedName,
Expand All @@ -13,98 +15,102 @@ import type { Logger } from '@openfn/logger';
import { AUTOINSTALL_COMPLETE, AUTOINSTALL_ERROR } from '../events';
import { AutoinstallError } from '../errors';
import ExecutionContext from '../classes/ExecutionContext';
import { withInstallLock } from '../util/repo-lock';

// none of these options should be on the plan actually
export type AutoinstallOptions = {
skipRepoValidation?: boolean;
handleInstall?(fn: string, repoDir: string, logger: Logger): Promise<void>;
handleIsInstalled?(
fn: string,
repoDir: string,
logger: Logger
): Promise<boolean>;
lockRepo?: boolean;
versionLookup?: (specifier: string) => Promise<string>;
};

const pending: Record<string, Promise<void>> = {};
// Per-entry options are pinned at enqueue time so the worker that drains the
// queue doesn't accidentally use the *first* caller's closure. See PR #1416.
type QueueEntry = {
adaptors: string[];
callback: (err?: any) => void;
context: ExecutionContext;
repoDir: string;
logger: Logger;
useLock: boolean;
};

let busy = false;
const queue: QueueEntry[] = [];

const processQueue = async () => {
const next = queue.shift();
if (next) {
busy = true;
await doAutoinstall(next);
processQueue();
} else {
busy = false;
}
};

const queue: Array<{ adaptors: string[]; callback: (err?: any) => void }> = [];

const enqueue = (adaptors: string[]) =>
new Promise((resolve) => {
queue.push({ adaptors, callback: resolve });
});
const doAutoinstall = async (entry: QueueEntry) => {
const { adaptors, callback, context, repoDir, logger, useLock } = entry;

// Install any modules for an Execution Plan that are not already installed
// This will enforce a queue ensuring only one module is installed at a time
// This fixes https://github.com/OpenFn/kit/issues/503
const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
// TODO not a huge fan of these functions in the closure, but it's ok for now
const processQueue = async () => {
const next = queue.shift();
if (next) {
busy = true;
const { adaptors, callback } = next;
await doAutoinstall(adaptors, callback);
processQueue();
} else {
// do nothing
busy = false;
for (const a of adaptors) {
const { name, version } = getNameAndVersion(a);
if (await isInstalled(a, repoDir, logger)) {
continue;
}
};

// This will actually do the autoinstall for an run (all adaptors)
const doAutoinstall = async (
adaptors: string[],
onComplete: (err?: any) => void
) => {
// Check whether we still need to do any work
for (const a of adaptors) {
const { name, version } = getNameAndVersion(a);
if (await isInstalledFn(a, repoDir, logger)) {
continue;
}

const startTime = Date.now();
try {
await installFn(a, repoDir, logger);

const duration = Date.now() - startTime;
logger.success(`autoinstalled ${a} in ${duration / 1000}s`);
context.emit(AUTOINSTALL_COMPLETE, {
module: name,
version: version!,
duration,
});
} catch (e: any) {
delete pending[a];

logger.error(`ERROR autoinstalling ${a}: ${e.message}`);
logger.error(e);
const duration = Date.now() - startTime;
context.emit(AUTOINSTALL_ERROR, {
module: name,
version: version!,
duration,
message: e.message || e.toString(),
const startTime = Date.now();
try {
if (useLock) {
// Re-check inside the lock so we skip work a peer worker
// completed while we were waiting.
await withInstallLock(repoDir, getAliasedName(a), logger, async () => {
if (await isInstalled(a, repoDir, logger)) {
logger.debug(
`another worker installed ${a} while waiting for lock; skipping`
);
return;
}
await install(a, repoDir, logger);
});

// Abort on the first error
return onComplete(new AutoinstallError(a, e));
} else {
await install(a, repoDir, logger);
}

const duration = Date.now() - startTime;
logger.success(`autoinstalled ${a} in ${duration / 1000}s`);
context.emit(AUTOINSTALL_COMPLETE, {
module: name,
version: version!,
duration,
});
} catch (e: any) {
logger.error(`ERROR autoinstalling ${a}: ${e.message}`);
logger.error(e);
const duration = Date.now() - startTime;
context.emit(AUTOINSTALL_ERROR, {
module: name,
version: version!,
duration,
message: e.message || e.toString(),
});

// Abort on the first error
return callback(new AutoinstallError(a, e));
}
onComplete();
};
}
callback();
};

// Install any modules for an Execution Plan that are not already installed
// This will enforce a queue ensuring only one module is installed at a time
// This fixes https://github.com/OpenFn/kit/issues/503
const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
const { logger, state, options } = context;
const { plan } = state;
const { repoDir, whitelist } = options;
const autoinstallOptions = options.autoinstall || {};
const useLock = autoinstallOptions.lockRepo !== false;

const installFn = autoinstallOptions?.handleInstall || install;
const isInstalledFn = autoinstallOptions?.handleIsInstalled || isInstalled;
const versionlookup = autoinstallOptions?.versionLookup || getLatestVersion;

let didValidateRepo = false;
Expand All @@ -124,7 +130,7 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
const adaptors = Array.from(identifyAdaptors(plan));
const paths: ModulePaths = {};

const adaptorsToLoad = [];
const adaptorsToLoad: string[] = [];
for (const a of adaptors) {
// Ensure that this is not blacklisted
if (whitelist && !whitelist.find((r) => r.exec(a))) {
Expand Down Expand Up @@ -166,7 +172,7 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
version: v,
};

if (!(await isInstalledFn(resolvedAdaptorName, repoDir, logger))) {
if (!(await isInstalled(resolvedAdaptorName, repoDir, logger))) {
adaptorsToLoad.push(resolvedAdaptorName);
}
}
Expand All @@ -185,8 +191,16 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
}

if (adaptorsToLoad.length) {
// Add this to the queue
const p = enqueue(adaptorsToLoad);
const p = new Promise((resolve) => {
queue.push({
adaptors: adaptorsToLoad,
callback: resolve,
context,
repoDir,
logger,
useLock,
});
});

if (!busy) {
processQueue();
Expand All @@ -210,9 +224,17 @@ export default autoinstall;
const install = (specifier: string, repoDir: string, logger: Logger) =>
runtimeInstall([specifier], repoDir, logger);

// The actual isInstalled function is not unit tested
// TODO this should probably all be handled (and tested) in @openfn/runtime
const isInstalled = async (
const fileExists = async (p: string) => {
try {
await stat(p);
return true;
} catch {
return false;
}
};

// Exported for unit testing
export const isInstalled = async (
specifier: string,
repoDir: string,
logger: Logger
Expand All @@ -232,7 +254,12 @@ const isInstalled = async (
const pkg = await loadRepoPkg(repoDir);
if (pkg) {
const { dependencies } = pkg;
return dependencies.hasOwnProperty(alias);
if (!dependencies.hasOwnProperty(alias)) {
return false;
}
return fileExists(
path.join(repoDir, 'node_modules', alias, 'package.json')
);
}
};

Expand Down
88 changes: 88 additions & 0 deletions packages/engine-multi/src/util/repo-lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import path from 'node:path';
import { mkdir, writeFile } from 'node:fs/promises';
import lockfile from 'proper-lockfile';
import type { Logger } from '@openfn/logger';

// k8s CPU throttling can delay the heartbeat setTimeout; 5 min avoids false-stale on tight clusters
const STALE_MS = 5 * 60_000;
// Retry ceiling exceeds STALE_MS so a dead lock-holder's stale window expires before we give up
const MAX_LOCK_WAIT_MS = STALE_MS + 60_000;
const LOCK_INTERVAL_MS = 2_000;
const UPDATE_MS = 5_000;

const LOCK_RETRY_OPTIONS = {
retries: MAX_LOCK_WAIT_MS / LOCK_INTERVAL_MS,
factor: 1,
minTimeout: LOCK_INTERVAL_MS,
maxTimeout: LOCK_INTERVAL_MS,
};

const ensureLockTarget = async (target: string) => {
await mkdir(path.dirname(target), { recursive: true });
try {
await writeFile(target, '', { flag: 'wx' });
} catch (e: any) {
if (e.code !== 'EEXIST') throw e;
}
};

export const withInstallLock = async (
repoDir: string,
alias: string,
logger: Logger,
fn: () => Promise<void>
): Promise<void> => {
// Defence-in-depth: refuse aliases that could escape the .locks directory.
// Upstream whitelist filtering should make this unreachable.
if (alias.split(/[/\\]/).includes('..') || path.isAbsolute(alias)) {
throw new Error(`Invalid alias for install lock: ${alias}`);
}

const locksDir = path.join(repoDir, '.locks');
const target = path.join(locksDir, `${alias}.lock`);

await ensureLockTarget(target);

logger.debug(`acquiring install lock for ${alias}`);
let release: () => Promise<void>;

const lockOpts = { stale: STALE_MS, update: UPDATE_MS, realpath: false };

try {
release = await lockfile.lock(target, { ...lockOpts, retries: 0 });
} catch (e: any) {
if (e.code !== 'ELOCKED') throw e;

logger.info(
`waiting for install lock on \`${alias}\` (another worker is installing)`
);

try {
release = await lockfile.lock(target, {
...lockOpts,
retries: LOCK_RETRY_OPTIONS,
});
} catch (e2: any) {
if (e2.code === 'ELOCKED') {
throw new Error(
`Lock acquisition timed out after ${
MAX_LOCK_WAIT_MS / 1000
}s waiting for ${alias}; another worker likely still installing (lock: ${target})`
);
}
throw e2;
}
}

logger.debug(`acquired install lock for ${alias}`);

try {
await fn();
} finally {
try {
await release();
} catch (e) {
logger.warn(`failed to release install lock for ${alias}:`, e);
}
}
};
Loading