From 5b9c67325d76b4c0f96ba23946b92b847ef64a65 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Fri, 27 Mar 2026 14:43:38 +0200 Subject: [PATCH 01/23] switch envs, resources are shared on multiple envs and are exclusive --- src/@types/C2D/C2D.ts | 25 +- src/components/c2d/compute_engine_base.ts | 33 ++- src/components/c2d/compute_engine_docker.ts | 281 ++++++++++---------- src/utils/config/schemas.ts | 64 +++-- 4 files changed, 215 insertions(+), 188 deletions(-) diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 603e8b188..ad4257d3a 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -139,6 +139,19 @@ export interface ComputeEnvironment extends ComputeEnvironmentBaseConfig { runMaxWaitTimeFree: number } +export interface C2DEnvironmentConfig { + id?: string + description?: string + storageExpiry?: number + minJobDuration?: number + maxJobDuration?: number + maxJobs?: number + fees?: ComputeEnvFeesStructure + access?: ComputeAccessList + free?: ComputeEnvironmentFreeOptions + resources?: string[] +} + export interface C2DDockerConfig { socketPath: string protocol: string @@ -147,17 +160,11 @@ export interface C2DDockerConfig { caPath: string certPath: string keyPath: string - storageExpiry?: number - maxJobDuration?: number - minJobDuration?: number - maxJobs?: number - fees: ComputeEnvFeesStructure - resources?: ComputeResource[] // optional, owner can overwrite - free?: ComputeEnvironmentFreeOptions - access: ComputeAccessList + resources?: ComputeResource[] imageRetentionDays?: number // Default: 7 days imageCleanupInterval?: number // Default: 86400 seconds (24 hours) - paymentClaimInterval?: number // Default: 3600 seconds (1 hours) + paymentClaimInterval?: number // Default: 3600 seconds (1 hour) + environments: C2DEnvironmentConfig[] } export type ComputeResultType = diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index da9de13e3..513812038 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -316,6 +316,9 @@ export abstract class C2DEngine { } catch (e) { CORE_LOGGER.error('Failed to get running jobs:' + e.message) } + + const envResourceIds = new Set((env.resources || []).map((r) => r.id)) + let totalJobs = 0 let totalFreeJobs = 0 let queuedJobs = 0 @@ -324,9 +327,13 @@ export abstract class C2DEngine { let maxWaitTimeFree = 0 let maxRunningTime = 0 let maxRunningTimeFree = 0 + for (const job of jobs) { - if (job.environment === env.id) { - if (job.queueMaxWaitTime === 0) { + const isThisEnv = job.environment === env.id + const isRunning = job.queueMaxWaitTime === 0 + + if (isThisEnv) { + if (isRunning) { const timeElapsed = new Date().getTime() / 1000 - Number.parseFloat(job?.algoStartTimestamp) totalJobs++ @@ -335,8 +342,20 @@ export abstract class C2DEngine { totalFreeJobs++ maxRunningTimeFree += job.maxJobDuration - timeElapsed } + } else { + queuedJobs++ + maxWaitTime += job.maxJobDuration + if (job.isFree) { + queuedFreeJobs++ + maxWaitTimeFree += job.maxJobDuration + } + } + } - for (const resource of job.resources) { + // Resource usage is checked on ALL envs + if (isRunning) { + for (const resource of job.resources) { + if (envResourceIds.has(resource.id)) { if (!(resource.id in usedResources)) usedResources[resource.id] = 0 usedResources[resource.id] += resource.amount if (job.isFree) { @@ -344,14 +363,6 @@ export abstract class C2DEngine { usedFreeResources[resource.id] += resource.amount } } - } else { - // queued job - queuedJobs++ - maxWaitTime += job.maxJobDuration - if (job.isFree) { - queuedFreeJobs++ - maxWaitTimeFree += job.maxJobDuration - } } } } diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 41d1f1513..a9b99c487 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -19,7 +19,9 @@ import type { RunningPlatform, ComputeEnvFeesStructure, ComputeResourceRequest, - ComputeEnvFees + ComputeEnvFees, + ComputeResource, + C2DEnvironmentConfig } from '../../@types/C2D/C2D.js' import { getConfiguration } from '../../utils/config.js' import { C2DEngine } from './compute_engine_base.js' @@ -57,6 +59,11 @@ import { ZeroAddress } from 'ethers' export class C2DEngineDocker extends C2DEngine { private envs: ComputeEnvironment[] = [] + private engineResources: ComputeResource[] = [] + + public getEngineResources(): ComputeResource[] { + return this.engineResources + } public docker: Dockerode private cronTimer: any @@ -121,8 +128,65 @@ export class C2DEngineDocker extends C2DEngine { // envs are build on start function } + private processFeesForEnvironment( + rawFees: ComputeEnvFeesStructure | undefined, + supportedChains: number[] + ): ComputeEnvFeesStructure | null { + if (!rawFees || Object.keys(rawFees).length === 0) return null + let fees: ComputeEnvFeesStructure = null + for (const feeChain of Object.keys(rawFees)) { + if (!supportedChains.includes(parseInt(feeChain))) continue + if (fees === null) fees = {} + if (!(feeChain in fees)) fees[feeChain] = [] + const tmpFees: ComputeEnvFees[] = [] + for (const feeEntry of rawFees[feeChain]) { + if (!feeEntry.prices || feeEntry.prices.length === 0) { + CORE_LOGGER.error( + `Unable to find prices for fee ${JSON.stringify(feeEntry)} on chain ${feeChain}` + ) + continue + } + if (!feeEntry.feeToken) { + const tokenAddress = getOceanTokenAddressForChain(parseInt(feeChain)) + if (tokenAddress) { + feeEntry.feeToken = tokenAddress + tmpFees.push(feeEntry) + } else { + CORE_LOGGER.error( + `Unable to find Ocean token address for chain ${feeChain} and no custom token provided` + ) + } + } else { + tmpFees.push(feeEntry) + } + } + fees[feeChain] = tmpFees + } + return fees + } + + private buildEnvResources(envDef: C2DEnvironmentConfig): ComputeResource[] { + let resources: ComputeResource[] + if (envDef.resources && envDef.resources.length > 0) { + // Filter engine resources to only those referenced by this environment + const allowedIds = new Set(envDef.resources) + resources = this.engineResources + .filter((r) => allowedIds.has(r.id)) + .map((r) => JSON.parse(JSON.stringify(r))) + for (const id of envDef.resources) { + if (!this.engineResources.some((r) => r.id === id)) { + CORE_LOGGER.warn( + `Environment references resource "${id}" which does not exist at engine level` + ) + } + } + } else { + resources = this.engineResources.map((r) => JSON.parse(JSON.stringify(r))) + } + return resources + } + public override async start() { - // let's build the env. Swarm and k8 will build multiple envs, based on arhitecture const config = await getConfiguration() const envConfig = await this.getC2DConfig().connection let sysinfo = null @@ -133,87 +197,17 @@ export class C2DEngineDocker extends C2DEngine { // since we cannot connect to docker, we cannot start the engine -> no envs return } - let fees: ComputeEnvFeesStructure = null + + // Determine supported chains const supportedChains: number[] = [] if (config.supportedNetworks) { for (const chain of Object.keys(config.supportedNetworks)) { supportedChains.push(parseInt(chain)) } } - if (envConfig.fees && Object.keys(envConfig.fees).length > 0) { - for (const feeChain of Object.keys(envConfig.fees)) { - // for (const feeConfig of envConfig.fees) { - if (supportedChains.includes(parseInt(feeChain))) { - if (fees === null) fees = {} - if (!(feeChain in fees)) fees[feeChain] = [] - const tmpFees: ComputeEnvFees[] = [] - for (let i = 0; i < envConfig.fees[feeChain].length; i++) { - if ( - envConfig.fees[feeChain][i].prices && - envConfig.fees[feeChain][i].prices.length > 0 - ) { - if (!envConfig.fees[feeChain][i].feeToken) { - const tokenAddress = getOceanTokenAddressForChain(parseInt(feeChain)) - if (tokenAddress) { - envConfig.fees[feeChain][i].feeToken = tokenAddress - tmpFees.push(envConfig.fees[feeChain][i]) - } else { - CORE_LOGGER.error( - `Unable to find Ocean token address for chain ${feeChain} and no custom token provided` - ) - } - } else { - tmpFees.push(envConfig.fees[feeChain][i]) - } - } else { - CORE_LOGGER.error( - `Unable to find prices for fee ${JSON.stringify( - envConfig.fees[feeChain][i] - )} on chain ${feeChain}` - ) - } - } - fees[feeChain] = tmpFees - } - } - /* for (const chain of Object.keys(config.supportedNetworks)) { - const chainId = parseInt(chain) - if (task.chainId && task.chainId !== chainId) continue - result[chainId] = await computeEngines.fetchEnvironments(chainId) - } */ - } - this.envs.push({ - id: '', // this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[i])), - runningJobs: 0, - consumerAddress: this.getKeyManager().getEthAddress(), - platform: { - architecture: sysinfo.Architecture, - os: sysinfo.OSType - }, - access: { - addresses: [], - accessLists: null - }, - fees, - queuedJobs: 0, - queuedFreeJobs: 0, - queMaxWaitTime: 0, - queMaxWaitTimeFree: 0, - runMaxWaitTime: 0, - runMaxWaitTimeFree: 0 - }) - if (`access` in envConfig) this.envs[0].access = envConfig.access - - if (`storageExpiry` in envConfig) this.envs[0].storageExpiry = envConfig.storageExpiry - if (`minJobDuration` in envConfig) - this.envs[0].minJobDuration = envConfig.minJobDuration - if (`maxJobDuration` in envConfig) - this.envs[0].maxJobDuration = envConfig.maxJobDuration - if (`maxJobs` in envConfig) this.envs[0].maxJobs = envConfig.maxJobs - // let's add resources - this.envs[0].resources = [] - const cpuResources = { + // Build engine-level resources from system info + user config overrides + const cpuResources: ComputeResource = { id: 'cpu', type: 'cpu', total: sysinfo.NCPU, @@ -221,98 +215,103 @@ export class C2DEngineDocker extends C2DEngine { min: 1, description: os.cpus()[0].model } - const ramResources = { + const ramResources: ComputeResource = { id: 'ram', type: 'ram', total: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), max: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), min: 1 } - + this.engineResources = [] if (envConfig.resources) { for (const res of envConfig.resources) { - // allow user to add other resources if (res.id === 'cpu') { if (res.total) cpuResources.total = res.total if (res.max) cpuResources.max = res.max if (res.min) cpuResources.min = res.min - } - if (res.id === 'ram') { + } else if (res.id === 'ram') { if (res.total) ramResources.total = res.total if (res.max) ramResources.max = res.max if (res.min) ramResources.min = res.min - } - - if (res.id !== 'cpu' && res.id !== 'ram') { + } else { if (!res.max) res.max = res.total if (!res.min) res.min = 0 - this.envs[0].resources.push(res) + this.engineResources.push(res) } } } - this.envs[0].resources.push(cpuResources) - this.envs[0].resources.push(ramResources) - // Build the list of physical CPU core indices for this environment + this.engineResources.push(cpuResources) + this.engineResources.push(ramResources) + + // Build CPU core affinity map (engine-level, shared across all environments) this.envCpuCores = Array.from( { length: cpuResources.total }, (_, i) => this.cpuOffset + i ) CORE_LOGGER.info( - `CPU affinity: environment cores ${this.envCpuCores[0]}-${this.envCpuCores[this.envCpuCores.length - 1]} (offset=${this.cpuOffset}, total=${cpuResources.total})` + `CPU affinity: engine cores ${this.envCpuCores[0]}-${this.envCpuCores[this.envCpuCores.length - 1]} (offset=${this.cpuOffset}, total=${cpuResources.total})` ) - /* TODO - get namedresources & discreete one - if (sysinfo.GenericResources) { - for (const [key, value] of Object.entries(sysinfo.GenericResources)) { - for (const [type, val] of Object.entries(value)) { - // for (const resType in sysinfo.GenericResources) { - if (type === 'NamedResourceSpec') { - // if we have it, ignore it - const resourceId = val.Value - const resourceType = val.Kind - let found = false - for (const res of this.envs[0].resources) { - if (res.id === resourceId) { - found = true - break - } - } - if (!found) { - this.envs[0].resources.push({ - id: resourceId, - kind: resourceType, - total: 1, - max: 1, - min: 0 - }) - } - } - } - } + + const platform: RunningPlatform = { + architecture: sysinfo.Architecture, + os: sysinfo.OSType } - */ - // limits for free env - if ('free' in envConfig) { - this.envs[0].free = { - access: { - addresses: [], - accessLists: null - } - } - if (`access` in envConfig.free) this.envs[0].free.access = envConfig.free.access - if (`storageExpiry` in envConfig.free) - this.envs[0].free.storageExpiry = envConfig.free.storageExpiry - if (`minJobDuration` in envConfig.free) - this.envs[0].free.minJobDuration = envConfig.free.minJobDuration - if (`maxJobDuration` in envConfig.free) - this.envs[0].free.maxJobDuration = envConfig.free.maxJobDuration - if (`maxJobs` in envConfig.free) this.envs[0].free.maxJobs = envConfig.free.maxJobs - if ('resources' in envConfig.free) { - // TO DO - check if resource is also listed in this.envs[0].resources, if not, ignore it - this.envs[0].free.resources = envConfig.free.resources + const consumerAddress = this.getKeyManager().getEthAddress() + + for (let envIdx = 0; envIdx < envConfig.environments.length; envIdx++) { + const envDef: C2DEnvironmentConfig = envConfig.environments[envIdx] + + const fees = this.processFeesForEnvironment(envDef.fees, supportedChains) + + const envResources = this.buildEnvResources(envDef) + + const env: ComputeEnvironment = { + id: '', + runningJobs: 0, + consumerAddress, + platform, + access: envDef.access || { addresses: [], accessLists: null }, + fees, + resources: envResources, + queuedJobs: 0, + queuedFreeJobs: 0, + queMaxWaitTime: 0, + queMaxWaitTimeFree: 0, + runMaxWaitTime: 0, + runMaxWaitTimeFree: 0 } + + if (envDef.storageExpiry !== undefined) env.storageExpiry = envDef.storageExpiry + if (envDef.minJobDuration !== undefined) env.minJobDuration = envDef.minJobDuration + if (envDef.maxJobDuration !== undefined) env.maxJobDuration = envDef.maxJobDuration + if (envDef.maxJobs !== undefined) env.maxJobs = envDef.maxJobs + + // Free tier config for this environment + if (envDef.free) { + env.free = { + access: envDef.free.access || { addresses: [], accessLists: null } + } + if (envDef.free.storageExpiry !== undefined) + env.free.storageExpiry = envDef.free.storageExpiry + if (envDef.free.minJobDuration !== undefined) + env.free.minJobDuration = envDef.free.minJobDuration + if (envDef.free.maxJobDuration !== undefined) + env.free.maxJobDuration = envDef.free.maxJobDuration + if (envDef.free.maxJobs !== undefined) env.free.maxJobs = envDef.free.maxJobs + if (envDef.free.resources) env.free.resources = envDef.free.resources + } + + const envIdSuffix = envDef.id || String(envIdx) + env.id = + this.getC2DConfig().hash + + '-' + + create256Hash(JSON.stringify(env.fees) + envIdSuffix) + + this.envs.push(env) + CORE_LOGGER.info( + `Engine ${this.getC2DConfig().hash}: created environment ${env.id} (index=${envIdx}, resources=${envResources.map((r) => r.id).join(',')})` + ) } - this.envs[0].id = - this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[0].fees)) // Rebuild CPU allocations from running containers (handles node restart) await this.rebuildCpuAllocations() @@ -1608,8 +1607,8 @@ export class C2DEngineDocker extends C2DEngine { if (job.status === C2DStatusNumber.ConfiguringVolumes) { // create the volume & create container // TO DO C2D: Choose driver & size - // get env info - const envResource = this.envs[0].resources + // get engine-level resources for Docker device/hardware configuration + const envResource = this.engineResources const volume: VolumeCreateOptions = { Name: job.jobId + '-volume' } diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 07104524e..c132e4efb 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -156,6 +156,40 @@ export const ComputeEnvironmentFreeOptionsSchema = z.object({ .optional() }) +export const C2DEnvironmentConfigSchema = z + .object({ + id: z.string().optional(), + description: z.string().optional(), + storageExpiry: z.number().int().optional().default(604800), + minJobDuration: z.number().int().optional().default(60), + maxJobDuration: z.number().int().optional().default(3600), + maxJobs: z.number().int().optional(), + fees: z.record(z.string(), z.array(ComputeEnvFeesSchema)).optional(), + access: z + .object({ + addresses: z.array(z.string()), + accessLists: z + .array(z.record(z.string(), z.array(z.string()))) + .nullable() + .optional() + }) + .optional(), + free: ComputeEnvironmentFreeOptionsSchema.optional(), + resources: z.array(z.string()).optional() // references to engine-level resource IDs + }) + .refine( + (data) => + (data.fees !== undefined && Object.keys(data.fees).length > 0) || + (data.free !== undefined && data.free !== null), + { + message: + 'Each environment must have either a non-empty "fees" configuration or a "free" configuration' + } + ) + .refine((data) => data.storageExpiry >= data.maxJobDuration, { + message: '"storageExpiry" should be greater than "maxJobDuration"' + }) + export const C2DDockerConfigSchema = z.array( z .object({ @@ -167,34 +201,10 @@ export const C2DDockerConfigSchema = z.array( certPath: z.string().optional(), keyPath: z.string().optional(), resources: z.array(ComputeResourceSchema).optional(), - storageExpiry: z.number().int().optional().default(604800), - maxJobDuration: z.number().int().optional().default(3600), - minJobDuration: z.number().int().optional().default(60), - access: z - .object({ - addresses: z.array(z.string()), - accessLists: z - .array(z.record(z.string(), z.array(z.string()))) - .nullable() - .optional() - }) - .optional(), - fees: z.record(z.string(), z.array(ComputeEnvFeesSchema)).optional(), - free: ComputeEnvironmentFreeOptionsSchema.optional(), imageRetentionDays: z.number().int().min(1).optional().default(7), - imageCleanupInterval: z.number().int().min(3600).optional().default(86400) // min 1 hour, default 24 hours - }) - .refine( - (data) => - (data.fees !== undefined && Object.keys(data.fees).length > 0) || - (data.free !== undefined && data.free !== null), - { - message: - 'Each docker compute environment must have either a non-empty "fees" configuration or a "free" configuration' - } - ) - .refine((data) => data.storageExpiry >= data.maxJobDuration, { - message: '"storageExpiry" should be greater than "maxJobDuration"' + imageCleanupInterval: z.number().int().min(3600).optional().default(86400), + paymentClaimInterval: z.number().int().optional(), + environments: z.array(C2DEnvironmentConfigSchema).min(1) }) .refine( (data) => { From c3c274f4a5e04ebf5261fded091e4e978139af6a Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Fri, 27 Mar 2026 14:53:58 +0200 Subject: [PATCH 02/23] fixed tests envs --- src/test/integration/algorithmsAccess.test.ts | 4 +- src/test/integration/compute.test.ts | 72 ++++++++++--------- src/test/integration/credentials.test.ts | 4 +- src/test/integration/imageCleanup.test.ts | 36 +++++----- src/test/unit/compute.test.ts | 2 +- 5 files changed, 63 insertions(+), 55 deletions(-) diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 95fb54bd9..9721c1087 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -97,11 +97,11 @@ describe('Trusted algorithms Flow', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60, "minJobDuration":10, "maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 3d29c96bb..eb57aecdc 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -183,11 +183,11 @@ describe('Compute', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) @@ -2339,7 +2339,7 @@ describe('Compute Access Restrictions', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"access":{"addresses":["' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"access":{"addresses":["' + allowedAddress + '"],"accessLists":[]},"fees":{"' + DEVELOPMENT_CHAIN_ID + @@ -2347,7 +2347,7 @@ describe('Compute Access Restrictions', () => { paymentToken + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"access":{"addresses":["' + allowedAddress + - '"],"accessLists":[]},"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '"],"accessLists":[]},"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) @@ -2504,35 +2504,39 @@ describe('Compute Access Restrictions', () => { { socketPath: '/var/run/docker.sock', resources: [{ id: 'disk', total: 10 }], - storageExpiry: 604800, - maxJobDuration: 3600, - minJobDuration: 60, - access: { - addresses: [], - accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] - }, - fees: { - [DEVELOPMENT_CHAIN_ID]: [ - { - feeToken: paymentToken, - prices: [{ id: 'cpu', price: 1 }] + environments: [ + { + storageExpiry: 604800, + maxJobDuration: 3600, + minJobDuration: 60, + access: { + addresses: [], + accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] + }, + fees: { + [DEVELOPMENT_CHAIN_ID]: [ + { + feeToken: paymentToken, + prices: [{ id: 'cpu', price: 1 }] + } + ] + }, + free: { + maxJobDuration: 60, + minJobDuration: 10, + maxJobs: 3, + access: { + addresses: [], + accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] + }, + resources: [ + { id: 'cpu', max: 1 }, + { id: 'ram', max: 1 }, + { id: 'disk', max: 1 } + ] } - ] - }, - free: { - maxJobDuration: 60, - minJobDuration: 10, - maxJobs: 3, - access: { - addresses: [], - accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] - }, - resources: [ - { id: 'cpu', max: 1 }, - { id: 'ram', max: 1 }, - { id: 'disk', max: 1 } - ] - } + } + ] } ]) ] @@ -2660,11 +2664,11 @@ describe('Compute Access Restrictions', () => { JSON.stringify([DEVELOPMENT_CHAIN_ID]), '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"paymentClaimInterval":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"paymentClaimInterval":60,"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) diff --git a/src/test/integration/credentials.test.ts b/src/test/integration/credentials.test.ts index db9945ea6..97254bac3 100644 --- a/src/test/integration/credentials.test.ts +++ b/src/test/integration/credentials.test.ts @@ -126,11 +126,11 @@ describe('[Credentials Flow] - Should run a complete node flow.', () => { JSON.stringify([await publisherAccount.getAddress()]), JSON.stringify([await publisherAccount.getAddress()]), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) diff --git a/src/test/integration/imageCleanup.test.ts b/src/test/integration/imageCleanup.test.ts index ab3b5304c..be03f1c90 100644 --- a/src/test/integration/imageCleanup.test.ts +++ b/src/test/integration/imageCleanup.test.ts @@ -34,23 +34,27 @@ describe('Docker Image Cleanup Integration Tests', () => { { socketPath: '/var/run/docker.sock', resources: [{ id: 'disk', total: 10 }], - storageExpiry: 604800, - maxJobDuration: 3600, - minJobDuration: 60, - fees: { - '1': [ - { - feeToken: '0x123', - prices: [{ id: 'cpu', price: 1 }] - } - ] - }, - access: { - addresses: [], - accessLists: null - }, imageRetentionDays: 7, - imageCleanupInterval: 60 // 1 minute for testing + imageCleanupInterval: 60, // 1 minute for testing + environments: [ + { + storageExpiry: 604800, + maxJobDuration: 3600, + minJobDuration: 60, + fees: { + '1': [ + { + feeToken: '0x123', + prices: [{ id: 'cpu', price: 1 }] + } + ] + }, + access: { + addresses: [], + accessLists: null + } + } + ] } ]) ] diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index 8836e2ae0..0ad5f114d 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -130,7 +130,7 @@ describe('Compute Jobs Database', () => { envOverrides = buildEnvOverrideConfig( [ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS], [ - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) From 5de452648839b35c99d8dab250aece0ec8514633 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Fri, 27 Mar 2026 14:58:05 +0200 Subject: [PATCH 03/23] update config --- src/test/config.json | 108 ++++++++++++++++++++++--------------------- 1 file changed, 56 insertions(+), 52 deletions(-) diff --git a/src/test/config.json b/src/test/config.json index 778a1c1be..04dfd96ac 100644 --- a/src/test/config.json +++ b/src/test/config.json @@ -102,70 +102,74 @@ "total": 1000000000 } ], - "storageExpiry": 604800, - "maxJobDuration": 3600, - "minJobDuration": 60, - "fees": { - "11155111": [ - { - "feeToken": "0x1B083D8584dd3e6Ff37d04a6e7e82b5F622f3985", - "prices": [ + "environments": [ + { + "storageExpiry": 604800, + "maxJobDuration": 3600, + "minJobDuration": 60, + "fees": { + "11155111": [ { - "id": "cpu", - "price": 1 - } - ] - }, - { - "feeToken": "0xfff9976782d46cc05630d1f6ebab18b2324d6b14", - "prices": [ + "feeToken": "0x1B083D8584dd3e6Ff37d04a6e7e82b5F622f3985", + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] + }, { - "id": "cpu", - "price": 1 + "feeToken": "0xfff9976782d46cc05630d1f6ebab18b2324d6b14", + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] } - ] - } - ], - "11155420": [ - { - "feeToken": "0xf26c6C93f9f1d725e149d95f8E7B2334a406aD10", - "prices": [ + ], + "11155420": [ { - "id": "cpu", - "price": 1 + "feeToken": "0xf26c6C93f9f1d725e149d95f8E7B2334a406aD10", + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] + }, + { + "feeToken": "0x4200000000000000000000000000000000000006", + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] } ] }, - { - "feeToken": "0x4200000000000000000000000000000000000006", - "prices": [ + "free": { + "maxJobDuration": 3600, + "minJobDuration": 60, + "maxJobs": 3, + "resources": [ { "id": "cpu", - "price": 1 + "max": 1 + }, + { + "id": "ram", + "max": 1000000000 + }, + { + "id": "disk", + "max": 1000000000 } ] } - ] - }, - "free": { - "maxJobDuration": 3600, - "minJobDuration": 60, - "maxJobs": 3, - "resources": [ - { - "id": "cpu", - "max": 1 - }, - { - "id": "ram", - "max": 1000000000 - }, - { - "id": "disk", - "max": 1000000000 - } - ] - } + } + ] } ] } From 3ca174f8a46f5c1bb12054af87b36fc5fac97285 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Fri, 27 Mar 2026 15:23:04 +0200 Subject: [PATCH 04/23] update docker envs --- .github/workflows/ci.yml | 2 +- config.json | 73 ++++++++++++++++++++-------------------- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6f1fa84bd..d08c369ae 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -290,7 +290,7 @@ jobs: DB_PASSWORD: 'changeme' MAX_REQ_PER_MINUTE: 320 MAX_CONNECTIONS_PER_MINUTE: 320 - DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration": 60,"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration": 10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' DOCKER_REGISTRY_AUTHS: ${{ env.DOCKER_REGISTRY_AUTHS }} - name: Check Ocean Node is running run: | diff --git a/config.json b/config.json index 2fa5f640b..886c4f2da 100644 --- a/config.json +++ b/config.json @@ -102,48 +102,47 @@ "total": 1 } ], - "storageExpiry": 604800, - "maxJobDuration": 3600, - "minJobDuration": 60, - "access": { - "addresses": [], - "accessLists": [] - }, - "fees": { - "8996": [ - { - "prices": [ + "environments": [ + { + "storageExpiry": 604800, + "maxJobDuration": 3600, + "minJobDuration": 60, + "access": { + "addresses": [], + "accessLists": [] + }, + "fees": { + "8996": [ { - "id": "cpu", - "price": 1 + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] } ] - } - ] - }, - "free": { - "maxJobDuration": 3600, - "minJobDuration": 60, - "maxJobs": 3, - "access": { - "addresses": [], - "accessLists": [] - }, - "resources": [ - { - "id": "cpu", - "max": 1 - }, - { - "id": "ram", - "max": 1 }, - { - "id": "disk", - "max": 1 + "free": { + "maxJobDuration": 3600, + "maxJobs": 3, + "resources": [ + { + "id": "cpu", + "max": 1 + }, + { + "id": "ram", + "max": 1 + }, + { + "id": "disk", + "max": 1 + } + ] } - ] - } + } + ] } ] } From 46cce6104c377bdfb975baab72a1f5f399dec31a Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Sun, 29 Mar 2026 15:26:31 +0300 Subject: [PATCH 05/23] remove engine level resources --- .env.example | 7 +- .github/workflows/ci.yml | 2 +- src/@types/C2D/C2D.ts | 4 +- src/components/c2d/compute_engine_docker.ts | 109 ++++-------------- src/components/c2d/compute_engines.ts | 11 +- src/test/config.json | 15 ++- src/test/integration/algorithmsAccess.test.ts | 2 +- src/test/integration/compute.test.ts | 8 +- src/test/integration/credentials.test.ts | 2 +- src/test/integration/imageCleanup.test.ts | 2 +- src/test/unit/compute.test.ts | 2 +- src/utils/config/schemas.ts | 35 +++--- 12 files changed, 70 insertions(+), 129 deletions(-) diff --git a/.env.example b/.env.example index 473bb672f..00feed7b0 100644 --- a/.env.example +++ b/.env.example @@ -65,8 +65,11 @@ export P2P_BOOTSTRAP_NODES= export P2P_FILTER_ANNOUNCED_ADDRESSES= ## compute -# Example with cross-resource constraints (constraints are optional and backwards-compatible): -# export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01}]}]},"resources":[{"id":"cpu","total":8,"max":8,"min":1,"constraints":[{"id":"ram","min":1,"max":3},{"id":"disk","min":10,"max":100}]},{"id":"ram","total":32,"max":32,"min":1},{"id":"disk","total":500,"max":500,"min":10},{"id":"gpu","total":4,"max":4,"min":0,"constraints":[{"id":"ram","min":8,"max":32},{"id":"cpu","min":2,"max":4}]}]}]' +# Each environment defines its own resources (CPU, RAM, disk, GPUs) with full configuration. +# GPUs, RAM, and disk are shared-exclusive: visible in multiple envs, inUse tracked globally. +# CPU cores are hard-partitioned per environment via cpuCores (e.g., envA gets cores 0-9, envB gets 10-19). +# If a job on envA uses gpu0, it shows as in-use on envB too. +# export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","environments":[{"id":"envA","cpuCores":[0,1,2,3],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":16,"max":16,"min":1,"type":"ram"},{"id":"disk","total":500,"max":500,"min":10,"type":"disk"},{"id":"gpu0","total":1,"max":1,"min":0,"type":"gpu","init":{"deviceRequests":{"Driver":"nvidia","DeviceIDs":["0"],"Capabilities":[["gpu"]]}}}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01},{"id":"gpu0","price":5}]}]}}]}]' export DOCKER_COMPUTE_ENVIRONMENTS= diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d08c369ae..5d265047c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -290,7 +290,7 @@ jobs: DB_PASSWORD: 'changeme' MAX_REQ_PER_MINUTE: 320 MAX_CONNECTIONS_PER_MINUTE: 320 - DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' + DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' DOCKER_REGISTRY_AUTHS: ${{ env.DOCKER_REGISTRY_AUTHS }} - name: Check Ocean Node is running run: | diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index ad4257d3a..f2d8d01d5 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -149,7 +149,8 @@ export interface C2DEnvironmentConfig { fees?: ComputeEnvFeesStructure access?: ComputeAccessList free?: ComputeEnvironmentFreeOptions - resources?: string[] + resources?: ComputeResource[] + cpuCores?: number[] // explicit physical CPU core indices for this environment } export interface C2DDockerConfig { @@ -160,7 +161,6 @@ export interface C2DDockerConfig { caPath: string certPath: string keyPath: string - resources?: ComputeResource[] imageRetentionDays?: number // Default: 7 days imageCleanupInterval?: number // Default: 86400 seconds (24 hours) paymentClaimInterval?: number // Default: 3600 seconds (1 hour) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index a9b99c487..ee92b5e10 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -59,11 +59,6 @@ import { ZeroAddress } from 'ethers' export class C2DEngineDocker extends C2DEngine { private envs: ComputeEnvironment[] = [] - private engineResources: ComputeResource[] = [] - - public getEngineResources(): ComputeResource[] { - return this.engineResources - } public docker: Dockerode private cronTimer: any @@ -77,18 +72,15 @@ export class C2DEngineDocker extends C2DEngine { private cleanupInterval: number private paymentClaimInterval: number private cpuAllocations: Map = new Map() - private envCpuCores: number[] = [] - private cpuOffset: number + private envCpuCoresMap: Map = new Map() public constructor( clusterConfig: C2DClusterInfo, db: C2DDatabase, escrow: Escrow, keyManager: KeyManager, - dockerRegistryAuths: dockerRegistrysAuth, - cpuOffset: number = 0 + dockerRegistryAuths: dockerRegistrysAuth ) { super(clusterConfig, db, escrow, keyManager, dockerRegistryAuths) - this.cpuOffset = cpuOffset this.docker = null if (clusterConfig.connection.socketPath) { @@ -165,26 +157,6 @@ export class C2DEngineDocker extends C2DEngine { return fees } - private buildEnvResources(envDef: C2DEnvironmentConfig): ComputeResource[] { - let resources: ComputeResource[] - if (envDef.resources && envDef.resources.length > 0) { - // Filter engine resources to only those referenced by this environment - const allowedIds = new Set(envDef.resources) - resources = this.engineResources - .filter((r) => allowedIds.has(r.id)) - .map((r) => JSON.parse(JSON.stringify(r))) - for (const id of envDef.resources) { - if (!this.engineResources.some((r) => r.id === id)) { - CORE_LOGGER.warn( - `Environment references resource "${id}" which does not exist at engine level` - ) - } - } - } else { - resources = this.engineResources.map((r) => JSON.parse(JSON.stringify(r))) - } - return resources - } public override async start() { const config = await getConfiguration() @@ -206,52 +178,6 @@ export class C2DEngineDocker extends C2DEngine { } } - // Build engine-level resources from system info + user config overrides - const cpuResources: ComputeResource = { - id: 'cpu', - type: 'cpu', - total: sysinfo.NCPU, - max: sysinfo.NCPU, - min: 1, - description: os.cpus()[0].model - } - const ramResources: ComputeResource = { - id: 'ram', - type: 'ram', - total: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), - max: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), - min: 1 - } - this.engineResources = [] - if (envConfig.resources) { - for (const res of envConfig.resources) { - if (res.id === 'cpu') { - if (res.total) cpuResources.total = res.total - if (res.max) cpuResources.max = res.max - if (res.min) cpuResources.min = res.min - } else if (res.id === 'ram') { - if (res.total) ramResources.total = res.total - if (res.max) ramResources.max = res.max - if (res.min) ramResources.min = res.min - } else { - if (!res.max) res.max = res.total - if (!res.min) res.min = 0 - this.engineResources.push(res) - } - } - } - this.engineResources.push(cpuResources) - this.engineResources.push(ramResources) - - // Build CPU core affinity map (engine-level, shared across all environments) - this.envCpuCores = Array.from( - { length: cpuResources.total }, - (_, i) => this.cpuOffset + i - ) - CORE_LOGGER.info( - `CPU affinity: engine cores ${this.envCpuCores[0]}-${this.envCpuCores[this.envCpuCores.length - 1]} (offset=${this.cpuOffset}, total=${cpuResources.total})` - ) - const platform: RunningPlatform = { architecture: sysinfo.Architecture, os: sysinfo.OSType @@ -263,7 +189,9 @@ export class C2DEngineDocker extends C2DEngine { const fees = this.processFeesForEnvironment(envDef.fees, supportedChains) - const envResources = this.buildEnvResources(envDef) + const envResources: ComputeResource[] = envDef.resources + ? envDef.resources.map((r) => JSON.parse(JSON.stringify(r))) + : [] const env: ComputeEnvironment = { id: '', @@ -285,6 +213,7 @@ export class C2DEngineDocker extends C2DEngine { if (envDef.minJobDuration !== undefined) env.minJobDuration = envDef.minJobDuration if (envDef.maxJobDuration !== undefined) env.maxJobDuration = envDef.maxJobDuration if (envDef.maxJobs !== undefined) env.maxJobs = envDef.maxJobs + if (envDef.description !== undefined) env.description = envDef.description // Free tier config for this environment if (envDef.free) { @@ -307,6 +236,14 @@ export class C2DEngineDocker extends C2DEngine { '-' + create256Hash(JSON.stringify(env.fees) + envIdSuffix) + // Per-environment CPU core affinity + if (envDef.cpuCores && envDef.cpuCores.length > 0) { + this.envCpuCoresMap.set(env.id, envDef.cpuCores) + CORE_LOGGER.info( + `CPU affinity: environment ${env.id} cores [${envDef.cpuCores.join(',')}]` + ) + } + this.envs.push(env) CORE_LOGGER.info( `Engine ${this.getC2DConfig().hash}: created environment ${env.id} (index=${envIdx}, resources=${envResources.map((r) => r.id).join(',')})` @@ -1607,8 +1544,9 @@ export class C2DEngineDocker extends C2DEngine { if (job.status === C2DStatusNumber.ConfiguringVolumes) { // create the volume & create container // TO DO C2D: Choose driver & size - // get engine-level resources for Docker device/hardware configuration - const envResource = this.engineResources + // get environment-specific resources for Docker device/hardware configuration + const env = this.envs.find((e) => e.id === job.environment) + const envResource = env?.resources || [] const volume: VolumeCreateOptions = { Name: job.jobId + '-volume' } @@ -1662,7 +1600,7 @@ export class C2DEngineDocker extends C2DEngine { hostConfig.CpuPeriod = 100000 // 100 miliseconds is usually the default hostConfig.CpuQuota = Math.floor(cpus * hostConfig.CpuPeriod) // Pin the container to specific physical CPU cores - const cpusetStr = this.allocateCpus(job.jobId, cpus) + const cpusetStr = this.allocateCpus(job.jobId, cpus, job.environment) if (cpusetStr) { hostConfig.CpusetCpus = cpusetStr } @@ -1958,8 +1896,9 @@ export class C2DEngineDocker extends C2DEngine { return cores } - private allocateCpus(jobId: string, count: number): string | null { - if (this.envCpuCores.length === 0 || count <= 0) return null + private allocateCpus(jobId: string, count: number, envId: string): string | null { + const envCores = this.envCpuCoresMap.get(envId) + if (!envCores || envCores.length === 0 || count <= 0) return null const usedCores = new Set() for (const cores of this.cpuAllocations.values()) { @@ -1969,7 +1908,7 @@ export class C2DEngineDocker extends C2DEngine { } const freeCores: number[] = [] - for (const core of this.envCpuCores) { + for (const core of envCores) { if (!usedCores.has(core)) { freeCores.push(core) if (freeCores.length === count) break @@ -1978,7 +1917,7 @@ export class C2DEngineDocker extends C2DEngine { if (freeCores.length < count) { CORE_LOGGER.warn( - `CPU affinity: not enough free cores for job ${jobId} (requested=${count}, available=${freeCores.length}/${this.envCpuCores.length})` + `CPU affinity: not enough free cores for job ${jobId} in env ${envId} (requested=${count}, available=${freeCores.length}/${envCores.length})` ) return null } @@ -2003,7 +1942,7 @@ export class C2DEngineDocker extends C2DEngine { * On startup, inspects running Docker containers to rebuild the CPU allocation map. */ private async rebuildCpuAllocations(): Promise { - if (this.envCpuCores.length === 0) return + if (this.envCpuCoresMap.size === 0) return try { const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) for (const job of jobs) { diff --git a/src/components/c2d/compute_engines.ts b/src/components/c2d/compute_engines.ts index 4934d9416..26ad035f9 100644 --- a/src/components/c2d/compute_engines.ts +++ b/src/components/c2d/compute_engines.ts @@ -21,7 +21,6 @@ export class C2DEngines { // if yes, do not create multiple engines if (config && config.c2dClusters) { this.engines = [] - let cpuOffset = 0 for (const cluster of config.c2dClusters) { if (cluster.type === C2DClusterType.DOCKER) { this.engines.push( @@ -30,17 +29,9 @@ export class C2DEngines { db, escrow, keyManager, - config.dockerRegistrysAuth, - cpuOffset + config.dockerRegistrysAuth ) ) - // Advance the CPU offset by this cluster's configured CPU total - if (cluster.connection?.resources) { - const cpuRes = cluster.connection.resources.find((r: any) => r.id === 'cpu') - if (cpuRes?.total) { - cpuOffset += cpuRes.total - } - } } } } diff --git a/src/test/config.json b/src/test/config.json index 04dfd96ac..a3d475419 100644 --- a/src/test/config.json +++ b/src/test/config.json @@ -96,17 +96,20 @@ "dockerComputeEnvironments": [ { "socketPath": "/var/run/docker.sock", - "resources": [ - { - "id": "disk", - "total": 1000000000 - } - ], "environments": [ { "storageExpiry": 604800, "maxJobDuration": 3600, "minJobDuration": 60, + "resources": [ + { + "id": "disk", + "total": 1000000000, + "max": 1000000000, + "min": 0, + "type": "disk" + } + ], "fees": { "11155111": [ { diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 9721c1087..6fad44fe5 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -97,7 +97,7 @@ describe('Trusted algorithms Flow', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index eb57aecdc..68868f126 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -183,7 +183,7 @@ describe('Compute', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + @@ -2339,7 +2339,7 @@ describe('Compute Access Restrictions', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"access":{"addresses":["' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"access":{"addresses":["' + allowedAddress + '"],"accessLists":[]},"fees":{"' + DEVELOPMENT_CHAIN_ID + @@ -2503,12 +2503,12 @@ describe('Compute Access Restrictions', () => { JSON.stringify([ { socketPath: '/var/run/docker.sock', - resources: [{ id: 'disk', total: 10 }], environments: [ { storageExpiry: 604800, maxJobDuration: 3600, minJobDuration: 60, + resources: [{ id: 'disk', total: 10, max: 10, min: 0, type: 'disk' }], access: { addresses: [], accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] @@ -2664,7 +2664,7 @@ describe('Compute Access Restrictions', () => { JSON.stringify([DEVELOPMENT_CHAIN_ID]), '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"paymentClaimInterval":60,"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","paymentClaimInterval":60,"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + diff --git a/src/test/integration/credentials.test.ts b/src/test/integration/credentials.test.ts index 97254bac3..fa7365590 100644 --- a/src/test/integration/credentials.test.ts +++ b/src/test/integration/credentials.test.ts @@ -126,7 +126,7 @@ describe('[Credentials Flow] - Should run a complete node flow.', () => { JSON.stringify([await publisherAccount.getAddress()]), JSON.stringify([await publisherAccount.getAddress()]), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + diff --git a/src/test/integration/imageCleanup.test.ts b/src/test/integration/imageCleanup.test.ts index be03f1c90..7d1178620 100644 --- a/src/test/integration/imageCleanup.test.ts +++ b/src/test/integration/imageCleanup.test.ts @@ -33,7 +33,6 @@ describe('Docker Image Cleanup Integration Tests', () => { JSON.stringify([ { socketPath: '/var/run/docker.sock', - resources: [{ id: 'disk', total: 10 }], imageRetentionDays: 7, imageCleanupInterval: 60, // 1 minute for testing environments: [ @@ -41,6 +40,7 @@ describe('Docker Image Cleanup Integration Tests', () => { storageExpiry: 604800, maxJobDuration: 3600, minJobDuration: 60, + resources: [{ id: 'disk', total: 10, max: 10, min: 0, type: 'disk' }], fees: { '1': [ { diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index 0ad5f114d..a9aeb1aa7 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -130,7 +130,7 @@ describe('Compute Jobs Database', () => { envOverrides = buildEnvOverrideConfig( [ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS], [ - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index c132e4efb..02f8457cf 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -175,7 +175,8 @@ export const C2DEnvironmentConfigSchema = z }) .optional(), free: ComputeEnvironmentFreeOptionsSchema.optional(), - resources: z.array(z.string()).optional() // references to engine-level resource IDs + resources: z.array(ComputeResourceSchema).optional(), + cpuCores: z.array(z.number().int().min(0)).optional() }) .refine( (data) => @@ -189,6 +190,13 @@ export const C2DEnvironmentConfigSchema = z .refine((data) => data.storageExpiry >= data.maxJobDuration, { message: '"storageExpiry" should be greater than "maxJobDuration"' }) + .refine( + (data) => { + if (!data.resources) return false + return data.resources.some((r) => r.id === 'disk' && r.total) + }, + { message: 'Each environment must have a "disk" resource with a total configured' } + ) export const C2DDockerConfigSchema = z.array( z @@ -200,7 +208,6 @@ export const C2DDockerConfigSchema = z.array( caPath: z.string().optional(), certPath: z.string().optional(), keyPath: z.string().optional(), - resources: z.array(ComputeResourceSchema).optional(), imageRetentionDays: z.number().int().min(1).optional().default(7), imageCleanupInterval: z.number().int().min(3600).optional().default(86400), paymentClaimInterval: z.number().int().optional(), @@ -208,21 +215,19 @@ export const C2DDockerConfigSchema = z.array( }) .refine( (data) => { - if (!data.resources) return false - return data.resources.some((r) => r.id === 'disk' && r.total) - }, - { message: 'There is no "disk" resource configured. This is mandatory' } - ) - .transform((data) => { - if (data.resources) { - for (const resource of data.resources) { - if (resource.id === 'disk' && resource.total) { - resource.type = 'disk' + // CPU core assignments must not overlap across environments + const seenCores = new Set() + for (const env of data.environments) { + if (!env.cpuCores) continue + for (const core of env.cpuCores) { + if (seenCores.has(core)) return false + seenCores.add(core) } } - } - return data - }) + return true + }, + { message: 'CPU core assignments must not overlap across environments' } + ) ) export const C2DClusterInfoSchema = z.object({ From cd62c2ae322a801109c3120eed90e2ca02cbe87b Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Sun, 29 Mar 2026 15:34:36 +0300 Subject: [PATCH 06/23] remove unused import --- src/components/c2d/compute_engine_docker.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index ee92b5e10..dd1e5c596 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -1,6 +1,5 @@ /* eslint-disable security/detect-non-literal-fs-filename */ import { Readable } from 'stream' -import os from 'os' import { C2DStatusNumber, C2DStatusText, @@ -157,7 +156,6 @@ export class C2DEngineDocker extends C2DEngine { return fees } - public override async start() { const config = await getConfiguration() const envConfig = await this.getC2DConfig().connection From 00e67b7aeb0a75173c0f8e585472cf56132d05b6 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Sun, 29 Mar 2026 21:41:28 +0300 Subject: [PATCH 07/23] fix cpus shared resources --- src/components/c2d/compute_engine_base.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 513812038..24f922336 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -352,10 +352,11 @@ export abstract class C2DEngine { } } - // Resource usage is checked on ALL envs if (isRunning) { for (const resource of job.resources) { if (envResourceIds.has(resource.id)) { + const isCpu = resource.id === 'cpu' + if (isCpu && !isThisEnv) continue // CPU is partitioned, skip other envs if (!(resource.id in usedResources)) usedResources[resource.id] = 0 usedResources[resource.id] += resource.amount if (job.isFree) { From 84299093140db61ec54ffaa0b7814f18f28f64f3 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Sun, 29 Mar 2026 21:45:01 +0300 Subject: [PATCH 08/23] removed unnecessary validation --- src/utils/config/schemas.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 02f8457cf..9df845e74 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -190,13 +190,6 @@ export const C2DEnvironmentConfigSchema = z .refine((data) => data.storageExpiry >= data.maxJobDuration, { message: '"storageExpiry" should be greater than "maxJobDuration"' }) - .refine( - (data) => { - if (!data.resources) return false - return data.resources.some((r) => r.id === 'disk' && r.total) - }, - { message: 'Each environment must have a "disk" resource with a total configured' } - ) export const C2DDockerConfigSchema = z.array( z From 80b0dfe15e9aed31bd1f1078231775c34a2d1168 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Sun, 29 Mar 2026 22:25:20 +0300 Subject: [PATCH 09/23] fix envs --- .github/workflows/ci.yml | 2 +- src/test/config.json | 14 ++++++++++++++ src/test/integration/algorithmsAccess.test.ts | 2 +- src/test/integration/compute.test.ts | 8 ++++---- src/test/integration/credentials.test.ts | 2 +- src/test/integration/imageCleanup.test.ts | 2 +- src/test/unit/compute.test.ts | 2 +- 7 files changed, 23 insertions(+), 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5d265047c..400f17c81 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -290,7 +290,7 @@ jobs: DB_PASSWORD: 'changeme' MAX_REQ_PER_MINUTE: 320 MAX_CONNECTIONS_PER_MINUTE: 320 - DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' + DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' DOCKER_REGISTRY_AUTHS: ${{ env.DOCKER_REGISTRY_AUTHS }} - name: Check Ocean Node is running run: | diff --git a/src/test/config.json b/src/test/config.json index a3d475419..90b32b9f8 100644 --- a/src/test/config.json +++ b/src/test/config.json @@ -102,6 +102,20 @@ "maxJobDuration": 3600, "minJobDuration": 60, "resources": [ + { + "id": "cpu", + "total": 4, + "max": 4, + "min": 1, + "type": "cpu" + }, + { + "id": "ram", + "total": 10, + "max": 10, + "min": 1, + "type": "ram" + }, { "id": "disk", "total": 1000000000, diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 6fad44fe5..677f7343c 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -97,7 +97,7 @@ describe('Trusted algorithms Flow', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 68868f126..c13c863a3 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -183,7 +183,7 @@ describe('Compute', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + @@ -2339,7 +2339,7 @@ describe('Compute Access Restrictions', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"access":{"addresses":["' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"access":{"addresses":["' + allowedAddress + '"],"accessLists":[]},"fees":{"' + DEVELOPMENT_CHAIN_ID + @@ -2508,7 +2508,7 @@ describe('Compute Access Restrictions', () => { storageExpiry: 604800, maxJobDuration: 3600, minJobDuration: 60, - resources: [{ id: 'disk', total: 10, max: 10, min: 0, type: 'disk' }], + resources: [{ id: 'cpu', total: 4, max: 4, min: 1, type: 'cpu' }, { id: 'ram', total: 10, max: 10, min: 1, type: 'ram' }, { id: 'disk', total: 10, max: 10, min: 0, type: 'disk' }], access: { addresses: [], accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] @@ -2664,7 +2664,7 @@ describe('Compute Access Restrictions', () => { JSON.stringify([DEVELOPMENT_CHAIN_ID]), '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","paymentClaimInterval":60,"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","paymentClaimInterval":60,"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + diff --git a/src/test/integration/credentials.test.ts b/src/test/integration/credentials.test.ts index fa7365590..6bd7d83ba 100644 --- a/src/test/integration/credentials.test.ts +++ b/src/test/integration/credentials.test.ts @@ -126,7 +126,7 @@ describe('[Credentials Flow] - Should run a complete node flow.', () => { JSON.stringify([await publisherAccount.getAddress()]), JSON.stringify([await publisherAccount.getAddress()]), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + diff --git a/src/test/integration/imageCleanup.test.ts b/src/test/integration/imageCleanup.test.ts index 7d1178620..ef501465c 100644 --- a/src/test/integration/imageCleanup.test.ts +++ b/src/test/integration/imageCleanup.test.ts @@ -40,7 +40,7 @@ describe('Docker Image Cleanup Integration Tests', () => { storageExpiry: 604800, maxJobDuration: 3600, minJobDuration: 60, - resources: [{ id: 'disk', total: 10, max: 10, min: 0, type: 'disk' }], + resources: [{ id: 'cpu', total: 4, max: 4, min: 1, type: 'cpu' }, { id: 'ram', total: 10, max: 10, min: 1, type: 'ram' }, { id: 'disk', total: 10, max: 10, min: 0, type: 'disk' }], fees: { '1': [ { diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index a9aeb1aa7..1164213f8 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -130,7 +130,7 @@ describe('Compute Jobs Database', () => { envOverrides = buildEnvOverrideConfig( [ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS], [ - '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) From df300cad7c8be50c0e235bce7729a5ea5dc61164 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Sun, 29 Mar 2026 22:28:44 +0300 Subject: [PATCH 10/23] fix lint errors --- src/test/integration/compute.test.ts | 6 +++++- src/test/integration/imageCleanup.test.ts | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index c13c863a3..0eb42ed28 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -2508,7 +2508,11 @@ describe('Compute Access Restrictions', () => { storageExpiry: 604800, maxJobDuration: 3600, minJobDuration: 60, - resources: [{ id: 'cpu', total: 4, max: 4, min: 1, type: 'cpu' }, { id: 'ram', total: 10, max: 10, min: 1, type: 'ram' }, { id: 'disk', total: 10, max: 10, min: 0, type: 'disk' }], + resources: [ + { id: 'cpu', total: 4, max: 4, min: 1, type: 'cpu' }, + { id: 'ram', total: 10, max: 10, min: 1, type: 'ram' }, + { id: 'disk', total: 10, max: 10, min: 0, type: 'disk' } + ], access: { addresses: [], accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] diff --git a/src/test/integration/imageCleanup.test.ts b/src/test/integration/imageCleanup.test.ts index ef501465c..31f60d561 100644 --- a/src/test/integration/imageCleanup.test.ts +++ b/src/test/integration/imageCleanup.test.ts @@ -40,7 +40,11 @@ describe('Docker Image Cleanup Integration Tests', () => { storageExpiry: 604800, maxJobDuration: 3600, minJobDuration: 60, - resources: [{ id: 'cpu', total: 4, max: 4, min: 1, type: 'cpu' }, { id: 'ram', total: 10, max: 10, min: 1, type: 'ram' }, { id: 'disk', total: 10, max: 10, min: 0, type: 'disk' }], + resources: [ + { id: 'cpu', total: 4, max: 4, min: 1, type: 'cpu' }, + { id: 'ram', total: 10, max: 10, min: 1, type: 'ram' }, + { id: 'disk', total: 10, max: 10, min: 0, type: 'disk' } + ], fees: { '1': [ { From cd6fbfd526c6566aa4f2abaf4eba28f3f39f3222 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 10:19:51 +0300 Subject: [PATCH 11/23] add back the sysinfo resources --- src/components/c2d/compute_engine_docker.ts | 44 +++++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 3dd234d31..5eb0b9a71 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -1,5 +1,6 @@ /* eslint-disable security/detect-non-literal-fs-filename */ import { Readable } from 'stream' +import os from 'os' import { C2DStatusNumber, C2DStatusText, @@ -187,9 +188,46 @@ export class C2DEngineDocker extends C2DEngine { const fees = this.processFeesForEnvironment(envDef.fees, supportedChains) - const envResources: ComputeResource[] = envDef.resources - ? envDef.resources.map((r) => JSON.parse(JSON.stringify(r))) - : [] + const envResources: ComputeResource[] = [] + const cpuResources = { + id: 'cpu', + type: 'cpu', + total: sysinfo.NCPU, + max: sysinfo.NCPU, + min: 1, + description: os.cpus()[0].model + } + const ramResources = { + id: 'ram', + type: 'ram', + total: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), + max: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), + min: 1 + } + + if (envDef.resources) { + for (const res of envDef.resources) { + // allow user to add other resources + if (res.id === 'cpu') { + if (res.total) cpuResources.total = res.total + if (res.max) cpuResources.max = res.max + if (res.min) cpuResources.min = res.min + } + if (res.id === 'ram') { + if (res.total) ramResources.total = res.total + if (res.max) ramResources.max = res.max + if (res.min) ramResources.min = res.min + } + + if (res.id !== 'cpu' && res.id !== 'ram') { + if (!res.max) res.max = res.total + if (!res.min) res.min = 0 + envResources.push(res) + } + } + } + envResources.push(cpuResources) + envResources.push(ramResources) const env: ComputeEnvironment = { id: '', From f6422f2ed0f3da98e690f6f757b541199c37af18 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 11:32:39 +0300 Subject: [PATCH 12/23] check used resources on all envs for disk and ram --- src/components/c2d/compute_engine_base.ts | 62 +++++++++++++++++++-- src/components/c2d/compute_engine_docker.ts | 11 ++-- src/components/core/compute/startCompute.ts | 12 ++-- 3 files changed, 70 insertions(+), 15 deletions(-) diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 24f922336..ceb5333ac 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -317,7 +317,7 @@ export abstract class C2DEngine { CORE_LOGGER.error('Failed to get running jobs:' + e.message) } - const envResourceIds = new Set((env.resources || []).map((r) => r.id)) + const envResourceMap = new Map((env.resources || []).map((r) => [r.id, r])) let totalJobs = 0 let totalFreeJobs = 0 @@ -354,9 +354,12 @@ export abstract class C2DEngine { if (isRunning) { for (const resource of job.resources) { - if (envResourceIds.has(resource.id)) { - const isCpu = resource.id === 'cpu' - if (isCpu && !isThisEnv) continue // CPU is partitioned, skip other envs + const envRes = envResourceMap.get(resource.id) + if (envRes) { + // GPUs are shared-exclusive: inUse tracked globally across all envs + // Everything else (cpu, ram, disk) is per-env exclusive + const isSharedExclusive = envRes.type === 'gpu' + if (!isSharedExclusive && !isThisEnv) continue if (!(resource.id in usedResources)) usedResources[resource.id] = 0 usedResources[resource.id] += resource.amount if (job.isFree) { @@ -381,12 +384,40 @@ export abstract class C2DEngine { } } + private checkGlobalResourceAvailability( + allEnvironments: ComputeEnvironment[], + resourceId: string, + amount: number, + isFree: boolean + ) { + let globalUsed = 0 + let globalTotal = 0 + for (const e of allEnvironments) { + const res = isFree + ? e.free + ? this.getResource(e.free.resources, resourceId) + : null + : this.getResource(e.resources, resourceId) + if (res) { + globalTotal += res.total || 0 + globalUsed += res.inUse || 0 + } + } + const globalRemainder = globalTotal - globalUsed + if (globalRemainder < amount) { + throw new Error( + `Not enough available ${resourceId} globally (remaining: ${globalRemainder}, requested: ${amount})` + ) + } + } + // overridden by each engine if required // eslint-disable-next-line require-await public async checkIfResourcesAreAvailable( resourcesRequest: ComputeResourceRequest[], env: ComputeEnvironment, - isFree: boolean + isFree: boolean, + allEnvironments?: ComputeEnvironment[] ) { // Filter out resources with amount 0 as they're not actually being requested const activeResources = resourcesRequest.filter((r) => r.amount > 0) @@ -396,12 +427,33 @@ export abstract class C2DEngine { if (!envResource) throw new Error(`No such resource ${request.id}`) if (envResource.total - envResource.inUse < request.amount) throw new Error(`Not enough available ${request.id}`) + + // Global check for non-GPU resources (cpu, ram, disk are per-env exclusive) + // GPUs are shared-exclusive so their inUse already reflects global usage + if (allEnvironments && envResource.type !== 'gpu') { + this.checkGlobalResourceAvailability( + allEnvironments, + request.id, + request.amount, + false + ) + } + if (isFree) { if (!env.free) throw new Error(`No free resources`) envResource = this.getResource(env.free?.resources, request.id) if (!envResource) throw new Error(`No such free resource ${request.id}`) if (envResource.total - envResource.inUse < request.amount) throw new Error(`Not enough available ${request.id} for free`) + + if (allEnvironments && envResource.type !== 'gpu') { + this.checkGlobalResourceAvailability( + allEnvironments, + request.id, + request.amount, + true + ) + } } } if ('maxJobs' in env && env.maxJobs && env.runningJobs + 1 > env.maxJobs) { diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 5eb0b9a71..ebfa3296e 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -1553,12 +1553,11 @@ export class C2DEngineDocker extends C2DEngine { } // check if resources are available now try { - const env = await this.getComputeEnvironment( - job.payment && job.payment.chainId ? job.payment.chainId : null, - job.environment, - null - ) - await this.checkIfResourcesAreAvailable(job.resources, env, job.isFree) + const chainId = job.payment && job.payment.chainId ? job.payment.chainId : null + const allEnvs = await this.getComputeEnvironments(chainId) + const env = allEnvs.find((e) => e.id === job.environment) + if (!env) throw new Error(`Environment ${job.environment} not found`) + await this.checkIfResourcesAreAvailable(job.resources, env, job.isFree, allEnvs) } catch (err) { // resources are still not available return diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index e43c668fa..311f27401 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -29,6 +29,7 @@ import { import { EncryptMethod } from '../../../@types/fileObject.js' import { ComputeAccessList, + ComputeEnvironment, ComputeResourceRequestWithPrice } from '../../../@types/C2D/C2D.js' // import { verifyProviderFees } from '../utils/feesHandler.js' @@ -119,8 +120,10 @@ export class PaidComputeStartHandler extends CommonComputeHandler { } } + let allEnvs: ComputeEnvironment[] try { - env = await engine.getComputeEnvironment(null, task.environment) + allEnvs = await engine.getComputeEnvironments() + env = allEnvs.find((e) => e.id === task.environment) if (!env) { return { stream: null, @@ -150,7 +153,7 @@ export class PaidComputeStartHandler extends CommonComputeHandler { } } try { - await engine.checkIfResourcesAreAvailable(task.resources, env, false) + await engine.checkIfResourcesAreAvailable(task.resources, env, false, allEnvs) } catch (e) { if (task.queueMaxWaitTime > 0) { CORE_LOGGER.verbose( @@ -885,7 +888,8 @@ export class FreeComputeStartHandler extends CommonComputeHandler { } } } - const env = await engine.getComputeEnvironment(null, task.environment) + const allFreeEnvs = await engine.getComputeEnvironments() + const env = allFreeEnvs.find((e) => e.id === task.environment) if (!env) { return { stream: null, @@ -931,7 +935,7 @@ export class FreeComputeStartHandler extends CommonComputeHandler { } } try { - await engine.checkIfResourcesAreAvailable(task.resources, env, true) + await engine.checkIfResourcesAreAvailable(task.resources, env, true, allFreeEnvs) } catch (e) { if (task.queueMaxWaitTime > 0) { CORE_LOGGER.verbose( From e109913d93a8f6342cf30ceb165cf8050d712ab3 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 13:29:53 +0300 Subject: [PATCH 13/23] remove free global check, fix algoStartTime issue --- src/components/c2d/compute_engine_base.ts | 36 +++++++---------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index ceb5333ac..46ad0c16d 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -334,13 +334,16 @@ export abstract class C2DEngine { if (isThisEnv) { if (isRunning) { - const timeElapsed = - new Date().getTime() / 1000 - Number.parseFloat(job?.algoStartTimestamp) totalJobs++ - maxRunningTime += job.maxJobDuration - timeElapsed + const algoStart = Number.parseFloat(job?.algoStartTimestamp) + const timeRemaining = + algoStart > 0 + ? job.maxJobDuration - (new Date().getTime() / 1000 - algoStart) + : job.maxJobDuration + maxRunningTime += timeRemaining if (job.isFree) { totalFreeJobs++ - maxRunningTimeFree += job.maxJobDuration - timeElapsed + maxRunningTimeFree += timeRemaining } } else { queuedJobs++ @@ -387,17 +390,12 @@ export abstract class C2DEngine { private checkGlobalResourceAvailability( allEnvironments: ComputeEnvironment[], resourceId: string, - amount: number, - isFree: boolean + amount: number ) { let globalUsed = 0 let globalTotal = 0 for (const e of allEnvironments) { - const res = isFree - ? e.free - ? this.getResource(e.free.resources, resourceId) - : null - : this.getResource(e.resources, resourceId) + const res = this.getResource(e.resources, resourceId) if (res) { globalTotal += res.total || 0 globalUsed += res.inUse || 0 @@ -431,12 +429,7 @@ export abstract class C2DEngine { // Global check for non-GPU resources (cpu, ram, disk are per-env exclusive) // GPUs are shared-exclusive so their inUse already reflects global usage if (allEnvironments && envResource.type !== 'gpu') { - this.checkGlobalResourceAvailability( - allEnvironments, - request.id, - request.amount, - false - ) + this.checkGlobalResourceAvailability(allEnvironments, request.id, request.amount) } if (isFree) { @@ -445,15 +438,6 @@ export abstract class C2DEngine { if (!envResource) throw new Error(`No such free resource ${request.id}`) if (envResource.total - envResource.inUse < request.amount) throw new Error(`Not enough available ${request.id} for free`) - - if (allEnvironments && envResource.type !== 'gpu') { - this.checkGlobalResourceAvailability( - allEnvironments, - request.id, - request.amount, - true - ) - } } } if ('maxJobs' in env && env.maxJobs && env.runningJobs + 1 > env.maxJobs) { From c1bbd3a764223f0e9e15546958025d7ff83d1e06 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 13:35:38 +0300 Subject: [PATCH 14/23] fix env.example --- .env.example | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.env.example b/.env.example index 00feed7b0..74613d7d2 100644 --- a/.env.example +++ b/.env.example @@ -66,9 +66,11 @@ export P2P_FILTER_ANNOUNCED_ADDRESSES= ## compute # Each environment defines its own resources (CPU, RAM, disk, GPUs) with full configuration. -# GPUs, RAM, and disk are shared-exclusive: visible in multiple envs, inUse tracked globally. -# CPU cores are hard-partitioned per environment via cpuCores (e.g., envA gets cores 0-9, envB gets 10-19). -# If a job on envA uses gpu0, it shows as in-use on envB too. +# CPU, RAM, and disk are per-env exclusive: inUse tracked only within the environment where the job runs. +# A global check ensures the aggregate usage across all environments does not exceed physical capacity. +# GPUs are shared-exclusive: if a job on envA uses gpu0, it shows as in-use on envB too. +# CPU cores can be hard-partitioned per environment via cpuCores (e.g., envA gets cores 0-3, envB gets 4-7). +# CPU and RAM defaults are auto-detected from the system when not configured. # export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","environments":[{"id":"envA","cpuCores":[0,1,2,3],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":16,"max":16,"min":1,"type":"ram"},{"id":"disk","total":500,"max":500,"min":10,"type":"disk"},{"id":"gpu0","total":1,"max":1,"min":0,"type":"gpu","init":{"deviceRequests":{"Driver":"nvidia","DeviceIDs":["0"],"Capabilities":[["gpu"]]}}}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01},{"id":"gpu0","price":5}]}]}}]}]' export DOCKER_COMPUTE_ENVIRONMENTS= From 3be45dc437527f03f7e7e14dd5cea142b7403caa Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 15:11:20 +0300 Subject: [PATCH 15/23] remove cpu cores from config --- .env.example | 4 ++-- src/@types/C2D/C2D.ts | 1 - src/components/c2d/compute_engine_docker.ts | 21 +++++++++++++-------- src/utils/config/schemas.ts | 18 +----------------- 4 files changed, 16 insertions(+), 28 deletions(-) diff --git a/.env.example b/.env.example index 74613d7d2..5b151e342 100644 --- a/.env.example +++ b/.env.example @@ -69,9 +69,9 @@ export P2P_FILTER_ANNOUNCED_ADDRESSES= # CPU, RAM, and disk are per-env exclusive: inUse tracked only within the environment where the job runs. # A global check ensures the aggregate usage across all environments does not exceed physical capacity. # GPUs are shared-exclusive: if a job on envA uses gpu0, it shows as in-use on envB too. -# CPU cores can be hard-partitioned per environment via cpuCores (e.g., envA gets cores 0-3, envB gets 4-7). +# CPU cores are automatically partitioned across environments based on each env's cpu.total. # CPU and RAM defaults are auto-detected from the system when not configured. -# export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","environments":[{"id":"envA","cpuCores":[0,1,2,3],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":16,"max":16,"min":1,"type":"ram"},{"id":"disk","total":500,"max":500,"min":10,"type":"disk"},{"id":"gpu0","total":1,"max":1,"min":0,"type":"gpu","init":{"deviceRequests":{"Driver":"nvidia","DeviceIDs":["0"],"Capabilities":[["gpu"]]}}}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01},{"id":"gpu0","price":5}]}]}}]}]' +# export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","environments":[{"id":"envA","storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":16,"max":16,"min":1,"type":"ram"},{"id":"disk","total":500,"max":500,"min":10,"type":"disk"},{"id":"gpu0","total":1,"max":1,"min":0,"type":"gpu","init":{"deviceRequests":{"Driver":"nvidia","DeviceIDs":["0"],"Capabilities":[["gpu"]]}}}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01},{"id":"gpu0","price":5}]}]}}]}]' export DOCKER_COMPUTE_ENVIRONMENTS= diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index f2d8d01d5..8408f4d67 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -150,7 +150,6 @@ export interface C2DEnvironmentConfig { access?: ComputeAccessList free?: ComputeEnvironmentFreeOptions resources?: ComputeResource[] - cpuCores?: number[] // explicit physical CPU core indices for this environment } export interface C2DDockerConfig { diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index ebfa3296e..4b4c0d918 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -272,20 +272,25 @@ export class C2DEngineDocker extends C2DEngine { '-' + create256Hash(JSON.stringify(env.fees) + envIdSuffix) - // Per-environment CPU core affinity - if (envDef.cpuCores && envDef.cpuCores.length > 0) { - this.envCpuCoresMap.set(env.id, envDef.cpuCores) - CORE_LOGGER.info( - `CPU affinity: environment ${env.id} cores [${envDef.cpuCores.join(',')}]` - ) - } - this.envs.push(env) CORE_LOGGER.info( `Engine ${this.getC2DConfig().hash}: created environment ${env.id} (index=${envIdx}, resources=${envResources.map((r) => r.id).join(',')})` ) } + let cpuOffset = 0 + for (const env of this.envs) { + const cpuRes = this.getResource(env.resources, 'cpu') + if (cpuRes && cpuRes.total > 0) { + const cores = Array.from({ length: cpuRes.total }, (_, i) => cpuOffset + i) + this.envCpuCoresMap.set(env.id, cores) + CORE_LOGGER.info( + `CPU affinity: environment ${env.id} cores ${cores[0]}-${cores[cores.length - 1]} (offset=${cpuOffset}, total=${cpuRes.total})` + ) + cpuOffset += cpuRes.total + } + } + // Rebuild CPU allocations from running containers (handles node restart) await this.rebuildCpuAllocations() diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 9df845e74..d5afe69b1 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -175,8 +175,7 @@ export const C2DEnvironmentConfigSchema = z }) .optional(), free: ComputeEnvironmentFreeOptionsSchema.optional(), - resources: z.array(ComputeResourceSchema).optional(), - cpuCores: z.array(z.number().int().min(0)).optional() + resources: z.array(ComputeResourceSchema).optional() }) .refine( (data) => @@ -206,21 +205,6 @@ export const C2DDockerConfigSchema = z.array( paymentClaimInterval: z.number().int().optional(), environments: z.array(C2DEnvironmentConfigSchema).min(1) }) - .refine( - (data) => { - // CPU core assignments must not overlap across environments - const seenCores = new Set() - for (const env of data.environments) { - if (!env.cpuCores) continue - for (const core of env.cpuCores) { - if (seenCores.has(core)) return false - seenCores.add(core) - } - } - return true - }, - { message: 'CPU core assignments must not overlap across environments' } - ) ) export const C2DClusterInfoSchema = z.object({ From 180801a632d159f23709ab45ba7e1f9caa851cdd Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 15:21:15 +0300 Subject: [PATCH 16/23] fix lint issue --- src/utils/config/schemas.ts | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index d5afe69b1..80a9a69d8 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -191,20 +191,19 @@ export const C2DEnvironmentConfigSchema = z }) export const C2DDockerConfigSchema = z.array( - z - .object({ - socketPath: z.string().optional(), - protocol: z.string().optional(), - host: z.string().optional(), - port: z.number().optional(), - caPath: z.string().optional(), - certPath: z.string().optional(), - keyPath: z.string().optional(), - imageRetentionDays: z.number().int().min(1).optional().default(7), - imageCleanupInterval: z.number().int().min(3600).optional().default(86400), - paymentClaimInterval: z.number().int().optional(), - environments: z.array(C2DEnvironmentConfigSchema).min(1) - }) + z.object({ + socketPath: z.string().optional(), + protocol: z.string().optional(), + host: z.string().optional(), + port: z.number().optional(), + caPath: z.string().optional(), + certPath: z.string().optional(), + keyPath: z.string().optional(), + imageRetentionDays: z.number().int().min(1).optional().default(7), + imageCleanupInterval: z.number().int().min(3600).optional().default(86400), + paymentClaimInterval: z.number().int().optional(), + environments: z.array(C2DEnvironmentConfigSchema).min(1) + }) ) export const C2DClusterInfoSchema = z.object({ From 5b950b2d3e3d7262f6924e1987c919e5fb9b7995 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 16:55:13 +0300 Subject: [PATCH 17/23] add check based on total physical limit --- src/components/c2d/compute_engine_base.ts | 6 ++++++ src/components/c2d/compute_engine_docker.ts | 3 +++ 2 files changed, 9 insertions(+) diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 46ad0c16d..d6c08dfa4 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -387,6 +387,8 @@ export abstract class C2DEngine { } } + protected physicalLimits: Map = new Map() + private checkGlobalResourceAvailability( allEnvironments: ComputeEnvironment[], resourceId: string, @@ -401,6 +403,10 @@ export abstract class C2DEngine { globalUsed += res.inUse || 0 } } + const physicalLimit = this.physicalLimits.get(resourceId) + if (physicalLimit !== undefined && globalTotal > physicalLimit) { + globalTotal = physicalLimit + } const globalRemainder = globalTotal - globalUsed if (globalRemainder < amount) { throw new Error( diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 4b4c0d918..ac3fd77f9 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -169,6 +169,9 @@ export class C2DEngineDocker extends C2DEngine { return } + this.physicalLimits.set('cpu', sysinfo.NCPU) + this.physicalLimits.set('ram', Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024)) + // Determine supported chains const supportedChains: number[] = [] if (config.supportedNetworks) { From 0b3ff90196a3043d1bfbe4d33a8b965f88771a5b Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 17:37:59 +0300 Subject: [PATCH 18/23] fixed conflict --- src/components/c2d/compute_engine_base.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 86632ab00..96708ee45 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -338,15 +338,10 @@ export abstract class C2DEngine { ? new Date().getTime() / 1000 - Number.parseFloat(job?.buildStartTimestamp) : new Date().getTime() / 1000 - Number.parseFloat(job?.algoStartTimestamp) totalJobs++ - const algoStart = Number.parseFloat(job?.algoStartTimestamp) - const timeRemaining = - algoStart > 0 - ? job.maxJobDuration - (new Date().getTime() / 1000 - algoStart) - : job.maxJobDuration - maxRunningTime += timeRemaining + maxRunningTime += job.maxJobDuration - timeElapsed if (job.isFree) { totalFreeJobs++ - maxRunningTimeFree += timeRemaining + maxRunningTimeFree += job.maxJobDuration - timeElapsed } } else { queuedJobs++ From 865f65803160fe098959a2d469c19e96144b0e2d Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 23:13:03 +0300 Subject: [PATCH 19/23] fix errors raised --- src/utils/config/schemas.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index b4cc74eab..8d98b59ae 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -142,6 +142,7 @@ export const ComputeEnvFeesSchema = z.object({ }) export const ComputeEnvironmentFreeOptionsSchema = z.object({ + minJobDuration: z.number().int().optional().default(60), maxJobDuration: z.number().int().optional().default(3600), maxJobs: z.number().int().optional().default(3), resources: z.array(ComputeResourceSchema).optional(), @@ -190,6 +191,23 @@ export const C2DEnvironmentConfigSchema = z .refine((data) => data.storageExpiry >= data.maxJobDuration, { message: '"storageExpiry" should be greater than "maxJobDuration"' }) + .refine( + (data) => { + if (!data.resources) return false + return data.resources.some((r) => r.id === 'disk' && r.total) + }, + { message: 'There is no "disk" resource configured. This is mandatory' } + ) + .transform((data) => { + if (data.resources) { + for (const resource of data.resources) { + if (resource.id === 'disk' && resource.total) { + resource.type = 'disk' + } + } + } + return data + }) export const C2DDockerConfigSchema = z.array( z.object({ From cab0f9a59fc72599b1145543ea811a50e25e53bf Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 23:27:03 +0300 Subject: [PATCH 20/23] fix test issue --- src/utils/config/schemas.ts | 65 ++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 8d98b59ae..5ff0315fd 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -191,38 +191,43 @@ export const C2DEnvironmentConfigSchema = z .refine((data) => data.storageExpiry >= data.maxJobDuration, { message: '"storageExpiry" should be greater than "maxJobDuration"' }) - .refine( - (data) => { - if (!data.resources) return false - return data.resources.some((r) => r.id === 'disk' && r.total) - }, - { message: 'There is no "disk" resource configured. This is mandatory' } - ) - .transform((data) => { - if (data.resources) { - for (const resource of data.resources) { - if (resource.id === 'disk' && resource.total) { - resource.type = 'disk' - } - } - } - return data - }) export const C2DDockerConfigSchema = z.array( - z.object({ - socketPath: z.string().optional(), - protocol: z.string().optional(), - host: z.string().optional(), - port: z.number().optional(), - caPath: z.string().optional(), - certPath: z.string().optional(), - keyPath: z.string().optional(), - imageRetentionDays: z.number().int().min(1).optional().default(7), - imageCleanupInterval: z.number().int().min(3600).optional().default(86400), - paymentClaimInterval: z.number().int().optional(), - environments: z.array(C2DEnvironmentConfigSchema).min(1) - }) + z + .object({ + socketPath: z.string().optional(), + protocol: z.string().optional(), + host: z.string().optional(), + port: z.number().optional(), + caPath: z.string().optional(), + certPath: z.string().optional(), + keyPath: z.string().optional(), + imageRetentionDays: z.number().int().min(1).optional().default(7), + imageCleanupInterval: z.number().int().min(3600).optional().default(86400), + paymentClaimInterval: z.number().int().optional(), + environments: z.array(C2DEnvironmentConfigSchema).min(1) + }) + .refine( + (data) => { + return data.environments.every((env) => { + if (!env.resources) return false + return env.resources.some((r) => r.id === 'disk' && r.total) + }) + }, + { message: 'There is no "disk" resource configured. This is mandatory' } + ) + .transform((data) => { + for (const env of data.environments) { + if (env.resources) { + for (const resource of env.resources) { + if (resource.id === 'disk' && resource.total) { + resource.type = 'disk' + } + } + } + } + return data + }) ) export const C2DClusterInfoSchema = z.object({ From 87cb58be0e553c704be2f1635604429fc32fcb83 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Mon, 30 Mar 2026 23:46:11 +0300 Subject: [PATCH 21/23] attempt to fix tests issue --- src/components/c2d/compute_engine_docker.ts | 1 + src/utils/config/schemas.ts | 55 ++++++++------------- 2 files changed, 21 insertions(+), 35 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index cbd04132d..fd31e3f5a 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -223,6 +223,7 @@ export class C2DEngineDocker extends C2DEngine { } if (res.id !== 'cpu' && res.id !== 'ram') { + if (res.id === 'disk' && res.total) res.type = 'disk' if (!res.max) res.max = res.total if (!res.min) res.min = 0 envResources.push(res) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 5ff0315fd..a71851549 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -191,43 +191,28 @@ export const C2DEnvironmentConfigSchema = z .refine((data) => data.storageExpiry >= data.maxJobDuration, { message: '"storageExpiry" should be greater than "maxJobDuration"' }) + .refine( + (data) => { + if (!data.resources) return false + return data.resources.some((r) => r.id === 'disk' && r.total) + }, + { message: 'There is no "disk" resource configured. This is mandatory' } + ) export const C2DDockerConfigSchema = z.array( - z - .object({ - socketPath: z.string().optional(), - protocol: z.string().optional(), - host: z.string().optional(), - port: z.number().optional(), - caPath: z.string().optional(), - certPath: z.string().optional(), - keyPath: z.string().optional(), - imageRetentionDays: z.number().int().min(1).optional().default(7), - imageCleanupInterval: z.number().int().min(3600).optional().default(86400), - paymentClaimInterval: z.number().int().optional(), - environments: z.array(C2DEnvironmentConfigSchema).min(1) - }) - .refine( - (data) => { - return data.environments.every((env) => { - if (!env.resources) return false - return env.resources.some((r) => r.id === 'disk' && r.total) - }) - }, - { message: 'There is no "disk" resource configured. This is mandatory' } - ) - .transform((data) => { - for (const env of data.environments) { - if (env.resources) { - for (const resource of env.resources) { - if (resource.id === 'disk' && resource.total) { - resource.type = 'disk' - } - } - } - } - return data - }) + z.object({ + socketPath: z.string().optional(), + protocol: z.string().optional(), + host: z.string().optional(), + port: z.number().optional(), + caPath: z.string().optional(), + certPath: z.string().optional(), + keyPath: z.string().optional(), + imageRetentionDays: z.number().int().min(1).optional().default(7), + imageCleanupInterval: z.number().int().min(3600).optional().default(86400), + paymentClaimInterval: z.number().int().optional(), + environments: z.array(C2DEnvironmentConfigSchema).min(1) + }) ) export const C2DClusterInfoSchema = z.object({ From c3694e15194891c06df6c0f0010b64294cf6c386 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Tue, 31 Mar 2026 00:03:09 +0300 Subject: [PATCH 22/23] fix config --- config.json | 12 ++++++------ src/test/config.json | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/config.json b/config.json index 886c4f2da..fcb19438a 100644 --- a/config.json +++ b/config.json @@ -96,17 +96,17 @@ "dockerComputeEnvironments": [ { "socketPath": "/var/run/docker.sock", - "resources": [ - { - "id": "disk", - "total": 1 - } - ], "environments": [ { "storageExpiry": 604800, "maxJobDuration": 3600, "minJobDuration": 60, + "resources": [ + { + "id": "disk", + "total": 1 + } + ], "access": { "addresses": [], "accessLists": [] diff --git a/src/test/config.json b/src/test/config.json index 90b32b9f8..31dd6b7cf 100644 --- a/src/test/config.json +++ b/src/test/config.json @@ -118,8 +118,8 @@ }, { "id": "disk", - "total": 1000000000, - "max": 1000000000, + "total": 100, + "max": 100, "min": 0, "type": "disk" } @@ -177,11 +177,11 @@ }, { "id": "ram", - "max": 1000000000 + "max": 1 }, { "id": "disk", - "max": 1000000000 + "max": 1 } ] } From ed25ad5bfad1b4daa31be26a9851a6447a402037 Mon Sep 17 00:00:00 2001 From: denisiuriet Date: Tue, 31 Mar 2026 09:39:33 +0300 Subject: [PATCH 23/23] add the physical value of disk --- src/components/c2d/compute_engine_docker.ts | 25 +++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index fd31e3f5a..2c2381acf 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -41,6 +41,7 @@ import { writeFileSync, appendFileSync, statSync, + statfsSync, createReadStream } from 'fs' import { pipeline } from 'node:stream/promises' @@ -171,6 +172,13 @@ export class C2DEngineDocker extends C2DEngine { this.physicalLimits.set('cpu', sysinfo.NCPU) this.physicalLimits.set('ram', Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024)) + try { + const diskStats = statfsSync(this.getC2DConfig().tempFolder) + const diskGB = Math.floor((diskStats.bsize * diskStats.blocks) / 1024 / 1024 / 1024) + this.physicalLimits.set('disk', diskGB) + } catch (e) { + CORE_LOGGER.warn('Could not detect physical disk size: ' + e.message) + } // Determine supported chains const supportedChains: number[] = [] @@ -207,6 +215,14 @@ export class C2DEngineDocker extends C2DEngine { max: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), min: 1 } + const physicalDiskGB = this.physicalLimits.get('disk') || 0 + const diskResources = { + id: 'disk', + type: 'disk', + total: physicalDiskGB, + max: physicalDiskGB, + min: 0 + } if (envDef.resources) { for (const res of envDef.resources) { @@ -221,9 +237,13 @@ export class C2DEngineDocker extends C2DEngine { if (res.max) ramResources.max = res.max if (res.min) ramResources.min = res.min } + if (res.id === 'disk') { + if (res.total) diskResources.total = res.total + if (res.max) diskResources.max = res.max + if (res.min !== undefined) diskResources.min = res.min + } - if (res.id !== 'cpu' && res.id !== 'ram') { - if (res.id === 'disk' && res.total) res.type = 'disk' + if (res.id !== 'cpu' && res.id !== 'ram' && res.id !== 'disk') { if (!res.max) res.max = res.total if (!res.min) res.min = 0 envResources.push(res) @@ -232,6 +252,7 @@ export class C2DEngineDocker extends C2DEngine { } envResources.push(cpuResources) envResources.push(ramResources) + envResources.push(diskResources) const env: ComputeEnvironment = { id: '',