diff --git a/config.json b/config.json index 4bf0ce975..69693d5d4 100644 --- a/config.json +++ b/config.json @@ -1,147 +1,147 @@ { - "authorizedDecrypters": [], - "authorizedDecryptersList": [], - "allowedValidators": [], - "allowedValidatorsList": [], - "authorizedPublishers": [], - "authorizedPublishersList": [], - "keys": {}, - "hasIndexer": true, - "hasHttp": true, - "hasP2P": true, - "p2pConfig": { - "bootstrapTimeout": 20000, - "bootstrapTagName": "bootstrap", - "bootstrapTagValue": 50, - "bootstrapTTL": 0, - "enableIPV4": true, - "enableIPV6": true, - "ipV4BindAddress": "0.0.0.0", - "ipV4BindTcpPort": 9000, - "ipV4BindWsPort": 9001, - "ipV6BindAddress": "::", - "ipV6BindTcpPort": 9002, - "ipV6BindWsPort": 9003, - "announceAddresses": [], - "pubsubPeerDiscoveryInterval": 10000, - "dhtMaxInboundStreams": 500, - "dhtMaxOutboundStreams": 500, - "dhtFilter": null, - "mDNSInterval": 20000, - "connectionsMaxParallelDials": 15, - "connectionsDialTimeout": 30000, - "upnp": true, - "autoNat": true, - "enableCircuitRelayServer": false, - "enableCircuitRelayClient": false, - "circuitRelays": 0, - "announcePrivateIp": false, - "filterAnnouncedAddresses": [ - "127.0.0.0/8", - "10.0.0.0/8", - "172.16.0.0/12", - "192.168.0.0/16", - "100.64.0.0/10", - "169.254.0.0/16", - "192.0.0.0/24", - "192.0.2.0/24", - "198.51.100.0/24", - "203.0.113.0/24", - "224.0.0.0/4", - "240.0.0.0/4" - ], - "minConnections": 1, - "maxConnections": 300, - "autoDialPeerRetryThreshold": 7200000, - "autoDialConcurrency": 5, - "maxPeerAddrsToDial": 5, - "autoDialInterval": 5000, - "enableNetworkStats": false - }, - "hasControlPanel": true, - "httpPort": 8001, - "dbConfig": { - "url": "http://localhost:8108/?apiKey=xyz", - "username": "", - "password": "", - "dbType": "typesense" - }, - "supportedNetworks": { - "8996": { - "rpc": "http://127.0.0.1:8545", - "chainId": 8996, - "network": "development", - "chunkSize": 100 - } - }, - "feeStrategy": {}, - "c2dClusters": [], - "ipfsGateway": "https://ipfs.io/", - "arweaveGateway": "https://arweave.net/", - "accountPurgatoryUrl": null, - "assetPurgatoryUrl": null, - "allowedAdmins": [], - "allowedAdminsList": [], - "rateLimit": 30, - "maxConnections": 30, - "denyList": { - "peers": [], - "ips": [] - }, - "unsafeURLs": [], - "isBootstrap": false, - "claimDurationTimeout": 600, - "validateUnsignedDDO": true, - "jwtSecret": "ocean-node-secret", - "dockerComputeEnvironments": [ + "authorizedDecrypters": [], + "authorizedDecryptersList": [], + "allowedValidators": [], + "allowedValidatorsList": [], + "authorizedPublishers": [], + "authorizedPublishersList": [], + "keys": {}, + "hasIndexer": true, + "hasHttp": true, + "hasP2P": true, + "p2pConfig": { + "bootstrapTimeout": 20000, + "bootstrapTagName": "bootstrap", + "bootstrapTagValue": 50, + "bootstrapTTL": 0, + "enableIPV4": true, + "enableIPV6": true, + "ipV4BindAddress": "0.0.0.0", + "ipV4BindTcpPort": 9000, + "ipV4BindWsPort": 9001, + "ipV6BindAddress": "::", + "ipV6BindTcpPort": 9002, + "ipV6BindWsPort": 9003, + "announceAddresses": [], + "pubsubPeerDiscoveryInterval": 10000, + "dhtMaxInboundStreams": 500, + "dhtMaxOutboundStreams": 500, + "dhtFilter": null, + "mDNSInterval": 20000, + "connectionsMaxParallelDials": 15, + "connectionsDialTimeout": 30000, + "upnp": true, + "autoNat": true, + "enableCircuitRelayServer": false, + "enableCircuitRelayClient": false, + "circuitRelays": 0, + "announcePrivateIp": false, + "filterAnnouncedAddresses": [ + "127.0.0.0/8", + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "100.64.0.0/10", + 
"169.254.0.0/16", + "192.0.0.0/24", + "192.0.2.0/24", + "198.51.100.0/24", + "203.0.113.0/24", + "224.0.0.0/4", + "240.0.0.0/4" + ], + "minConnections": 1, + "maxConnections": 300, + "autoDialPeerRetryThreshold": 7200000, + "autoDialConcurrency": 5, + "maxPeerAddrsToDial": 5, + "autoDialInterval": 5000, + "enableNetworkStats": false + }, + "hasControlPanel": true, + "httpPort": 8001, + "dbConfig": { + "url": "http://localhost:8108/?apiKey=xyz", + "username": "", + "password": "", + "dbType": "typesense" + }, + "supportedNetworks": { + "8996": { + "rpc": "http://127.0.0.1:8545", + "chainId": 8996, + "network": "development", + "chunkSize": 100 + } + }, + "feeStrategy": {}, + "c2dClusters": [], + "ipfsGateway": "https://ipfs.io/", + "arweaveGateway": "https://arweave.net/", + "accountPurgatoryUrl": null, + "assetPurgatoryUrl": null, + "allowedAdmins": [], + "allowedAdminsList": [], + "rateLimit": 30, + "maxConnections": 30, + "denyList": { + "peers": [], + "ips": [] + }, + "unsafeURLs": [], + "isBootstrap": false, + "claimDurationTimeout": 3600, + "validateUnsignedDDO": true, + "jwtSecret": "ocean-node-secret", + "dockerComputeEnvironments": [ + { + "socketPath": "/var/run/docker.sock", + "resources": [ { - "socketPath": "/var/run/docker.sock", - "resources": [ - { - "id": "disk", - "total": 1 - } - ], - "storageExpiry": 604800, - "maxJobDuration": 3600, - "access": { - "addresses": [], - "accessLists": [] - }, - "fees": { - "8996": [ - { - "prices": [ - { - "id": "cpu", - "price": 1 - } - ] - } - ] - }, - "free": { - "maxJobDuration": 3600, - "maxJobs": 3, - "access": { - "addresses": [], - "accessLists": [] - }, - "resources": [ - { - "id": "cpu", - "max": 1 - }, - { - "id": "ram", - "max": 1 - }, - { - "id": "disk", - "max": 1 - } - ] - } + "id": "disk", + "total": 1 } - ] -} \ No newline at end of file + ], + "storageExpiry": 604800, + "maxJobDuration": 3600, + "access": { + "addresses": [], + "accessLists": [] + }, + "fees": { + "8996": [ + { + "prices": [ + { + "id": "cpu", + "price": 1 + } + ] + } + ] + }, + "free": { + "maxJobDuration": 3600, + "maxJobs": 3, + "access": { + "addresses": [], + "accessLists": [] + }, + "resources": [ + { + "id": "cpu", + "max": 1 + }, + { + "id": "ram", + "max": 1 + }, + { + "id": "disk", + "max": 1 + } + ] + } + } + ] +} diff --git a/docs/API.md b/docs/API.md index 10d887219..108eeea3c 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1400,19 +1400,20 @@ starts a free compute job and returns jobId if succesfull #### Parameters -| name | type | required | description | -| ----------------- | ------ | -------- | ---------------------------------------------------------------- | -| command | string | v | command name | -| node | string | | if not present it means current node | -| consumerAddress | string | v | consumer address | -| signature | string | v | signature (msg=String(nonce) ) | -| nonce | string | v | nonce for the request | -| datasets | object | | list of ComputeAsset to be used as inputs | -| algorithm | object | | ComputeAlgorithm definition | -| environment | string | v | compute environment to use | -| resources | object | | optional list of required resources | -| metadata | object | | optional metadata for the job, data provided by the user | -| additionalViewers | object | | optional array of addresses that are allowed to fetch the result | +| name | type | required | description | +| ----------------- | ------ | -------- | ----------------------------------------------------------------------------- | +| command | string | v | command 
name | +| node | string | | if not present it means current node | +| consumerAddress | string | v | consumer address | +| signature | string | v | signature (msg=String(nonce) ) | +| nonce | string | v | nonce for the request | +| datasets | object | | list of ComputeAsset to be used as inputs | +| algorithm | object | | ComputeAlgorithm definition | +| environment | string | v | compute environment to use | +| resources | object | | optional list of required resources | +| metadata | object | | optional metadata for the job, data provided by the user | +| additionalViewers | object | | optional array of addresses that are allowed to fetch the result | +| queueMaxWaitTime | number | | optional max time in seconds a job can wait in the queue before being started | #### Request diff --git a/docs/env.md b/docs/env.md index 74c561bf5..c9d38e3e6 100644 --- a/docs/env.md +++ b/docs/env.md @@ -52,7 +52,7 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/ ## Payments -- `ESCROW_CLAIM_TIMEOUT`: Amount of time reserved to claim a escrow payment, in seconds. Defaults to `600`. Example: `600` +- `ESCROW_CLAIM_TIMEOUT`: Amount of time reserved to claim a escrow payment, in seconds. Defaults to `3600`. Example: `3600` ## Logs diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index db54e6812..879720066 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -117,6 +117,10 @@ export interface ComputeEnvironment extends ComputeEnvironmentBaseConfig { runningJobs: number runningfreeJobs?: number consumerAddress: string // v1 + queuedJobs: number + queuedFreeJobs: number + queMaxWaitTime: number + queMaxWaitTimeFree: number } export interface C2DDockerConfig { @@ -174,6 +178,7 @@ export interface ComputeJob { environment?: string metadata?: DBComputeJobMetadata terminationDetails?: ComputeJobTerminationDetails + queueMaxWaitTime: number // max time in seconds a job can wait in the queue before being started } export interface ComputeOutput { @@ -260,6 +265,10 @@ export enum C2DStatusNumber { // eslint-disable-next-line no-unused-vars JobStarted = 0, // eslint-disable-next-line no-unused-vars + JobQueued = 1, + // eslint-disable-next-line no-unused-vars + JobQueuedExpired = 2, + // eslint-disable-next-line no-unused-vars PullImage = 10, // eslint-disable-next-line no-unused-vars PullImageFailed = 11, @@ -302,6 +311,10 @@ export enum C2DStatusText { // eslint-disable-next-line no-unused-vars JobStarted = 'Job started', // eslint-disable-next-line no-unused-vars + JobQueued = 'Job queued', + // eslint-disable-next-line no-unused-vars + JobQueuedExpired = 'Job expired in queue', + // eslint-disable-next-line no-unused-vars PullImage = 'Pulling algorithm image', // eslint-disable-next-line no-unused-vars PullImageFailed = 'Pulling algorithm image failed', diff --git a/src/@types/commands.ts b/src/@types/commands.ts index 767836057..de4d99278 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -186,10 +186,10 @@ export interface ComputeGetEnvironmentsCommand extends Command { export interface ComputePayment { chainId: number token: string - resources?: ComputeResourceRequest[] + resources?: ComputeResourceRequest[] // only used in initializeCompute } export interface ComputeInitializeCommand extends Command { - datasets: [ComputeAsset] + datasets: ComputeAsset[] algorithm: ComputeAlgorithm environment: string payment: ComputePayment @@ -197,6 +197,7 @@ export interface ComputeInitializeCommand extends Command { signature?: string maxJobDuration: 
number policyServer?: any // object to pass to policy server + queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started } export interface FreeComputeStartCommand extends Command { @@ -212,6 +213,7 @@ export interface FreeComputeStartCommand extends Command { policyServer?: any // object to pass to policy server metadata?: DBComputeJobMetadata additionalViewers?: string[] // addresses of additional addresses that can get results + queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started } export interface PaidComputeStartCommand extends FreeComputeStartCommand { payment: ComputePayment diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 63f64034b..73c794c24 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -65,7 +65,8 @@ export abstract class C2DEngine { payment: DBComputeJobPayment, jobId: string, metadata?: DBComputeJobMetadata, - additionalViewers?: string[] + additionalViewers?: string[], + queueMaxWaitTime?: number ): Promise public abstract stopComputeJob( @@ -233,22 +234,45 @@ export abstract class C2DEngine { const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) let totalJobs = 0 let totalFreeJobs = 0 + let queuedJobs = 0 + let queuedFreeJobs = 0 + let maxWaitTime = 0 + let maxWaitTimeFree = 0 for (const job of jobs) { if (job.environment === env.id) { - totalJobs++ - if (job.isFree) totalFreeJobs++ - - for (const resource of job.resources) { - if (!(resource.id in usedResources)) usedResources[resource.id] = 0 - usedResources[resource.id] += resource.amount + if (job.queueMaxWaitTime === 0) { + totalJobs++ + if (job.isFree) totalFreeJobs++ + + for (const resource of job.resources) { + if (!(resource.id in usedResources)) usedResources[resource.id] = 0 + usedResources[resource.id] += resource.amount + if (job.isFree) { + if (!(resource.id in usedFreeResources)) usedFreeResources[resource.id] = 0 + usedFreeResources[resource.id] += resource.amount + } + } + } else { + // queued job + queuedJobs++ + maxWaitTime += job.maxJobDuration if (job.isFree) { - if (!(resource.id in usedFreeResources)) usedFreeResources[resource.id] = 0 - usedFreeResources[resource.id] += resource.amount + queuedFreeJobs++ + maxWaitTimeFree += job.maxJobDuration } } } } - return { totalJobs, totalFreeJobs, usedResources, usedFreeResources } + return { + totalJobs, + totalFreeJobs, + usedResources, + usedFreeResources, + queuedJobs, + queuedFreeJobs, + maxWaitTime, + maxWaitTimeFree + } } // overridden by each engine if required diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 17b04f64f..704730a7b 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -173,7 +173,11 @@ export class C2DEngineDocker extends C2DEngine { addresses: [], accessLists: [] }, - fees + fees, + queuedJobs: 0, + queuedFreeJobs: 0, + queMaxWaitTime: 0, + queMaxWaitTimeFree: 0 }) if (`access` in envConfig) this.envs[0].access = envConfig.access @@ -289,10 +293,22 @@ export class C2DEngineDocker extends C2DEngine { // if (systemInfo.Runtimes) computeEnv.runtimes = systemInfo.Runtimes // if (systemInfo.DefaultRuntime) // computeEnv.defaultRuntime = systemInfo.DefaultRuntime - const { totalJobs, totalFreeJobs, usedResources, usedFreeResources } = - await this.getUsedResources(computeEnv) + const { + totalJobs, + totalFreeJobs, + 
usedResources, + usedFreeResources, + queuedJobs, + queuedFreeJobs, + maxWaitTime, + maxWaitTimeFree + } = await this.getUsedResources(computeEnv) computeEnv.runningJobs = totalJobs computeEnv.runningfreeJobs = totalFreeJobs + computeEnv.queuedJobs = queuedJobs + computeEnv.queuedFreeJobs = queuedFreeJobs + computeEnv.queMaxWaitTime = maxWaitTime + computeEnv.queMaxWaitTimeFree = maxWaitTimeFree for (let i = 0; i < computeEnv.resources.length; i++) { if (computeEnv.resources[i].id in usedResources) computeEnv.resources[i].inUse = usedResources[computeEnv.resources[i].id] @@ -367,7 +383,8 @@ export class C2DEngineDocker extends C2DEngine { payment: DBComputeJobPayment, jobId: string, metadata?: DBComputeJobMetadata, - additionalViewers?: string[] + additionalViewers?: string[], + queueMaxWaitTime?: number ): Promise { if (!this.docker) return [] // TO DO - iterate over resources and get default runtime @@ -411,6 +428,9 @@ export class C2DEngineDocker extends C2DEngine { ) // make sure that we don't keep them in the db structure algorithm.meta.container.additionalDockerFiles = null + if (queueMaxWaitTime && queueMaxWaitTime > 0) { + throw new Error(`additionalDockerFiles cannot be used with queued jobs`) + } } const job: DBComputeJob = { clusterHash: this.getC2DConfig().hash, @@ -419,8 +439,14 @@ export class C2DEngineDocker extends C2DEngine { jobId, dateCreated: String(Date.now() / 1000), dateFinished: null, - status: C2DStatusNumber.JobStarted, - statusText: C2DStatusText.JobStarted, + status: + queueMaxWaitTime && queueMaxWaitTime > 0 + ? C2DStatusNumber.JobQueued + : C2DStatusNumber.JobStarted, + statusText: + queueMaxWaitTime && queueMaxWaitTime > 0 + ? C2DStatusText.JobQueued + : C2DStatusText.JobStarted, results: [], algorithm, assets, @@ -441,13 +467,16 @@ export class C2DEngineDocker extends C2DEngine { metadata, additionalViewers, terminationDetails: { exitCode: null, OOMKilled: null }, - algoDuration: 0 + algoDuration: 0, + queueMaxWaitTime: queueMaxWaitTime || 0 } if (algorithm.meta.container && algorithm.meta.container.dockerfile) { - // we need to build the image - job.status = C2DStatusNumber.BuildImage - job.statusText = C2DStatusText.BuildImage + // we need to build the image if job is not queued + if (queueMaxWaitTime === 0) { + job.status = C2DStatusNumber.BuildImage + job.statusText = C2DStatusText.BuildImage + } } else { // already built, we need to validate it const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) @@ -456,8 +485,10 @@ export class C2DEngineDocker extends C2DEngine { throw new Error( `Cannot find image ${image} for ${env.platform.architecture}. 
Maybe it does not exist or it's build for other arhitectures.` ) - job.status = C2DStatusNumber.PullImage - job.statusText = C2DStatusText.PullImage + if (queueMaxWaitTime === 0) { + job.status = C2DStatusNumber.PullImage + job.statusText = C2DStatusText.PullImage + } } await this.makeJobFolders(job) @@ -466,10 +497,12 @@ export class C2DEngineDocker extends C2DEngine { if (!addedId) { return [] } - if (algorithm.meta.container && algorithm.meta.container.dockerfile) { - this.buildImage(job, additionalDockerFiles) - } else { - this.pullImage(job) + if (queueMaxWaitTime === 0) { + if (algorithm.meta.container && algorithm.meta.container.dockerfile) { + this.buildImage(job, additionalDockerFiles) + } else { + this.pullImage(job) + } } // only now set the timer if (!this.cronTimer) { @@ -613,8 +646,7 @@ export class C2DEngineDocker extends C2DEngine { } if ( jobs[0].owner !== consumerAddress && - jobs[0].additionalViewers && - !jobs[0].additionalViewers.includes(consumerAddress) + (!jobs[0].additionalViewers || !jobs[0].additionalViewers.includes(consumerAddress)) ) { // consumerAddress is not the owner and not in additionalViewers throw new Error( @@ -817,6 +849,44 @@ export class C2DEngineDocker extends C2DEngine { - delete the container - delete the volume */ + if (job.status === C2DStatusNumber.JobQueued) { + // check if we can start the job now + const now = String(Date.now() / 1000) + if (job.queueMaxWaitTime < parseFloat(now) - parseFloat(job.dateCreated)) { + job.status = C2DStatusNumber.JobQueuedExpired + job.statusText = C2DStatusText.JobQueuedExpired + job.isRunning = false + job.dateFinished = now + await this.db.updateJob(job) + await this.cleanupJob(job) + return + } + // check if resources are available now + try { + const env = await this.getComputeEnvironment( + job.payment && job.payment.chainId ? 
job.payment.chainId : null, + job.environment, + null + ) + await this.checkIfResourcesAreAvailable(job.resources, env, true) + } catch (err) { + // resources are still not available + return + } + // resources are now available, let's start the job + const { algorithm } = job + if (algorithm.meta.container && algorithm.meta.container.dockerfile) { + job.status = C2DStatusNumber.BuildImage + job.statusText = C2DStatusText.BuildImage + this.buildImage(job, null) + } else { + job.status = C2DStatusNumber.PullImage + job.statusText = C2DStatusText.PullImage + this.pullImage(job) + } + await this.db.updateJob(job) + } + if (job.status === C2DStatusNumber.ConfiguringVolumes) { // create the volume & create container // TO DO C2D: Choose driver & size diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index 855f62611..e820d2b87 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -69,6 +69,9 @@ export class ComputeInitializeHandler extends CommandHandler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } + if (!task.queueMaxWaitTime) { + task.queueMaxWaitTime = 0 + } let engine let env let resourcesNeeded @@ -193,7 +196,9 @@ export class ComputeInitializeHandler extends CommandHandler { escrowAddress, payee: env.consumerAddress, chainId: task.payment.chainId, - minLockSeconds: engine.escrow.getMinLockTime(task.maxJobDuration), + minLockSeconds: engine.escrow.getMinLockTime( + task.maxJobDuration + task.queueMaxWaitTime + ), token: task.payment.token, amount: await engine.escrow.getPaymentAmountInWei( cost, diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index d1cfa40dd..327a52614 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -68,7 +68,9 @@ export class PaidComputeStartHandler extends CommandHandler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } - + if (!task.queueMaxWaitTime) { + task.queueMaxWaitTime = 0 + } const authValidationResponse = await this.validateTokenOrSignature( task.authorization, task.consumerAddress, @@ -115,18 +117,34 @@ export class PaidComputeStartHandler extends CommandHandler { if (!task.maxJobDuration || task.maxJobDuration > env.maxJobDuration) { task.maxJobDuration = env.maxJobDuration } - task.payment.resources = await engine.checkAndFillMissingResources( - task.payment.resources, + task.resources = await engine.checkAndFillMissingResources( + task.resources, env, false ) - await engine.checkIfResourcesAreAvailable(task.payment.resources, env, true) } catch (e) { return { stream: null, status: { httpStatus: 400, - error: e + error: e?.message || String(e) + } + } + } + try { + await engine.checkIfResourcesAreAvailable(task.resources, env, true) + } catch (e) { + if (task.queueMaxWaitTime > 0) { + CORE_LOGGER.verbose( + `Compute resources not available, queuing job for max ${task.queueMaxWaitTime} seconds` + ) + } else { + return { + stream: null, + status: { + httpStatus: 400, + error: e?.message || String(e) + } } } } @@ -441,7 +459,7 @@ export class PaidComputeStartHandler extends CommandHandler { } const resources: ComputeResourceRequestWithPrice[] = [] - for (const res of task.payment.resources) { + for (const res of task.resources) { const price = engine.getResourcePrice(prices, res.id) resources.push({ id: res.id, @@ -465,7 +483,7 @@ export class PaidComputeStartHandler extends 
CommandHandler { const jobId = generateUniqueID(s) // let's calculate payment needed based on resources request and maxJobDuration const cost = engine.calculateResourcesCost( - task.payment.resources, + task.resources, env, task.payment.chainId, task.payment.token, @@ -479,7 +497,7 @@ export class PaidComputeStartHandler extends CommandHandler { task.payment.token, task.consumerAddress, cost, - engine.escrow.getMinLockTime(task.maxJobDuration) + engine.escrow.getMinLockTime(task.maxJobDuration + task.queueMaxWaitTime) ) } catch (e) { if (e.message.includes('insufficient funds for intrinsic transaction cost')) { @@ -496,7 +514,7 @@ export class PaidComputeStartHandler extends CommandHandler { stream: null, status: { httpStatus: 400, - error: e + error: e?.message || String(e) } } } @@ -508,7 +526,7 @@ export class PaidComputeStartHandler extends CommandHandler { env.id, task.consumerAddress, task.maxJobDuration, - task.payment.resources, + task.resources, { chainId: task.payment.chainId, token: task.payment.token, @@ -518,7 +536,8 @@ export class PaidComputeStartHandler extends CommandHandler { }, jobId, task.metadata, - task.additionalViewers + task.additionalViewers, + task.queueMaxWaitTime ) CORE_LOGGER.logMessage( 'ComputeStartCommand Response: ' + JSON.stringify(response, null, 2), @@ -539,14 +558,14 @@ export class PaidComputeStartHandler extends CommandHandler { task.payment.token, task.consumerAddress ) - } catch (e) { + } catch (cancelError) { // is fine if it fails } return { stream: null, status: { httpStatus: 400, - error: e + error: e?.message || String(e) } } } @@ -586,7 +605,9 @@ export class FreeComputeStartHandler extends CommandHandler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } - + if (!task.queueMaxWaitTime) { + task.queueMaxWaitTime = 0 + } const authValidationResponse = await this.validateTokenOrSignature( task.authorization, task.consumerAddress, @@ -725,18 +746,17 @@ export class FreeComputeStartHandler extends CommandHandler { } } } - try { - const env = await engine.getComputeEnvironment(null, task.environment) - if (!env) { - return { - stream: null, - status: { - httpStatus: 500, - error: 'Invalid C2D Environment' - } + const env = await engine.getComputeEnvironment(null, task.environment) + if (!env) { + return { + stream: null, + status: { + httpStatus: 500, + error: 'Invalid C2D Environment' } } - + } + try { const accessGranted = await validateAccess(task.consumerAddress, env.free.access) if (!accessGranted) { return { @@ -753,7 +773,7 @@ export class FreeComputeStartHandler extends CommandHandler { env, true ) - await engine.checkIfResourcesAreAvailable(task.resources, env, true) + if (!task.maxJobDuration || task.maxJobDuration > env.free.maxJobDuration) { task.maxJobDuration = env.free.maxJobDuration } @@ -767,6 +787,23 @@ export class FreeComputeStartHandler extends CommandHandler { } } } + try { + await engine.checkIfResourcesAreAvailable(task.resources, env, true) + } catch (e) { + if (task.queueMaxWaitTime > 0) { + CORE_LOGGER.verbose( + `Compute resources not available, queuing job for max ${task.queueMaxWaitTime} seconds` + ) + } else { + return { + stream: null, + status: { + httpStatus: 400, + error: e + } + } + } + } // console.log(task.resources) /* return { @@ -798,7 +835,8 @@ export class FreeComputeStartHandler extends CommandHandler { null, jobId, task.metadata, - task.additionalViewers + task.additionalViewers, + task.queueMaxWaitTime ) CORE_LOGGER.logMessage( diff --git 
a/src/components/core/utils/escrow.ts b/src/components/core/utils/escrow.ts index 134e9816c..99469ed13 100644 --- a/src/components/core/utils/escrow.ts +++ b/src/components/core/utils/escrow.ts @@ -175,7 +175,7 @@ export class Escrow { const locks = await this.getLocks(chain, token, payer, await signer.getAddress()) for (const lock of locks) { if (BigInt(lock.jobId.toString()) === BigInt(jobId)) { - const gas = await contract.claimLock.estimateGas( + const gas = await contract.claimLockAndWithdraw.estimateGas( jobId, token, payer, @@ -183,7 +183,7 @@ export class Escrow { ethers.toUtf8Bytes(proof) ) const gasOptions = await blockchain.getGasOptions(gas, 1.2) - const tx = await contract.claimLock( + const tx = await contract.claimLockAndWithdraw( jobId, token, payer, diff --git a/src/components/database/C2DDatabase.ts b/src/components/database/C2DDatabase.ts index fcb08a8f8..20ac3d7e5 100644 --- a/src/components/database/C2DDatabase.ts +++ b/src/components/database/C2DDatabase.ts @@ -67,10 +67,6 @@ export class C2DDatabase extends AbstractDatabase { return await this.provider.getRunningJobs(engine, environment) } - async getAllFinishedJobs(): Promise { - return await this.provider.getAllFinishedJobs() - } - async deleteJob(jobId: string): Promise { return await this.provider.deleteJob(jobId) } diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts index 739e6f910..74dcbe84a 100644 --- a/src/components/database/sqliteCompute.ts +++ b/src/components/database/sqliteCompute.ts @@ -42,7 +42,8 @@ function getInternalStructure(job: DBComputeJob): any { additionalViewers: job.additionalViewers, terminationDetails: job.terminationDetails, payment: job.payment, - algoDuration: job.algoDuration + algoDuration: job.algoDuration, + queueMaxWaitTime: job.queueMaxWaitTime } return internalBlob } @@ -271,7 +272,7 @@ export class SQLiteCompute implements ComputeDatabaseProvider { getRunningJobs(engine?: string, environment?: string): Promise { const selectSQL = ` - SELECT * FROM ${this.schema.name} WHERE dateFinished IS NULL + SELECT * FROM ${this.schema.name} WHERE dateFinished IS NULL ORDER by dateCreated ` return new Promise((resolve, reject) => { this.db.all(selectSQL, (err, rows: any[] | undefined) => { @@ -314,36 +315,6 @@ export class SQLiteCompute implements ComputeDatabaseProvider { }) } - getAllFinishedJobs(): Promise { - const selectSQL = ` - SELECT * FROM ${this.schema.name} WHERE dateFinished IS NOT NULL OR results IS NOT NULL - ` - - return new Promise((resolve, reject) => { - this.db.all(selectSQL, (err, rows: any[] | undefined) => { - if (err) { - DATABASE_LOGGER.error(err.message) - reject(err) - } else { - if (rows && rows.length > 0) { - const all: DBComputeJob[] = rows.map((row) => { - const body = generateJSONFromBlob(row.body) - delete row.body - const maxJobDuration = row.expireTimestamp - delete row.expireTimestamp - const job: DBComputeJob = { ...row, ...body, maxJobDuration } - return job - }) - resolve(all) - } else { - DATABASE_LOGGER.info('Could not find any running C2D jobs!') - resolve([]) - } - } - }) - }) - } - getFinishedJobs(environments?: string[]): Promise { let selectSQL = ` SELECT * FROM ${this.schema.name} WHERE (dateFinished IS NOT NULL OR results IS NOT NULL) diff --git a/src/components/httpRoutes/compute.ts b/src/components/httpRoutes/compute.ts index 9d5976d78..7f94a3a47 100644 --- a/src/components/httpRoutes/compute.ts +++ b/src/components/httpRoutes/compute.ts @@ -80,7 +80,8 @@ 
computeRoutes.post(`${SERVICES_API_BASE_PATH}/compute`, async (req, res) => { policyServer: (req.body.policyServer as any) || null, metadata: req.body.metadata || null, authorization: req.headers?.authorization, - additionalViewers: (req.body.additionalViewers as unknown as string[]) || null + additionalViewers: (req.body.additionalViewers as unknown as string[]) || null, + queueMaxWaitTime: req.body.queueMaxWaitTime || 0 } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput @@ -125,7 +126,8 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/freeCompute`, async (req, res) => policyServer: (req.body.policyServer as any) || null, metadata: req.body.metadata || null, authorization: req.headers?.authorization, - additionalViewers: (req.body.additionalViewers as unknown as string[]) || null + additionalViewers: (req.body.additionalViewers as unknown as string[]) || null, + queueMaxWaitTime: req.body.queueMaxWaitTime || 0 } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 0d0f40e2d..b8d685600 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -389,7 +389,7 @@ describe('Trusted algorithms Flow', () => { initializeResponse.payment.token, firstEnv.consumerAddress, balance, - computeJobDuration, + initializeResponse.payment.minLockSeconds, 10 ) const locks = await oceanNode.escrow.getLocks( diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 673a3bb42..c58b83524 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -17,10 +17,10 @@ import type { ComputeInitializeCommand, ComputeGetResultCommand } from '../../@types/commands.js' -import type { - ComputeAsset, - ComputeAlgorithm, - ComputeEnvironment +import { + type ComputeAsset, + type ComputeAlgorithm, + type ComputeEnvironment } from '../../@types/C2D/C2D.js' import { // DB_TYPES, @@ -77,6 +77,8 @@ import { import { freeComputeStartPayload } from '../data/commands.js' import { DDOManager } from '@oceanprotocol/ddo-js' +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) + describe('Compute', () => { let previousConfiguration: OverrideEnvConfig[] let config: OceanNodeConfig @@ -89,6 +91,7 @@ describe('Compute', () => { let publishedComputeDataset: any let publishedAlgoDataset: any let jobId: string + let freeJobId: string let datasetOrderTxId: any let algoOrderTxId: any let paymentToken: any @@ -332,7 +335,6 @@ describe('Compute', () => { console.log('firstEnv', firstEnv) computeEnvironments = await streamToObject(response.stream as Readable) firstEnv = computeEnvironments[0] - const initializeComputeTask: ComputeInitializeCommand = { datasets: [dataset], algorithm, @@ -640,7 +642,7 @@ describe('Compute', () => { assert(!response.stream, 'We should not have a stream') }) - it('should start a compute job', async () => { + it('should start a compute job with maxed resources', async () => { // first check escrow auth let balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) @@ -702,6 +704,10 @@ describe('Compute', () => { ) const messageHashBytes = ethers.toBeArray(consumerMessage) const signature = await wallet.signMessage(messageHashBytes) + const re = [] + for (const res of firstEnv.resources) { + re.push({ id: res.id, amount: res.total }) + } const startComputeTask: 
PaidComputeStartCommand = { command: PROTOCOL_COMMANDS.COMPUTE_START, consumerAddress: await consumerAccount.getAddress(), @@ -726,7 +732,12 @@ describe('Compute', () => { chainId: DEVELOPMENT_CHAIN_ID, token: paymentToken }, - maxJobDuration: computeJobDuration + metadata: { + key: 'value' + }, + additionalViewers: [await wallet2.getAddress()], + maxJobDuration: computeJobDuration, + resources: re // additionalDatasets?: ComputeAsset[] // output?: ComputeOutput } @@ -748,7 +759,7 @@ describe('Compute', () => { initializeResponse.payment.token, firstEnv.consumerAddress, balance, - computeJobDuration, + initializeResponse.payment.minLockSeconds, 10 ) auth = await oceanNode.escrow.getAuthorizations( @@ -789,6 +800,7 @@ describe('Compute', () => { nonce: nonce2, signature: signature2 }) + console.log(response) assert(response, 'Failed to get response') assert(response.status.httpStatus === 200, 'Failed to get 200 response') assert(response.stream, 'Failed to get stream') @@ -797,6 +809,7 @@ describe('Compute', () => { const jobs = await streamToObject(response.stream as Readable) // eslint-disable-next-line prefer-destructuring jobId = jobs[0].jobId + console.log('**** Started compute job with id: ', jobId) // check escrow funds = await oceanNode.escrow.getUserAvailableFunds( DEVELOPMENT_CHAIN_ID, @@ -824,9 +837,19 @@ describe('Compute', () => { ) }) - /* it('should start a free docker compute job', async () => { + it('should try start another compute job with maxed resources, but fail', async () => { + const getEnvironmentsTask = { + command: PROTOCOL_COMMANDS.COMPUTE_GET_ENVIRONMENTS + } + const eresponse = await new ComputeGetEnvironmentsHandler(oceanNode).handle( + getEnvironmentsTask + ) + computeEnvironments = await streamToObject(eresponse.stream as Readable) + console.log(computeEnvironments[0]) const nonce = Date.now().toString() - const message = String(nonce) + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) // sign message/nonce const consumerMessage = ethers.solidityPackedKeccak256( ['bytes'], @@ -834,44 +857,47 @@ describe('Compute', () => { ) const messageHashBytes = ethers.toBeArray(consumerMessage) const signature = await wallet.signMessage(messageHashBytes) - const startComputeTask: FreeComputeStartCommand = { - command: PROTOCOL_COMMANDS.FREE_COMPUTE_START, + const re = [] + for (const res of firstEnv.resources) { + re.push({ id: res.id, amount: res.total }) + } + const startComputeTask: PaidComputeStartCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_START, consumerAddress: await consumerAccount.getAddress(), signature, nonce, environment: firstEnv.id, datasets: [ { - fileObject: computeAsset.services[0].files.files[0], documentId: publishedComputeDataset.ddo.id, serviceId: publishedComputeDataset.ddo.services[0].id, transferTxId: datasetOrderTxId } ], algorithm: { - fileObject: algoAsset.services[0].files.files[0], documentId: publishedAlgoDataset.ddo.id, serviceId: publishedAlgoDataset.ddo.services[0].id, transferTxId: algoOrderTxId, meta: publishedAlgoDataset.ddo.metadata.algorithm }, - output: {} + output: {}, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + resources: re // additionalDatasets?: ComputeAsset[] // output?: ComputeOutput } - const response = await new FreeComputeStartHandler(oceanNode).handle(startComputeTask) - assert(response, 'Failed to get response') - assert(response.status.httpStatus === 200, 'Failed to get 200 response') - 
assert(response.stream, 'Failed to get stream') - expect(response.stream).to.be.instanceOf(Readable) - - const jobs = await streamToObject(response.stream as Readable) - assert(jobs[0].jobId, 'failed to got job id') - // eslint-disable-next-line prefer-destructuring - jobId = jobs[0].jobId - }) */ + // it should fail, because we don't have enough free resources + const response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask) + console.log(response) + assert(response.status.httpStatus === 400, 'Failed to get 400 response') + assert(!response.stream, 'We should not have a stream') + }) - it('should start a free docker compute job', async () => { + it('should start a queued free docker compute job', async () => { const nonce = Date.now().toString() const message = String(nonce) // sign message/nonce @@ -903,10 +929,7 @@ describe('Compute', () => { meta: publishedAlgoDataset.ddo.metadata.algorithm }, output: {}, - metadata: { - key: 'value' - }, - additionalViewers: [await wallet2.getAddress()] + queueMaxWaitTime: 300 // 5 minutes // additionalDatasets?: ComputeAsset[] // output?: ComputeOutput } @@ -919,8 +942,10 @@ describe('Compute', () => { const jobs = await streamToObject(response.stream as Readable) assert(jobs[0].jobId, 'failed to got job id') + console.log('**** Started FREE compute job with id: ', jobs[0].jobId) + console.log(jobs[0]) // eslint-disable-next-line prefer-destructuring - jobId = jobs[0].jobId + freeJobId = jobs[0].jobId }) it('should get job status by jobId', async () => { @@ -1028,6 +1053,7 @@ describe('Compute', () => { const response = await new ComputeGetResultHandler(oceanNode).handle( resultComputeTask ) + console.log(response) assert(response, 'Failed to get response') assert(response.status.httpStatus === 500, 'Failed to get 500 response') console.log(response.status.error) @@ -1055,6 +1081,29 @@ describe('Compute', () => { assert(response.status.httpStatus === 200, 'Failed to get 200 response') assert(response.stream, 'Failed to get stream') expect(response.stream).to.be.instanceOf(Readable) + let tries = 0 + do { + const statusComputeTask: ComputeGetStatusCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_GET_STATUS, + consumerAddress: null, + agreementId: null, + jobId + } + const response = await new ComputeGetStatusHandler(oceanNode).handle( + statusComputeTask + ) + assert(response, 'Failed to get response') + assert(response.status.httpStatus === 200, 'Failed to get 200 response') + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + const jobs = await streamToObject(response.stream as Readable) + console.log('Checking job status after stop...') + console.log(jobs[0]) + if (jobs[0].dateFinished) break + if (tries > 10) assert.fail('Job not stopped after multiple tries') + await sleep(2000) + tries++ + } while (true) }) it('should deny the Free job due to signature (directCommand payload)', async function () { freeComputeStartPayload.environment = firstEnv.id @@ -1084,6 +1133,25 @@ describe('Compute', () => { assert(response.status.httpStatus === 500, 'Failed to get 500 response') assert(response.stream === null, 'Should not get stream') }) + // let's check our queued job + it('should get job status by jobId', async () => { + const statusComputeTask: ComputeGetStatusCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_GET_STATUS, + consumerAddress: null, + agreementId: null, + jobId: freeJobId + } + const response = await new ComputeGetStatusHandler(oceanNode).handle( + statusComputeTask 
+ ) + assert(response, 'Failed to get response') + assert(response.status.httpStatus === 200, 'Failed to get 200 response') + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + const jobs = await streamToObject(response.stream as Readable) + console.log('Checking FREE job status...') + console.log(jobs[0]) + }) // algo and checksums related describe('C2D algo and checksums related', () => { it('should publish AlgoDDO', async () => { diff --git a/src/test/integration/getJobs.test.ts b/src/test/integration/getJobs.test.ts index cc8ca53e6..0f405a66b 100644 --- a/src/test/integration/getJobs.test.ts +++ b/src/test/integration/getJobs.test.ts @@ -55,7 +55,8 @@ function buildJob(overrides: Partial = {}): DBComputeJob { resources: overrides.resources || [], payment: overrides.payment, additionalViewers: overrides.additionalViewers || [], - algoDuration: overrides.algoDuration || 0 + algoDuration: overrides.algoDuration || 0, + queueMaxWaitTime: overrides.queueMaxWaitTime || 0 } } diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index b78daa9be..6af149c6a 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -97,7 +97,8 @@ describe('Compute Jobs Database', () => { isFree: false, algoStartTimestamp: '0', algoStopTimestamp: '0', - algoDuration: 0 + algoDuration: 0, + queueMaxWaitTime: 0 } jobId = await db.newJob(job) @@ -167,7 +168,8 @@ describe('Compute Jobs Database', () => { isFree: false, algoStartTimestamp: '0', algoStopTimestamp: '0', - algoDuration: 0 + algoDuration: 0, + queueMaxWaitTime: 0 } const jobId = await db.newJob(job) diff --git a/src/test/unit/indexer/indexer.test.ts b/src/test/unit/indexer/indexer.test.ts index c8bba1120..4c39241c8 100644 --- a/src/test/unit/indexer/indexer.test.ts +++ b/src/test/unit/indexer/indexer.test.ts @@ -99,4 +99,8 @@ describe('OceanIndexer', () => { await tearDownEnvironment(envOverrides) sandbox.restore() }) + after(async () => { + await tearDownEnvironment(envOverrides) + sandbox.restore() + }) }) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 0f6cfd2bb..dcd696db9 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -279,7 +279,7 @@ export const OceanNodeConfigSchema = z supportedNetworks: jsonFromString(RPCSSchema).optional(), - claimDurationTimeout: z.coerce.number().default(600), + claimDurationTimeout: z.coerce.number().default(3600), indexingNetworks: z .union([jsonFromString(RPCSSchema), z.array(z.union([z.string(), z.number()]))]) .optional(),
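
Reviewer note on the new queueMaxWaitTime parameter: the sketch below shows one way a client could exercise the queueing path added in this change, by POSTing to the freeCompute route with a non-zero queueMaxWaitTime. It is a minimal illustration, not part of the patch: the node URL, the "/api/services" prefix (the literal value of SERVICES_API_BASE_PATH is not visible in this diff), and the DDO/service identifiers are assumed placeholders, and the nonce/signature scheme simply mirrors the free-start pattern used in compute.test.ts (msg = String(nonce)).

import { ethers } from 'ethers'

// Hypothetical values - replace with a real node URL, DDO ids and service ids.
const NODE_URL = 'http://localhost:8001' // httpPort from config.json
const BASE_PATH = '/api/services'        // assumed value of SERVICES_API_BASE_PATH
const DATASET_DID = 'did:op:<dataset>'   // placeholder
const ALGO_DID = 'did:op:<algorithm>'    // placeholder

async function startQueuedFreeJob(wallet: ethers.Wallet, environmentId: string) {
  // Free-start requests are signed over the nonce alone, as in compute.test.ts.
  const nonce = Date.now().toString()
  const hash = ethers.solidityPackedKeccak256(
    ['bytes'],
    [ethers.hexlify(ethers.toUtf8Bytes(nonce))]
  )
  const signature = await wallet.signMessage(ethers.toBeArray(hash))

  const res = await fetch(`${NODE_URL}${BASE_PATH}/freeCompute`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      consumerAddress: await wallet.getAddress(),
      nonce,
      signature,
      environment: environmentId,
      // The algorithm entry may also need meta / transferTxId, exactly as in the
      // integration test; only the shape relevant to queueing is shown here.
      datasets: [{ documentId: DATASET_DID, serviceId: '0' }],
      algorithm: { documentId: ALGO_DID, serviceId: '0' },
      // New field: if no free resources are available right now, keep the job
      // queued for up to 300 seconds instead of returning HTTP 400.
      queueMaxWaitTime: 300
    })
  })
  // When queued, the job is created with status C2DStatusNumber.JobQueued (1) and
  // statusText 'Job queued'; the docker engine's cron loop starts it once resources
  // free up, or marks it JobQueuedExpired (2) after queueMaxWaitTime seconds.
  return res.json()
}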
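
A note on the escrow lock-time change in ComputeInitializeHandler and PaidComputeStartHandler: because a paid job can now sit in the queue for up to queueMaxWaitTime seconds before its maxJobDuration clock starts, the value passed to getMinLockTime has to cover both intervals. The snippet below only restates that arithmetic; the actual formula inside escrow.getMinLockTime (and the ESCROW_CLAIM_TIMEOUT / claimDurationTimeout margin, whose default moves to 3600 seconds in this patch) is defined elsewhere and is only assumed here.

// Illustrative arithmetic only - mirrors the call sites changed in this diff.
const maxJobDuration = 3600   // seconds the job may run once started
const queueMaxWaitTime = 300  // seconds the job may wait in the queue (new)

// before: engine.escrow.getMinLockTime(maxJobDuration)
// after:  engine.escrow.getMinLockTime(maxJobDuration + queueMaxWaitTime)
const lockWindowInput = maxJobDuration + queueMaxWaitTime // 3900 seconds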