diff --git a/.env.example b/.env.example index 473bb672f..5b151e342 100644 --- a/.env.example +++ b/.env.example @@ -65,8 +65,13 @@ export P2P_BOOTSTRAP_NODES= export P2P_FILTER_ANNOUNCED_ADDRESSES= ## compute -# Example with cross-resource constraints (constraints are optional and backwards-compatible): -# export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01}]}]},"resources":[{"id":"cpu","total":8,"max":8,"min":1,"constraints":[{"id":"ram","min":1,"max":3},{"id":"disk","min":10,"max":100}]},{"id":"ram","total":32,"max":32,"min":1},{"id":"disk","total":500,"max":500,"min":10},{"id":"gpu","total":4,"max":4,"min":0,"constraints":[{"id":"ram","min":8,"max":32},{"id":"cpu","min":2,"max":4}]}]}]' +# Each environment defines its own resources (CPU, RAM, disk, GPUs) with full configuration. +# CPU, RAM, and disk are per-env exclusive: inUse tracked only within the environment where the job runs. +# A global check ensures the aggregate usage across all environments does not exceed physical capacity. +# GPUs are shared-exclusive: if a job on envA uses gpu0, it shows as in-use on envB too. +# CPU cores are automatically partitioned across environments based on each env's cpu.total. +# CPU and RAM defaults are auto-detected from the system when not configured. +# export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","environments":[{"id":"envA","storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":16,"max":16,"min":1,"type":"ram"},{"id":"disk","total":500,"max":500,"min":10,"type":"disk"},{"id":"gpu0","total":1,"max":1,"min":0,"type":"gpu","init":{"deviceRequests":{"Driver":"nvidia","DeviceIDs":["0"],"Capabilities":[["gpu"]]}}}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01},{"id":"gpu0","price":5}]}]}}]}]' export DOCKER_COMPUTE_ENVIRONMENTS= diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6f1fa84bd..400f17c81 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -290,7 +290,7 @@ jobs: DB_PASSWORD: 'changeme' MAX_REQ_PER_MINUTE: 320 MAX_CONNECTIONS_PER_MINUTE: 320 - DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration": 60,"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration": 10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' DOCKER_REGISTRY_AUTHS: ${{ env.DOCKER_REGISTRY_AUTHS }} - name: Check Ocean Node is running run: | diff --git a/config.json b/config.json index 2fa5f640b..fcb19438a 100644 --- a/config.json +++ b/config.json @@ -96,54 +96,53 @@ "dockerComputeEnvironments": [ { "socketPath": "/var/run/docker.sock", - "resources": [ + "environments": [ { - "id": "disk", - "total": 1 - } - ], - "storageExpiry": 604800, - "maxJobDuration": 3600, - "minJobDuration": 60, - "access": { - "addresses": [], - "accessLists": [] - }, - "fees": { - "8996": [ - { - "prices": [ + "storageExpiry": 604800, + "maxJobDuration": 3600, + "minJobDuration": 60, + "resources": [ + { + "id": "disk", + "total": 1 + } + ], + "access": { + "addresses": [], + "accessLists": [] + }, + "fees": { + "8996": [ { - "id": "cpu", - "price": 1 + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] } ] - } - ] - }, - "free": { - "maxJobDuration": 3600, - "minJobDuration": 60, - "maxJobs": 3, - "access": { - "addresses": [], - "accessLists": [] - }, - "resources": [ - { - "id": "cpu", - "max": 1 - }, - { - "id": "ram", - "max": 1 }, - { - "id": "disk", - "max": 1 + "free": { + "maxJobDuration": 3600, + "maxJobs": 3, + "resources": [ + { + "id": "cpu", + "max": 1 + }, + { + "id": "ram", + "max": 1 + }, + { + "id": "disk", + "max": 1 + } + ] } - ] - } + } + ] } ] } diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 1252e7c99..ef5a94a3d 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -140,6 +140,19 @@ export interface ComputeEnvironment extends ComputeEnvironmentBaseConfig { runMaxWaitTimeFree: number } +export interface C2DEnvironmentConfig { + id?: string + description?: string + storageExpiry?: number + minJobDuration?: number + maxJobDuration?: number + maxJobs?: number + fees?: ComputeEnvFeesStructure + access?: ComputeAccessList + free?: ComputeEnvironmentFreeOptions + resources?: ComputeResource[] +} + export interface C2DDockerConfig { socketPath: string protocol: string @@ -148,19 +161,12 @@ export interface C2DDockerConfig { caPath: string certPath: string keyPath: string - storageExpiry?: number - maxJobDuration?: number - minJobDuration?: number - maxJobs?: number - fees: ComputeEnvFeesStructure - resources?: ComputeResource[] // optional, owner can overwrite - free?: ComputeEnvironmentFreeOptions - access: ComputeAccessList imageRetentionDays?: number // Default: 7 days imageCleanupInterval?: number // Default: 86400 seconds (24 hours) paymentClaimInterval?: number // Default: 3600 seconds (1 hours) scanImages?: boolean scanImageDBUpdateInterval?: number // Default: 12 hours + environments: C2DEnvironmentConfig[] } export type ComputeResultType = diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 37f1067c0..96708ee45 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -316,6 +316,9 @@ export abstract class C2DEngine { } catch (e) { CORE_LOGGER.error('Failed to get running jobs:' + e.message) } + + const envResourceMap = new Map((env.resources || []).map((r) => [r.id, r])) + let totalJobs = 0 let totalFreeJobs = 0 let queuedJobs = 0 @@ -324,9 +327,13 @@ export abstract class C2DEngine { let maxWaitTimeFree = 0 let maxRunningTime = 0 let maxRunningTimeFree = 0 + for (const job of jobs) { - if (job.environment === env.id) { - if (job.queueMaxWaitTime === 0) { + const isThisEnv = job.environment === env.id + const isRunning = job.queueMaxWaitTime === 0 + + if (isThisEnv) { + if (isRunning) { const timeElapsed = job.buildStartTimestamp ? new Date().getTime() / 1000 - Number.parseFloat(job?.buildStartTimestamp) : new Date().getTime() / 1000 - Number.parseFloat(job?.algoStartTimestamp) @@ -336,8 +343,24 @@ export abstract class C2DEngine { totalFreeJobs++ maxRunningTimeFree += job.maxJobDuration - timeElapsed } + } else { + queuedJobs++ + maxWaitTime += job.maxJobDuration + if (job.isFree) { + queuedFreeJobs++ + maxWaitTimeFree += job.maxJobDuration + } + } + } - for (const resource of job.resources) { + if (isRunning) { + for (const resource of job.resources) { + const envRes = envResourceMap.get(resource.id) + if (envRes) { + // GPUs are shared-exclusive: inUse tracked globally across all envs + // Everything else (cpu, ram, disk) is per-env exclusive + const isSharedExclusive = envRes.type === 'gpu' + if (!isSharedExclusive && !isThisEnv) continue if (!(resource.id in usedResources)) usedResources[resource.id] = 0 usedResources[resource.id] += resource.amount if (job.isFree) { @@ -345,14 +368,6 @@ export abstract class C2DEngine { usedFreeResources[resource.id] += resource.amount } } - } else { - // queued job - queuedJobs++ - maxWaitTime += job.maxJobDuration - if (job.isFree) { - queuedFreeJobs++ - maxWaitTimeFree += job.maxJobDuration - } } } } @@ -370,12 +385,41 @@ export abstract class C2DEngine { } } + protected physicalLimits: Map = new Map() + + private checkGlobalResourceAvailability( + allEnvironments: ComputeEnvironment[], + resourceId: string, + amount: number + ) { + let globalUsed = 0 + let globalTotal = 0 + for (const e of allEnvironments) { + const res = this.getResource(e.resources, resourceId) + if (res) { + globalTotal += res.total || 0 + globalUsed += res.inUse || 0 + } + } + const physicalLimit = this.physicalLimits.get(resourceId) + if (physicalLimit !== undefined && globalTotal > physicalLimit) { + globalTotal = physicalLimit + } + const globalRemainder = globalTotal - globalUsed + if (globalRemainder < amount) { + throw new Error( + `Not enough available ${resourceId} globally (remaining: ${globalRemainder}, requested: ${amount})` + ) + } + } + // overridden by each engine if required // eslint-disable-next-line require-await public async checkIfResourcesAreAvailable( resourcesRequest: ComputeResourceRequest[], env: ComputeEnvironment, - isFree: boolean + isFree: boolean, + allEnvironments?: ComputeEnvironment[] ) { // Filter out resources with amount 0 as they're not actually being requested const activeResources = resourcesRequest.filter((r) => r.amount > 0) @@ -385,6 +429,13 @@ export abstract class C2DEngine { if (!envResource) throw new Error(`No such resource ${request.id}`) if (envResource.total - envResource.inUse < request.amount) throw new Error(`Not enough available ${request.id}`) + + // Global check for non-GPU resources (cpu, ram, disk are per-env exclusive) + // GPUs are shared-exclusive so their inUse already reflects global usage + if (allEnvironments && envResource.type !== 'gpu') { + this.checkGlobalResourceAvailability(allEnvironments, request.id, request.amount) + } + if (isFree) { if (!env.free) throw new Error(`No free resources`) envResource = this.getResource(env.free?.resources, request.id) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index ac7ed3406..c5d37b8ce 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -20,7 +20,9 @@ import type { RunningPlatform, ComputeEnvFeesStructure, ComputeResourceRequest, - ComputeEnvFees + ComputeEnvFees, + ComputeResource, + C2DEnvironmentConfig } from '../../@types/C2D/C2D.js' import { getConfiguration } from '../../utils/config.js' import { C2DEngine } from './compute_engine_base.js' @@ -40,6 +42,7 @@ import { writeFileSync, appendFileSync, statSync, + statfsSync, createReadStream } from 'fs' import { pipeline } from 'node:stream/promises' @@ -77,19 +80,15 @@ export class C2DEngineDocker extends C2DEngine { private scanImageDBUpdateInterval: number private trivyCachePath: string private cpuAllocations: Map = new Map() - private envCpuCores: number[] = [] - private cpuOffset: number - + private envCpuCoresMap: Map = new Map() public constructor( clusterConfig: C2DClusterInfo, db: C2DDatabase, escrow: Escrow, keyManager: KeyManager, - dockerRegistryAuths: dockerRegistrysAuth, - cpuOffset: number = 0 + dockerRegistryAuths: dockerRegistrysAuth ) { super(clusterConfig, db, escrow, keyManager, dockerRegistryAuths) - this.cpuOffset = cpuOffset this.docker = null if (clusterConfig.connection.socketPath) { @@ -139,12 +138,48 @@ export class C2DEngineDocker extends C2DEngine { // envs are build on start function } + private processFeesForEnvironment( + rawFees: ComputeEnvFeesStructure | undefined, + supportedChains: number[] + ): ComputeEnvFeesStructure | null { + if (!rawFees || Object.keys(rawFees).length === 0) return null + let fees: ComputeEnvFeesStructure = null + for (const feeChain of Object.keys(rawFees)) { + if (!supportedChains.includes(parseInt(feeChain))) continue + if (fees === null) fees = {} + if (!(feeChain in fees)) fees[feeChain] = [] + const tmpFees: ComputeEnvFees[] = [] + for (const feeEntry of rawFees[feeChain]) { + if (!feeEntry.prices || feeEntry.prices.length === 0) { + CORE_LOGGER.error( + `Unable to find prices for fee ${JSON.stringify(feeEntry)} on chain ${feeChain}` + ) + continue + } + if (!feeEntry.feeToken) { + const tokenAddress = getOceanTokenAddressForChain(parseInt(feeChain)) + if (tokenAddress) { + feeEntry.feeToken = tokenAddress + tmpFees.push(feeEntry) + } else { + CORE_LOGGER.error( + `Unable to find Ocean token address for chain ${feeChain} and no custom token provided` + ) + } + } else { + tmpFees.push(feeEntry) + } + } + fees[feeChain] = tmpFees + } + return fees + } + public getStoragePath(): string { return this.getC2DConfig().tempFolder + this.getC2DConfig().hash } public override async start() { - // let's build the env. Swarm and k8 will build multiple envs, based on arhitecture const config = await getConfiguration() const envConfig = await this.getC2DConfig().connection let sysinfo = null @@ -155,186 +190,152 @@ export class C2DEngineDocker extends C2DEngine { // since we cannot connect to docker, we cannot start the engine -> no envs return } - let fees: ComputeEnvFeesStructure = null + + this.physicalLimits.set('cpu', sysinfo.NCPU) + this.physicalLimits.set('ram', Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024)) + try { + const diskStats = statfsSync(this.getC2DConfig().tempFolder) + const diskGB = Math.floor((diskStats.bsize * diskStats.blocks) / 1024 / 1024 / 1024) + this.physicalLimits.set('disk', diskGB) + } catch (e) { + CORE_LOGGER.warn('Could not detect physical disk size: ' + e.message) + } + + // Determine supported chains const supportedChains: number[] = [] if (config.supportedNetworks) { for (const chain of Object.keys(config.supportedNetworks)) { supportedChains.push(parseInt(chain)) } } - if (envConfig.fees && Object.keys(envConfig.fees).length > 0) { - for (const feeChain of Object.keys(envConfig.fees)) { - // for (const feeConfig of envConfig.fees) { - if (supportedChains.includes(parseInt(feeChain))) { - if (fees === null) fees = {} - if (!(feeChain in fees)) fees[feeChain] = [] - const tmpFees: ComputeEnvFees[] = [] - for (let i = 0; i < envConfig.fees[feeChain].length; i++) { - if ( - envConfig.fees[feeChain][i].prices && - envConfig.fees[feeChain][i].prices.length > 0 - ) { - if (!envConfig.fees[feeChain][i].feeToken) { - const tokenAddress = getOceanTokenAddressForChain(parseInt(feeChain)) - if (tokenAddress) { - envConfig.fees[feeChain][i].feeToken = tokenAddress - tmpFees.push(envConfig.fees[feeChain][i]) - } else { - CORE_LOGGER.error( - `Unable to find Ocean token address for chain ${feeChain} and no custom token provided` - ) - } - } else { - tmpFees.push(envConfig.fees[feeChain][i]) - } - } else { - CORE_LOGGER.error( - `Unable to find prices for fee ${JSON.stringify( - envConfig.fees[feeChain][i] - )} on chain ${feeChain}` - ) - } - } - fees[feeChain] = tmpFees - } - } - /* for (const chain of Object.keys(config.supportedNetworks)) { - const chainId = parseInt(chain) - if (task.chainId && task.chainId !== chainId) continue - result[chainId] = await computeEngines.fetchEnvironments(chainId) - } */ + const platform: RunningPlatform = { + architecture: sysinfo.Architecture, + os: sysinfo.OSType } - this.envs.push({ - id: '', // this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[i])), - runningJobs: 0, - consumerAddress: this.getKeyManager().getEthAddress(), - platform: { - architecture: sysinfo.Architecture, - os: sysinfo.OSType - }, - access: { - addresses: [], - accessLists: null - }, - fees, - queuedJobs: 0, - queuedFreeJobs: 0, - queMaxWaitTime: 0, - queMaxWaitTimeFree: 0, - runMaxWaitTime: 0, - runMaxWaitTimeFree: 0 - }) - if (`access` in envConfig) this.envs[0].access = envConfig.access - - if (`storageExpiry` in envConfig) this.envs[0].storageExpiry = envConfig.storageExpiry - if (`minJobDuration` in envConfig) - this.envs[0].minJobDuration = envConfig.minJobDuration - if (`maxJobDuration` in envConfig) - this.envs[0].maxJobDuration = envConfig.maxJobDuration - if (`maxJobs` in envConfig) this.envs[0].maxJobs = envConfig.maxJobs - // let's add resources - this.envs[0].resources = [] - const cpuResources = { - id: 'cpu', - type: 'cpu', - total: sysinfo.NCPU, - max: sysinfo.NCPU, - min: 1, - description: os.cpus()[0].model - } - const ramResources = { - id: 'ram', - type: 'ram', - total: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), - max: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), - min: 1 - } - - if (envConfig.resources) { - for (const res of envConfig.resources) { - // allow user to add other resources - if (res.id === 'cpu') { - if (res.total) cpuResources.total = res.total - if (res.max) cpuResources.max = res.max - if (res.min) cpuResources.min = res.min - } - if (res.id === 'ram') { - if (res.total) ramResources.total = res.total - if (res.max) ramResources.max = res.max - if (res.min) ramResources.min = res.min - } - - if (res.id !== 'cpu' && res.id !== 'ram') { - if (!res.max) res.max = res.total - if (!res.min) res.min = 0 - this.envs[0].resources.push(res) - } - } - } - this.envs[0].resources.push(cpuResources) - this.envs[0].resources.push(ramResources) - // Build the list of physical CPU core indices for this environment - this.envCpuCores = Array.from( - { length: cpuResources.total }, - (_, i) => this.cpuOffset + i - ) - CORE_LOGGER.info( - `CPU affinity: environment cores ${this.envCpuCores[0]}-${this.envCpuCores[this.envCpuCores.length - 1]} (offset=${this.cpuOffset}, total=${cpuResources.total})` - ) - /* TODO - get namedresources & discreete one - if (sysinfo.GenericResources) { - for (const [key, value] of Object.entries(sysinfo.GenericResources)) { - for (const [type, val] of Object.entries(value)) { - // for (const resType in sysinfo.GenericResources) { - if (type === 'NamedResourceSpec') { - // if we have it, ignore it - const resourceId = val.Value - const resourceType = val.Kind - let found = false - for (const res of this.envs[0].resources) { - if (res.id === resourceId) { - found = true - break - } - } - if (!found) { - this.envs[0].resources.push({ - id: resourceId, - kind: resourceType, - total: 1, - max: 1, - min: 0 - }) - } + const consumerAddress = this.getKeyManager().getEthAddress() + + for (let envIdx = 0; envIdx < envConfig.environments.length; envIdx++) { + const envDef: C2DEnvironmentConfig = envConfig.environments[envIdx] + + const fees = this.processFeesForEnvironment(envDef.fees, supportedChains) + + const envResources: ComputeResource[] = [] + const cpuResources = { + id: 'cpu', + type: 'cpu', + total: sysinfo.NCPU, + max: sysinfo.NCPU, + min: 1, + description: os.cpus()[0].model + } + const ramResources = { + id: 'ram', + type: 'ram', + total: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), + max: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), + min: 1 + } + const physicalDiskGB = this.physicalLimits.get('disk') || 0 + const diskResources = { + id: 'disk', + type: 'disk', + total: physicalDiskGB, + max: physicalDiskGB, + min: 0 + } + + if (envDef.resources) { + for (const res of envDef.resources) { + // allow user to add other resources + if (res.id === 'cpu') { + if (res.total) cpuResources.total = res.total + if (res.max) cpuResources.max = res.max + if (res.min) cpuResources.min = res.min + } + if (res.id === 'ram') { + if (res.total) ramResources.total = res.total + if (res.max) ramResources.max = res.max + if (res.min) ramResources.min = res.min + } + if (res.id === 'disk') { + if (res.total) diskResources.total = res.total + if (res.max) diskResources.max = res.max + if (res.min !== undefined) diskResources.min = res.min + } + + if (res.id !== 'cpu' && res.id !== 'ram' && res.id !== 'disk') { + if (!res.max) res.max = res.total + if (!res.min) res.min = 0 + envResources.push(res) } } } + envResources.push(cpuResources) + envResources.push(ramResources) + envResources.push(diskResources) + + const env: ComputeEnvironment = { + id: '', + runningJobs: 0, + consumerAddress, + platform, + access: envDef.access || { addresses: [], accessLists: null }, + fees, + resources: envResources, + queuedJobs: 0, + queuedFreeJobs: 0, + queMaxWaitTime: 0, + queMaxWaitTimeFree: 0, + runMaxWaitTime: 0, + runMaxWaitTimeFree: 0 + } + + if (envDef.storageExpiry !== undefined) env.storageExpiry = envDef.storageExpiry + if (envDef.minJobDuration !== undefined) env.minJobDuration = envDef.minJobDuration + if (envDef.maxJobDuration !== undefined) env.maxJobDuration = envDef.maxJobDuration + if (envDef.maxJobs !== undefined) env.maxJobs = envDef.maxJobs + if (envDef.description !== undefined) env.description = envDef.description + + // Free tier config for this environment + if (envDef.free) { + env.free = { + access: envDef.free.access || { addresses: [], accessLists: null } + } + if (envDef.free.storageExpiry !== undefined) + env.free.storageExpiry = envDef.free.storageExpiry + if (envDef.free.minJobDuration !== undefined) + env.free.minJobDuration = envDef.free.minJobDuration + if (envDef.free.maxJobDuration !== undefined) + env.free.maxJobDuration = envDef.free.maxJobDuration + if (envDef.free.maxJobs !== undefined) env.free.maxJobs = envDef.free.maxJobs + if (envDef.free.resources) env.free.resources = envDef.free.resources + } + + const envIdSuffix = envDef.id || String(envIdx) + env.id = + this.getC2DConfig().hash + + '-' + + create256Hash(JSON.stringify(env.fees) + envIdSuffix) + + this.envs.push(env) + CORE_LOGGER.info( + `Engine ${this.getC2DConfig().hash}: created environment ${env.id} (index=${envIdx}, resources=${envResources.map((r) => r.id).join(',')})` + ) } - */ - // limits for free env - if ('free' in envConfig) { - this.envs[0].free = { - access: { - addresses: [], - accessLists: null - } - } - if (`access` in envConfig.free) this.envs[0].free.access = envConfig.free.access - if (`storageExpiry` in envConfig.free) - this.envs[0].free.storageExpiry = envConfig.free.storageExpiry - if (`minJobDuration` in envConfig.free) - this.envs[0].free.minJobDuration = envConfig.free.minJobDuration - if (`maxJobDuration` in envConfig.free) - this.envs[0].free.maxJobDuration = envConfig.free.maxJobDuration - if (`maxJobs` in envConfig.free) this.envs[0].free.maxJobs = envConfig.free.maxJobs - if ('resources' in envConfig.free) { - // TO DO - check if resource is also listed in this.envs[0].resources, if not, ignore it - this.envs[0].free.resources = envConfig.free.resources + + let cpuOffset = 0 + for (const env of this.envs) { + const cpuRes = this.getResource(env.resources, 'cpu') + if (cpuRes && cpuRes.total > 0) { + const cores = Array.from({ length: cpuRes.total }, (_, i) => cpuOffset + i) + this.envCpuCoresMap.set(env.id, cores) + CORE_LOGGER.info( + `CPU affinity: environment ${env.id} cores ${cores[0]}-${cores[cores.length - 1]} (offset=${cpuOffset}, total=${cpuRes.total})` + ) + cpuOffset += cpuRes.total } } - this.envs[0].id = - this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[0].fees)) // Rebuild CPU allocations from running containers (handles node restart) await this.rebuildCpuAllocations() @@ -1632,12 +1633,11 @@ export class C2DEngineDocker extends C2DEngine { } // check if resources are available now try { - const env = await this.getComputeEnvironment( - job.payment && job.payment.chainId ? job.payment.chainId : null, - job.environment, - null - ) - await this.checkIfResourcesAreAvailable(job.resources, env, job.isFree) + const chainId = job.payment && job.payment.chainId ? job.payment.chainId : null + const allEnvs = await this.getComputeEnvironments(chainId) + const env = allEnvs.find((e) => e.id === job.environment) + if (!env) throw new Error(`Environment ${job.environment} not found`) + await this.checkIfResourcesAreAvailable(job.resources, env, job.isFree, allEnvs) } catch (err) { // resources are still not available return @@ -1693,8 +1693,9 @@ export class C2DEngineDocker extends C2DEngine { } // create the volume & create container // TO DO C2D: Choose driver & size - // get env info - const envResource = this.envs[0].resources + // get environment-specific resources for Docker device/hardware configuration + const env = this.envs.find((e) => e.id === job.environment) + const envResource = env?.resources || [] const volume: VolumeCreateOptions = { Name: job.jobId + '-volume' } @@ -1749,7 +1750,7 @@ export class C2DEngineDocker extends C2DEngine { hostConfig.CpuPeriod = 100000 // 100 miliseconds is usually the default hostConfig.CpuQuota = Math.floor(cpus * hostConfig.CpuPeriod) // Pin the container to specific physical CPU cores - const cpusetStr = this.allocateCpus(job.jobId, cpus) + const cpusetStr = this.allocateCpus(job.jobId, cpus, job.environment) if (cpusetStr) { hostConfig.CpusetCpus = cpusetStr } @@ -2050,8 +2051,9 @@ export class C2DEngineDocker extends C2DEngine { return cores } - private allocateCpus(jobId: string, count: number): string | null { - if (this.envCpuCores.length === 0 || count <= 0) return null + private allocateCpus(jobId: string, count: number, envId: string): string | null { + const envCores = this.envCpuCoresMap.get(envId) + if (!envCores || envCores.length === 0 || count <= 0) return null const existing = this.cpuAllocations.get(jobId) if (existing && existing.length > 0) { const cpusetStr = existing.join(',') @@ -2069,7 +2071,7 @@ export class C2DEngineDocker extends C2DEngine { } const freeCores: number[] = [] - for (const core of this.envCpuCores) { + for (const core of envCores) { if (!usedCores.has(core)) { freeCores.push(core) if (freeCores.length === count) break @@ -2078,7 +2080,7 @@ export class C2DEngineDocker extends C2DEngine { if (freeCores.length < count) { CORE_LOGGER.warn( - `CPU affinity: not enough free cores for job ${jobId} (requested=${count}, available=${freeCores.length}/${this.envCpuCores.length})` + `CPU affinity: not enough free cores for job ${jobId} in env ${envId} (requested=${count}, available=${freeCores.length}/${envCores.length})` ) return null } @@ -2103,7 +2105,7 @@ export class C2DEngineDocker extends C2DEngine { * On startup, inspects running Docker containers to rebuild the CPU allocation map. */ private async rebuildCpuAllocations(): Promise { - if (this.envCpuCores.length === 0) return + if (this.envCpuCoresMap.size === 0) return try { const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) for (const job of jobs) { diff --git a/src/components/c2d/compute_engines.ts b/src/components/c2d/compute_engines.ts index 6402d1dc7..f2da957a0 100644 --- a/src/components/c2d/compute_engines.ts +++ b/src/components/c2d/compute_engines.ts @@ -25,7 +25,6 @@ export class C2DEngines { } if (config && config.c2dClusters) { this.engines = [] - let cpuOffset = 0 for (const cluster of config.c2dClusters) { if (cluster.type === C2DClusterType.DOCKER) { // do some checks @@ -61,23 +60,9 @@ export class C2DEngines { } } this.engines.push( - new C2DEngineDocker( - cfg, - db, - escrow, - keyManager, - config.dockerRegistrysAuth, - cpuOffset - ) + new C2DEngineDocker(cfg, db, escrow, keyManager, config.dockerRegistrysAuth) ) } - // Advance the CPU offset by this cluster's configured CPU total - if (cluster.connection?.resources) { - const cpuRes = cluster.connection.resources.find((r: any) => r.id === 'cpu') - if (cpuRes?.total) { - cpuOffset += cpuRes.total - } - } } } } diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index e43c668fa..311f27401 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -29,6 +29,7 @@ import { import { EncryptMethod } from '../../../@types/fileObject.js' import { ComputeAccessList, + ComputeEnvironment, ComputeResourceRequestWithPrice } from '../../../@types/C2D/C2D.js' // import { verifyProviderFees } from '../utils/feesHandler.js' @@ -119,8 +120,10 @@ export class PaidComputeStartHandler extends CommonComputeHandler { } } + let allEnvs: ComputeEnvironment[] try { - env = await engine.getComputeEnvironment(null, task.environment) + allEnvs = await engine.getComputeEnvironments() + env = allEnvs.find((e) => e.id === task.environment) if (!env) { return { stream: null, @@ -150,7 +153,7 @@ export class PaidComputeStartHandler extends CommonComputeHandler { } } try { - await engine.checkIfResourcesAreAvailable(task.resources, env, false) + await engine.checkIfResourcesAreAvailable(task.resources, env, false, allEnvs) } catch (e) { if (task.queueMaxWaitTime > 0) { CORE_LOGGER.verbose( @@ -885,7 +888,8 @@ export class FreeComputeStartHandler extends CommonComputeHandler { } } } - const env = await engine.getComputeEnvironment(null, task.environment) + const allFreeEnvs = await engine.getComputeEnvironments() + const env = allFreeEnvs.find((e) => e.id === task.environment) if (!env) { return { stream: null, @@ -931,7 +935,7 @@ export class FreeComputeStartHandler extends CommonComputeHandler { } } try { - await engine.checkIfResourcesAreAvailable(task.resources, env, true) + await engine.checkIfResourcesAreAvailable(task.resources, env, true, allFreeEnvs) } catch (e) { if (task.queueMaxWaitTime > 0) { CORE_LOGGER.verbose( diff --git a/src/test/config.json b/src/test/config.json index 778a1c1be..31dd6b7cf 100644 --- a/src/test/config.json +++ b/src/test/config.json @@ -96,76 +96,97 @@ "dockerComputeEnvironments": [ { "socketPath": "/var/run/docker.sock", - "resources": [ + "environments": [ { - "id": "disk", - "total": 1000000000 - } - ], - "storageExpiry": 604800, - "maxJobDuration": 3600, - "minJobDuration": 60, - "fees": { - "11155111": [ - { - "feeToken": "0x1B083D8584dd3e6Ff37d04a6e7e82b5F622f3985", - "prices": [ + "storageExpiry": 604800, + "maxJobDuration": 3600, + "minJobDuration": 60, + "resources": [ + { + "id": "cpu", + "total": 4, + "max": 4, + "min": 1, + "type": "cpu" + }, + { + "id": "ram", + "total": 10, + "max": 10, + "min": 1, + "type": "ram" + }, + { + "id": "disk", + "total": 100, + "max": 100, + "min": 0, + "type": "disk" + } + ], + "fees": { + "11155111": [ { - "id": "cpu", - "price": 1 - } - ] - }, - { - "feeToken": "0xfff9976782d46cc05630d1f6ebab18b2324d6b14", - "prices": [ + "feeToken": "0x1B083D8584dd3e6Ff37d04a6e7e82b5F622f3985", + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] + }, { - "id": "cpu", - "price": 1 + "feeToken": "0xfff9976782d46cc05630d1f6ebab18b2324d6b14", + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] } - ] - } - ], - "11155420": [ - { - "feeToken": "0xf26c6C93f9f1d725e149d95f8E7B2334a406aD10", - "prices": [ + ], + "11155420": [ { - "id": "cpu", - "price": 1 + "feeToken": "0xf26c6C93f9f1d725e149d95f8E7B2334a406aD10", + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] + }, + { + "feeToken": "0x4200000000000000000000000000000000000006", + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] } ] }, - { - "feeToken": "0x4200000000000000000000000000000000000006", - "prices": [ + "free": { + "maxJobDuration": 3600, + "minJobDuration": 60, + "maxJobs": 3, + "resources": [ { "id": "cpu", - "price": 1 + "max": 1 + }, + { + "id": "ram", + "max": 1 + }, + { + "id": "disk", + "max": 1 } ] } - ] - }, - "free": { - "maxJobDuration": 3600, - "minJobDuration": 60, - "maxJobs": 3, - "resources": [ - { - "id": "cpu", - "max": 1 - }, - { - "id": "ram", - "max": 1000000000 - }, - { - "id": "disk", - "max": 1000000000 - } - ] - } + } + ] } ] } diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 95fb54bd9..677f7343c 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -97,11 +97,11 @@ describe('Trusted algorithms Flow', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60, "minJobDuration":10, "maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 77ca097dc..75cc72afc 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -183,11 +183,11 @@ describe('Compute', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) @@ -2339,7 +2339,7 @@ describe('Compute Access Restrictions', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"access":{"addresses":["' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"access":{"addresses":["' + allowedAddress + '"],"accessLists":[]},"fees":{"' + DEVELOPMENT_CHAIN_ID + @@ -2347,7 +2347,7 @@ describe('Compute Access Restrictions', () => { paymentToken + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"access":{"addresses":["' + allowedAddress + - '"],"accessLists":[]},"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '"],"accessLists":[]},"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) @@ -2503,36 +2503,44 @@ describe('Compute Access Restrictions', () => { JSON.stringify([ { socketPath: '/var/run/docker.sock', - resources: [{ id: 'disk', total: 10 }], - storageExpiry: 604800, - maxJobDuration: 3600, - minJobDuration: 60, - access: { - addresses: [], - accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] - }, - fees: { - [DEVELOPMENT_CHAIN_ID]: [ - { - feeToken: paymentToken, - prices: [{ id: 'cpu', price: 1 }] + environments: [ + { + storageExpiry: 604800, + maxJobDuration: 3600, + minJobDuration: 60, + resources: [ + { id: 'cpu', total: 4, max: 4, min: 1, type: 'cpu' }, + { id: 'ram', total: 10, max: 10, min: 1, type: 'ram' }, + { id: 'disk', total: 10, max: 10, min: 0, type: 'disk' } + ], + access: { + addresses: [], + accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] + }, + fees: { + [DEVELOPMENT_CHAIN_ID]: [ + { + feeToken: paymentToken, + prices: [{ id: 'cpu', price: 1 }] + } + ] + }, + free: { + maxJobDuration: 60, + minJobDuration: 10, + maxJobs: 3, + access: { + addresses: [], + accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] + }, + resources: [ + { id: 'cpu', max: 1 }, + { id: 'ram', max: 1 }, + { id: 'disk', max: 1 } + ] } - ] - }, - free: { - maxJobDuration: 60, - minJobDuration: 10, - maxJobs: 3, - access: { - addresses: [], - accessLists: [{ [DEVELOPMENT_CHAIN_ID]: [accessListAddress] }] - }, - resources: [ - { id: 'cpu', max: 1 }, - { id: 'ram', max: 1 }, - { id: 'disk', max: 1 } - ] - } + } + ] } ]) ] @@ -2660,11 +2668,11 @@ describe('Compute Access Restrictions', () => { JSON.stringify([DEVELOPMENT_CHAIN_ID]), '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"paymentClaimInterval":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","paymentClaimInterval":60,"environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) diff --git a/src/test/integration/credentials.test.ts b/src/test/integration/credentials.test.ts index db9945ea6..6bd7d83ba 100644 --- a/src/test/integration/credentials.test.ts +++ b/src/test/integration/credentials.test.ts @@ -126,11 +126,11 @@ describe('[Credentials Flow] - Should run a complete node flow.', () => { JSON.stringify([await publisherAccount.getAddress()]), JSON.stringify([await publisherAccount.getAddress()]), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) ) diff --git a/src/test/integration/imageCleanup.test.ts b/src/test/integration/imageCleanup.test.ts index ab3b5304c..31f60d561 100644 --- a/src/test/integration/imageCleanup.test.ts +++ b/src/test/integration/imageCleanup.test.ts @@ -33,24 +33,32 @@ describe('Docker Image Cleanup Integration Tests', () => { JSON.stringify([ { socketPath: '/var/run/docker.sock', - resources: [{ id: 'disk', total: 10 }], - storageExpiry: 604800, - maxJobDuration: 3600, - minJobDuration: 60, - fees: { - '1': [ - { - feeToken: '0x123', - prices: [{ id: 'cpu', price: 1 }] - } - ] - }, - access: { - addresses: [], - accessLists: null - }, imageRetentionDays: 7, - imageCleanupInterval: 60 // 1 minute for testing + imageCleanupInterval: 60, // 1 minute for testing + environments: [ + { + storageExpiry: 604800, + maxJobDuration: 3600, + minJobDuration: 60, + resources: [ + { id: 'cpu', total: 4, max: 4, min: 1, type: 'cpu' }, + { id: 'ram', total: 10, max: 10, min: 1, type: 'ram' }, + { id: 'disk', total: 10, max: 10, min: 0, type: 'disk' } + ], + fees: { + '1': [ + { + feeToken: '0x123', + prices: [{ id: 'cpu', price: 1 }] + } + ] + }, + access: { + addresses: [], + accessLists: null + } + } + ] } ]) ] diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index 8836e2ae0..1164213f8 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -130,7 +130,7 @@ describe('Compute Jobs Database', () => { envOverrides = buildEnvOverrideConfig( [ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS], [ - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]' ] ) envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index e18df6de8..a71851549 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -142,6 +142,7 @@ export const ComputeEnvFeesSchema = z.object({ }) export const ComputeEnvironmentFreeOptionsSchema = z.object({ + minJobDuration: z.number().int().optional().default(60), maxJobDuration: z.number().int().optional().default(3600), maxJobs: z.number().int().optional().default(3), resources: z.array(ComputeResourceSchema).optional(), @@ -157,63 +158,61 @@ export const ComputeEnvironmentFreeOptionsSchema = z.object({ allowImageBuild: z.boolean().optional().default(false) }) +export const C2DEnvironmentConfigSchema = z + .object({ + id: z.string().optional(), + description: z.string().optional(), + storageExpiry: z.number().int().optional().default(604800), + minJobDuration: z.number().int().optional().default(60), + maxJobDuration: z.number().int().optional().default(3600), + maxJobs: z.number().int().optional(), + fees: z.record(z.string(), z.array(ComputeEnvFeesSchema)).optional(), + access: z + .object({ + addresses: z.array(z.string()), + accessLists: z + .array(z.record(z.string(), z.array(z.string()))) + .nullable() + .optional() + }) + .optional(), + free: ComputeEnvironmentFreeOptionsSchema.optional(), + resources: z.array(ComputeResourceSchema).optional() + }) + .refine( + (data) => + (data.fees !== undefined && Object.keys(data.fees).length > 0) || + (data.free !== undefined && data.free !== null), + { + message: + 'Each environment must have either a non-empty "fees" configuration or a "free" configuration' + } + ) + .refine((data) => data.storageExpiry >= data.maxJobDuration, { + message: '"storageExpiry" should be greater than "maxJobDuration"' + }) + .refine( + (data) => { + if (!data.resources) return false + return data.resources.some((r) => r.id === 'disk' && r.total) + }, + { message: 'There is no "disk" resource configured. This is mandatory' } + ) + export const C2DDockerConfigSchema = z.array( - z - .object({ - socketPath: z.string().optional(), - protocol: z.string().optional(), - host: z.string().optional(), - port: z.number().optional(), - caPath: z.string().optional(), - certPath: z.string().optional(), - keyPath: z.string().optional(), - resources: z.array(ComputeResourceSchema).optional(), - storageExpiry: z.number().int().optional().default(604800), - maxJobDuration: z.number().int().optional().default(3600), - minJobDuration: z.number().int().optional().default(60), - access: z - .object({ - addresses: z.array(z.string()), - accessLists: z - .array(z.record(z.string(), z.array(z.string()))) - .nullable() - .optional() - }) - .optional(), - fees: z.record(z.string(), z.array(ComputeEnvFeesSchema)).optional(), - free: ComputeEnvironmentFreeOptionsSchema.optional(), - imageRetentionDays: z.number().int().min(1).optional().default(7), - imageCleanupInterval: z.number().int().min(3600).optional().default(86400) // min 1 hour, default 24 hours - }) - .refine( - (data) => - (data.fees !== undefined && Object.keys(data.fees).length > 0) || - (data.free !== undefined && data.free !== null), - { - message: - 'Each docker compute environment must have either a non-empty "fees" configuration or a "free" configuration' - } - ) - .refine((data) => data.storageExpiry >= data.maxJobDuration, { - message: '"storageExpiry" should be greater than "maxJobDuration"' - }) - .refine( - (data) => { - if (!data.resources) return false - return data.resources.some((r) => r.id === 'disk' && r.total) - }, - { message: 'There is no "disk" resource configured. This is mandatory' } - ) - .transform((data) => { - if (data.resources) { - for (const resource of data.resources) { - if (resource.id === 'disk' && resource.total) { - resource.type = 'disk' - } - } - } - return data - }) + z.object({ + socketPath: z.string().optional(), + protocol: z.string().optional(), + host: z.string().optional(), + port: z.number().optional(), + caPath: z.string().optional(), + certPath: z.string().optional(), + keyPath: z.string().optional(), + imageRetentionDays: z.number().int().min(1).optional().default(7), + imageCleanupInterval: z.number().int().min(3600).optional().default(86400), + paymentClaimInterval: z.number().int().optional(), + environments: z.array(C2DEnvironmentConfigSchema).min(1) + }) ) export const C2DClusterInfoSchema = z.object({