diff --git a/docs/API.md b/docs/API.md index 10d887219..108eeea3c 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1400,19 +1400,20 @@ starts a free compute job and returns jobId if succesfull #### Parameters -| name | type | required | description | -| ----------------- | ------ | -------- | ---------------------------------------------------------------- | -| command | string | v | command name | -| node | string | | if not present it means current node | -| consumerAddress | string | v | consumer address | -| signature | string | v | signature (msg=String(nonce) ) | -| nonce | string | v | nonce for the request | -| datasets | object | | list of ComputeAsset to be used as inputs | -| algorithm | object | | ComputeAlgorithm definition | -| environment | string | v | compute environment to use | -| resources | object | | optional list of required resources | -| metadata | object | | optional metadata for the job, data provided by the user | -| additionalViewers | object | | optional array of addresses that are allowed to fetch the result | +| name | type | required | description | +| ----------------- | ------ | -------- | ----------------------------------------------------------------------------- | +| command | string | v | command name | +| node | string | | if not present it means current node | +| consumerAddress | string | v | consumer address | +| signature | string | v | signature (msg=String(nonce) ) | +| nonce | string | v | nonce for the request | +| datasets | object | | list of ComputeAsset to be used as inputs | +| algorithm | object | | ComputeAlgorithm definition | +| environment | string | v | compute environment to use | +| resources | object | | optional list of required resources | +| metadata | object | | optional metadata for the job, data provided by the user | +| additionalViewers | object | | optional array of addresses that are allowed to fetch the result | +| queueMaxWaitTime | number | | optional max time in seconds a job can wait in the queue before being started | #### Request diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index db54e6812..879720066 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -117,6 +117,10 @@ export interface ComputeEnvironment extends ComputeEnvironmentBaseConfig { runningJobs: number runningfreeJobs?: number consumerAddress: string // v1 + queuedJobs: number + queuedFreeJobs: number + queMaxWaitTime: number + queMaxWaitTimeFree: number } export interface C2DDockerConfig { @@ -174,6 +178,7 @@ export interface ComputeJob { environment?: string metadata?: DBComputeJobMetadata terminationDetails?: ComputeJobTerminationDetails + queueMaxWaitTime: number // max time in seconds a job can wait in the queue before being started } export interface ComputeOutput { @@ -260,6 +265,10 @@ export enum C2DStatusNumber { // eslint-disable-next-line no-unused-vars JobStarted = 0, // eslint-disable-next-line no-unused-vars + JobQueued = 1, + // eslint-disable-next-line no-unused-vars + JobQueuedExpired = 2, + // eslint-disable-next-line no-unused-vars PullImage = 10, // eslint-disable-next-line no-unused-vars PullImageFailed = 11, @@ -302,6 +311,10 @@ export enum C2DStatusText { // eslint-disable-next-line no-unused-vars JobStarted = 'Job started', // eslint-disable-next-line no-unused-vars + JobQueued = 'Job queued', + // eslint-disable-next-line no-unused-vars + JobQueuedExpired = 'Job expired in queue', + // eslint-disable-next-line no-unused-vars PullImage = 'Pulling algorithm image', // eslint-disable-next-line no-unused-vars PullImageFailed = 'Pulling algorithm image failed', diff --git a/src/@types/commands.ts b/src/@types/commands.ts index 98263df53..d7cb2597b 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -185,10 +185,10 @@ export interface ComputeGetEnvironmentsCommand extends Command { export interface ComputePayment { chainId: number token: string - resources?: ComputeResourceRequest[] + resources?: ComputeResourceRequest[] // only used in initializeCompute } export interface ComputeInitializeCommand extends Command { - datasets: [ComputeAsset] + datasets: ComputeAsset[] algorithm: ComputeAlgorithm environment: string payment: ComputePayment @@ -196,6 +196,7 @@ export interface ComputeInitializeCommand extends Command { signature?: string maxJobDuration: number policyServer?: any // object to pass to policy server + queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started } export interface FreeComputeStartCommand extends Command { @@ -211,6 +212,7 @@ export interface FreeComputeStartCommand extends Command { policyServer?: any // object to pass to policy server metadata?: DBComputeJobMetadata additionalViewers?: string[] // addresses of additional addresses that can get results + queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started } export interface PaidComputeStartCommand extends FreeComputeStartCommand { payment: ComputePayment diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 918ec7a9c..408176d74 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -65,7 +65,8 @@ export abstract class C2DEngine { payment: DBComputeJobPayment, jobId: string, metadata?: DBComputeJobMetadata, - additionalViewers?: string[] + additionalViewers?: string[], + queueMaxWaitTime?: number ): Promise public abstract stopComputeJob( @@ -233,22 +234,45 @@ export abstract class C2DEngine { const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) let totalJobs = 0 let totalFreeJobs = 0 + let queuedJobs = 0 + let queuedFreeJobs = 0 + let maxWaitTime = 0 + let maxWaitTimeFree = 0 for (const job of jobs) { if (job.environment === env.id) { - totalJobs++ - if (job.isFree) totalFreeJobs++ - - for (const resource of job.resources) { - if (!(resource.id in usedResources)) usedResources[resource.id] = 0 - usedResources[resource.id] += resource.amount + if (job.queueMaxWaitTime === 0) { + totalJobs++ + if (job.isFree) totalFreeJobs++ + + for (const resource of job.resources) { + if (!(resource.id in usedResources)) usedResources[resource.id] = 0 + usedResources[resource.id] += resource.amount + if (job.isFree) { + if (!(resource.id in usedFreeResources)) usedFreeResources[resource.id] = 0 + usedFreeResources[resource.id] += resource.amount + } + } + } else { + // queued job + queuedJobs++ + maxWaitTime += job.maxJobDuration if (job.isFree) { - if (!(resource.id in usedFreeResources)) usedFreeResources[resource.id] = 0 - usedFreeResources[resource.id] += resource.amount + queuedFreeJobs++ + maxWaitTimeFree += job.maxJobDuration } } } } - return { totalJobs, totalFreeJobs, usedResources, usedFreeResources } + return { + totalJobs, + totalFreeJobs, + usedResources, + usedFreeResources, + queuedJobs, + queuedFreeJobs, + maxWaitTime, + maxWaitTimeFree + } } // overridden by each engine if required diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 56e993097..0d7b5d98f 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -171,7 +171,11 @@ export class C2DEngineDocker extends C2DEngine { addresses: [], accessLists: [] }, - fees + fees, + queuedJobs: 0, + queuedFreeJobs: 0, + queMaxWaitTime: 0, + queMaxWaitTimeFree: 0 }) if (`access` in envConfig) this.envs[0].access = envConfig.access @@ -287,10 +291,22 @@ export class C2DEngineDocker extends C2DEngine { // if (systemInfo.Runtimes) computeEnv.runtimes = systemInfo.Runtimes // if (systemInfo.DefaultRuntime) // computeEnv.defaultRuntime = systemInfo.DefaultRuntime - const { totalJobs, totalFreeJobs, usedResources, usedFreeResources } = - await this.getUsedResources(computeEnv) + const { + totalJobs, + totalFreeJobs, + usedResources, + usedFreeResources, + queuedJobs, + queuedFreeJobs, + maxWaitTime, + maxWaitTimeFree + } = await this.getUsedResources(computeEnv) computeEnv.runningJobs = totalJobs computeEnv.runningfreeJobs = totalFreeJobs + computeEnv.queuedJobs = queuedJobs + computeEnv.queuedFreeJobs = queuedFreeJobs + computeEnv.queMaxWaitTime = maxWaitTime + computeEnv.queMaxWaitTimeFree = maxWaitTimeFree for (let i = 0; i < computeEnv.resources.length; i++) { if (computeEnv.resources[i].id in usedResources) computeEnv.resources[i].inUse = usedResources[computeEnv.resources[i].id] @@ -365,7 +381,8 @@ export class C2DEngineDocker extends C2DEngine { payment: DBComputeJobPayment, jobId: string, metadata?: DBComputeJobMetadata, - additionalViewers?: string[] + additionalViewers?: string[], + queueMaxWaitTime?: number ): Promise { if (!this.docker) return [] // TO DO - iterate over resources and get default runtime @@ -409,6 +426,9 @@ export class C2DEngineDocker extends C2DEngine { ) // make sure that we don't keep them in the db structure algorithm.meta.container.additionalDockerFiles = null + if (queueMaxWaitTime && queueMaxWaitTime > 0) { + throw new Error(`additionalDockerFiles cannot be used with queued jobs`) + } } const job: DBComputeJob = { clusterHash: this.getC2DConfig().hash, @@ -417,8 +437,14 @@ export class C2DEngineDocker extends C2DEngine { jobId, dateCreated: String(Date.now() / 1000), dateFinished: null, - status: C2DStatusNumber.JobStarted, - statusText: C2DStatusText.JobStarted, + status: + queueMaxWaitTime && queueMaxWaitTime > 0 + ? C2DStatusNumber.JobQueued + : C2DStatusNumber.JobStarted, + statusText: + queueMaxWaitTime && queueMaxWaitTime > 0 + ? C2DStatusText.JobQueued + : C2DStatusText.JobStarted, results: [], algorithm, assets, @@ -439,13 +465,16 @@ export class C2DEngineDocker extends C2DEngine { metadata, additionalViewers, terminationDetails: { exitCode: null, OOMKilled: null }, - algoDuration: 0 + algoDuration: 0, + queueMaxWaitTime: queueMaxWaitTime || 0 } if (algorithm.meta.container && algorithm.meta.container.dockerfile) { - // we need to build the image - job.status = C2DStatusNumber.BuildImage - job.statusText = C2DStatusText.BuildImage + // we need to build the image if job is not queued + if (queueMaxWaitTime === 0) { + job.status = C2DStatusNumber.BuildImage + job.statusText = C2DStatusText.BuildImage + } } else { // already built, we need to validate it const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) @@ -454,8 +483,10 @@ export class C2DEngineDocker extends C2DEngine { throw new Error( `Cannot find image ${image} for ${env.platform.architecture}. Maybe it does not exist or it's build for other arhitectures.` ) - job.status = C2DStatusNumber.PullImage - job.statusText = C2DStatusText.PullImage + if (queueMaxWaitTime === 0) { + job.status = C2DStatusNumber.PullImage + job.statusText = C2DStatusText.PullImage + } } await this.makeJobFolders(job) @@ -464,10 +495,12 @@ export class C2DEngineDocker extends C2DEngine { if (!addedId) { return [] } - if (algorithm.meta.container && algorithm.meta.container.dockerfile) { - this.buildImage(job, additionalDockerFiles) - } else { - this.pullImage(job) + if (queueMaxWaitTime === 0) { + if (algorithm.meta.container && algorithm.meta.container.dockerfile) { + this.buildImage(job, additionalDockerFiles) + } else { + this.pullImage(job) + } } // only now set the timer if (!this.cronTimer) { @@ -611,8 +644,7 @@ export class C2DEngineDocker extends C2DEngine { } if ( jobs[0].owner !== consumerAddress && - jobs[0].additionalViewers && - !jobs[0].additionalViewers.includes(consumerAddress) + (!jobs[0].additionalViewers || !jobs[0].additionalViewers.includes(consumerAddress)) ) { // consumerAddress is not the owner and not in additionalViewers throw new Error( @@ -815,6 +847,44 @@ export class C2DEngineDocker extends C2DEngine { - delete the container - delete the volume */ + if (job.status === C2DStatusNumber.JobQueued) { + // check if we can start the job now + const now = String(Date.now() / 1000) + if (job.queueMaxWaitTime < parseFloat(now) - parseFloat(job.dateCreated)) { + job.status = C2DStatusNumber.JobQueuedExpired + job.statusText = C2DStatusText.JobQueuedExpired + job.isRunning = false + job.dateFinished = now + await this.db.updateJob(job) + await this.cleanupJob(job) + return + } + // check if resources are available now + try { + const env = await this.getComputeEnvironment( + job.payment && job.payment.chainId ? job.payment.chainId : null, + job.environment, + null + ) + await this.checkIfResourcesAreAvailable(job.resources, env, true) + } catch (err) { + // resources are still not available + return + } + // resources are now available, let's start the job + const { algorithm } = job + if (algorithm.meta.container && algorithm.meta.container.dockerfile) { + job.status = C2DStatusNumber.BuildImage + job.statusText = C2DStatusText.BuildImage + this.buildImage(job, null) + } else { + job.status = C2DStatusNumber.PullImage + job.statusText = C2DStatusText.PullImage + this.pullImage(job) + } + await this.db.updateJob(job) + } + if (job.status === C2DStatusNumber.ConfiguringVolumes) { // create the volume & create container // TO DO C2D: Choose driver & size diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index aee054935..11d92f595 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -69,6 +69,9 @@ export class ComputeInitializeHandler extends CommandHandler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } + if (!task.queueMaxWaitTime) { + task.queueMaxWaitTime = 0 + } let engine let env let resourcesNeeded @@ -193,7 +196,9 @@ export class ComputeInitializeHandler extends CommandHandler { escrowAddress, payee: env.consumerAddress, chainId: task.payment.chainId, - minLockSeconds: engine.escrow.getMinLockTime(task.maxJobDuration), + minLockSeconds: engine.escrow.getMinLockTime( + task.maxJobDuration + task.queueMaxWaitTime + ), token: task.payment.token, amount: await engine.escrow.getPaymentAmountInWei( cost, diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index a9041a866..c4ae1c5ce 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -68,7 +68,9 @@ export class PaidComputeStartHandler extends CommandHandler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } - + if (!task.queueMaxWaitTime) { + task.queueMaxWaitTime = 0 + } const authValidationResponse = await this.validateTokenOrSignature( task.authorization, task.consumerAddress, @@ -115,12 +117,11 @@ export class PaidComputeStartHandler extends CommandHandler { if (!task.maxJobDuration || task.maxJobDuration > env.maxJobDuration) { task.maxJobDuration = env.maxJobDuration } - task.payment.resources = await engine.checkAndFillMissingResources( - task.payment.resources, + task.resources = await engine.checkAndFillMissingResources( + task.resources, env, false ) - await engine.checkIfResourcesAreAvailable(task.payment.resources, env, true) } catch (e) { return { stream: null, @@ -130,6 +131,23 @@ export class PaidComputeStartHandler extends CommandHandler { } } } + try { + await engine.checkIfResourcesAreAvailable(task.resources, env, true) + } catch (e) { + if (task.queueMaxWaitTime > 0) { + CORE_LOGGER.verbose( + `Compute resources not available, queuing job for max ${task.queueMaxWaitTime} seconds` + ) + } else { + return { + stream: null, + status: { + httpStatus: 400, + error: e + } + } + } + } const { algorithm } = task const config = await getConfiguration() @@ -442,7 +460,7 @@ export class PaidComputeStartHandler extends CommandHandler { } const resources: ComputeResourceRequestWithPrice[] = [] - for (const res of task.payment.resources) { + for (const res of task.resources) { const price = engine.getResourcePrice(prices, res.id) resources.push({ id: res.id, @@ -466,7 +484,7 @@ export class PaidComputeStartHandler extends CommandHandler { const jobId = generateUniqueID(s) // let's calculate payment needed based on resources request and maxJobDuration const cost = engine.calculateResourcesCost( - task.payment.resources, + task.resources, env, task.payment.chainId, task.payment.token, @@ -480,7 +498,7 @@ export class PaidComputeStartHandler extends CommandHandler { task.payment.token, task.consumerAddress, cost, - task.maxJobDuration + engine.escrow.getMinLockTime(task.maxJobDuration + task.queueMaxWaitTime) ) } catch (e) { return { @@ -499,7 +517,7 @@ export class PaidComputeStartHandler extends CommandHandler { env.id, task.consumerAddress, task.maxJobDuration, - task.payment.resources, + task.resources, { chainId: task.payment.chainId, token: task.payment.token, @@ -509,7 +527,8 @@ export class PaidComputeStartHandler extends CommandHandler { }, jobId, task.metadata, - task.additionalViewers + task.additionalViewers, + task.queueMaxWaitTime ) CORE_LOGGER.logMessage( 'ComputeStartCommand Response: ' + JSON.stringify(response, null, 2), @@ -577,7 +596,9 @@ export class FreeComputeStartHandler extends CommandHandler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } - + if (!task.queueMaxWaitTime) { + task.queueMaxWaitTime = 0 + } const authValidationResponse = await this.validateTokenOrSignature( task.authorization, task.consumerAddress, @@ -716,18 +737,17 @@ export class FreeComputeStartHandler extends CommandHandler { } } } - try { - const env = await engine.getComputeEnvironment(null, task.environment) - if (!env) { - return { - stream: null, - status: { - httpStatus: 500, - error: 'Invalid C2D Environment' - } + const env = await engine.getComputeEnvironment(null, task.environment) + if (!env) { + return { + stream: null, + status: { + httpStatus: 500, + error: 'Invalid C2D Environment' } } - + } + try { const accessGranted = await validateAccess(task.consumerAddress, env.free.access) if (!accessGranted) { return { @@ -744,7 +764,7 @@ export class FreeComputeStartHandler extends CommandHandler { env, true ) - await engine.checkIfResourcesAreAvailable(task.resources, env, true) + if (!task.maxJobDuration || task.maxJobDuration > env.free.maxJobDuration) { task.maxJobDuration = env.free.maxJobDuration } @@ -758,6 +778,23 @@ export class FreeComputeStartHandler extends CommandHandler { } } } + try { + await engine.checkIfResourcesAreAvailable(task.resources, env, true) + } catch (e) { + if (task.queueMaxWaitTime > 0) { + CORE_LOGGER.verbose( + `Compute resources not available, queuing job for max ${task.queueMaxWaitTime} seconds` + ) + } else { + return { + stream: null, + status: { + httpStatus: 400, + error: e + } + } + } + } // console.log(task.resources) /* return { @@ -789,7 +826,8 @@ export class FreeComputeStartHandler extends CommandHandler { null, jobId, task.metadata, - task.additionalViewers + task.additionalViewers, + task.queueMaxWaitTime ) CORE_LOGGER.logMessage( diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts index 75b7c5716..74dcbe84a 100644 --- a/src/components/database/sqliteCompute.ts +++ b/src/components/database/sqliteCompute.ts @@ -42,7 +42,8 @@ function getInternalStructure(job: DBComputeJob): any { additionalViewers: job.additionalViewers, terminationDetails: job.terminationDetails, payment: job.payment, - algoDuration: job.algoDuration + algoDuration: job.algoDuration, + queueMaxWaitTime: job.queueMaxWaitTime } return internalBlob } @@ -271,7 +272,7 @@ export class SQLiteCompute implements ComputeDatabaseProvider { getRunningJobs(engine?: string, environment?: string): Promise { const selectSQL = ` - SELECT * FROM ${this.schema.name} WHERE dateFinished IS NULL + SELECT * FROM ${this.schema.name} WHERE dateFinished IS NULL ORDER by dateCreated ` return new Promise((resolve, reject) => { this.db.all(selectSQL, (err, rows: any[] | undefined) => { diff --git a/src/components/httpRoutes/compute.ts b/src/components/httpRoutes/compute.ts index 9d5976d78..7f94a3a47 100644 --- a/src/components/httpRoutes/compute.ts +++ b/src/components/httpRoutes/compute.ts @@ -80,7 +80,8 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/compute`, async (req, res) => { policyServer: (req.body.policyServer as any) || null, metadata: req.body.metadata || null, authorization: req.headers?.authorization, - additionalViewers: (req.body.additionalViewers as unknown as string[]) || null + additionalViewers: (req.body.additionalViewers as unknown as string[]) || null, + queueMaxWaitTime: req.body.queueMaxWaitTime || 0 } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput @@ -125,7 +126,8 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/freeCompute`, async (req, res) => policyServer: (req.body.policyServer as any) || null, metadata: req.body.metadata || null, authorization: req.headers?.authorization, - additionalViewers: (req.body.additionalViewers as unknown as string[]) || null + additionalViewers: (req.body.additionalViewers as unknown as string[]) || null, + queueMaxWaitTime: req.body.queueMaxWaitTime || 0 } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 0d0f40e2d..b8d685600 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -389,7 +389,7 @@ describe('Trusted algorithms Flow', () => { initializeResponse.payment.token, firstEnv.consumerAddress, balance, - computeJobDuration, + initializeResponse.payment.minLockSeconds, 10 ) const locks = await oceanNode.escrow.getLocks( diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index d7a5fd983..be2f00caf 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -17,10 +17,10 @@ import type { ComputeInitializeCommand, ComputeGetResultCommand } from '../../@types/commands.js' -import type { - ComputeAsset, - ComputeAlgorithm, - ComputeEnvironment +import { + type ComputeAsset, + type ComputeAlgorithm, + type ComputeEnvironment } from '../../@types/C2D/C2D.js' import { // DB_TYPES, @@ -77,6 +77,8 @@ import { import { freeComputeStartPayload } from '../data/commands.js' import { DDOManager } from '@oceanprotocol/ddo-js' +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) + describe('Compute', () => { let previousConfiguration: OverrideEnvConfig[] let config: OceanNodeConfig @@ -89,6 +91,7 @@ describe('Compute', () => { let publishedComputeDataset: any let publishedAlgoDataset: any let jobId: string + let freeJobId: string let datasetOrderTxId: any let algoOrderTxId: any let paymentToken: any @@ -325,7 +328,6 @@ describe('Compute', () => { ) computeEnvironments = await streamToObject(response.stream as Readable) firstEnv = computeEnvironments[0] - const initializeComputeTask: ComputeInitializeCommand = { datasets: [dataset], algorithm, @@ -633,7 +635,7 @@ describe('Compute', () => { assert(!response.stream, 'We should not have a stream') }) - it('should start a compute job', async () => { + it('should start a compute job with maxed resources', async () => { // first check escrow auth let balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) @@ -695,6 +697,10 @@ describe('Compute', () => { ) const messageHashBytes = ethers.toBeArray(consumerMessage) const signature = await wallet.signMessage(messageHashBytes) + const re = [] + for (const res of firstEnv.resources) { + re.push({ id: res.id, amount: res.total }) + } const startComputeTask: PaidComputeStartCommand = { command: PROTOCOL_COMMANDS.COMPUTE_START, consumerAddress: await consumerAccount.getAddress(), @@ -719,7 +725,12 @@ describe('Compute', () => { chainId: DEVELOPMENT_CHAIN_ID, token: paymentToken }, - maxJobDuration: computeJobDuration + metadata: { + key: 'value' + }, + additionalViewers: [await wallet2.getAddress()], + maxJobDuration: computeJobDuration, + resources: re // additionalDatasets?: ComputeAsset[] // output?: ComputeOutput } @@ -741,7 +752,7 @@ describe('Compute', () => { initializeResponse.payment.token, firstEnv.consumerAddress, balance, - computeJobDuration, + initializeResponse.payment.minLockSeconds, 10 ) auth = await oceanNode.escrow.getAuthorizations( @@ -782,6 +793,7 @@ describe('Compute', () => { nonce: nonce2, signature: signature2 }) + console.log(response) assert(response, 'Failed to get response') assert(response.status.httpStatus === 200, 'Failed to get 200 response') assert(response.stream, 'Failed to get stream') @@ -790,6 +802,7 @@ describe('Compute', () => { const jobs = await streamToObject(response.stream as Readable) // eslint-disable-next-line prefer-destructuring jobId = jobs[0].jobId + console.log('**** Started compute job with id: ', jobId) // check escrow funds = await oceanNode.escrow.getUserAvailableFunds( DEVELOPMENT_CHAIN_ID, @@ -817,9 +830,19 @@ describe('Compute', () => { ) }) - /* it('should start a free docker compute job', async () => { + it('should try start another compute job with maxed resources, but fail', async () => { + const getEnvironmentsTask = { + command: PROTOCOL_COMMANDS.COMPUTE_GET_ENVIRONMENTS + } + const eresponse = await new ComputeGetEnvironmentsHandler(oceanNode).handle( + getEnvironmentsTask + ) + computeEnvironments = await streamToObject(eresponse.stream as Readable) + console.log(computeEnvironments[0]) const nonce = Date.now().toString() - const message = String(nonce) + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) // sign message/nonce const consumerMessage = ethers.solidityPackedKeccak256( ['bytes'], @@ -827,44 +850,47 @@ describe('Compute', () => { ) const messageHashBytes = ethers.toBeArray(consumerMessage) const signature = await wallet.signMessage(messageHashBytes) - const startComputeTask: FreeComputeStartCommand = { - command: PROTOCOL_COMMANDS.FREE_COMPUTE_START, + const re = [] + for (const res of firstEnv.resources) { + re.push({ id: res.id, amount: res.total }) + } + const startComputeTask: PaidComputeStartCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_START, consumerAddress: await consumerAccount.getAddress(), signature, nonce, environment: firstEnv.id, datasets: [ { - fileObject: computeAsset.services[0].files.files[0], documentId: publishedComputeDataset.ddo.id, serviceId: publishedComputeDataset.ddo.services[0].id, transferTxId: datasetOrderTxId } ], algorithm: { - fileObject: algoAsset.services[0].files.files[0], documentId: publishedAlgoDataset.ddo.id, serviceId: publishedAlgoDataset.ddo.services[0].id, transferTxId: algoOrderTxId, meta: publishedAlgoDataset.ddo.metadata.algorithm }, - output: {} + output: {}, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + resources: re // additionalDatasets?: ComputeAsset[] // output?: ComputeOutput } - const response = await new FreeComputeStartHandler(oceanNode).handle(startComputeTask) - assert(response, 'Failed to get response') - assert(response.status.httpStatus === 200, 'Failed to get 200 response') - assert(response.stream, 'Failed to get stream') - expect(response.stream).to.be.instanceOf(Readable) - - const jobs = await streamToObject(response.stream as Readable) - assert(jobs[0].jobId, 'failed to got job id') - // eslint-disable-next-line prefer-destructuring - jobId = jobs[0].jobId - }) */ + // it should fail, because we don't have enough free resources + const response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask) + console.log(response) + assert(response.status.httpStatus === 400, 'Failed to get 400 response') + assert(!response.stream, 'We should not have a stream') + }) - it('should start a free docker compute job', async () => { + it('should start a queued free docker compute job', async () => { const nonce = Date.now().toString() const message = String(nonce) // sign message/nonce @@ -896,10 +922,7 @@ describe('Compute', () => { meta: publishedAlgoDataset.ddo.metadata.algorithm }, output: {}, - metadata: { - key: 'value' - }, - additionalViewers: [await wallet2.getAddress()] + queueMaxWaitTime: 300 // 5 minutes // additionalDatasets?: ComputeAsset[] // output?: ComputeOutput } @@ -912,8 +935,10 @@ describe('Compute', () => { const jobs = await streamToObject(response.stream as Readable) assert(jobs[0].jobId, 'failed to got job id') + console.log('**** Started FREE compute job with id: ', jobs[0].jobId) + console.log(jobs[0]) // eslint-disable-next-line prefer-destructuring - jobId = jobs[0].jobId + freeJobId = jobs[0].jobId }) it('should get job status by jobId', async () => { @@ -1021,6 +1046,7 @@ describe('Compute', () => { const response = await new ComputeGetResultHandler(oceanNode).handle( resultComputeTask ) + console.log(response) assert(response, 'Failed to get response') assert(response.status.httpStatus === 500, 'Failed to get 500 response') console.log(response.status.error) @@ -1048,6 +1074,29 @@ describe('Compute', () => { assert(response.status.httpStatus === 200, 'Failed to get 200 response') assert(response.stream, 'Failed to get stream') expect(response.stream).to.be.instanceOf(Readable) + let tries = 0 + do { + const statusComputeTask: ComputeGetStatusCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_GET_STATUS, + consumerAddress: null, + agreementId: null, + jobId + } + const response = await new ComputeGetStatusHandler(oceanNode).handle( + statusComputeTask + ) + assert(response, 'Failed to get response') + assert(response.status.httpStatus === 200, 'Failed to get 200 response') + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + const jobs = await streamToObject(response.stream as Readable) + console.log('Checking job status after stop...') + console.log(jobs[0]) + if (jobs[0].dateFinished) break + if (tries > 10) assert.fail('Job not stopped after multiple tries') + await sleep(2000) + tries++ + } while (true) }) it('should deny the Free job due to signature (directCommand payload)', async function () { freeComputeStartPayload.environment = firstEnv.id @@ -1077,6 +1126,25 @@ describe('Compute', () => { assert(response.status.httpStatus === 500, 'Failed to get 500 response') assert(response.stream === null, 'Should not get stream') }) + // let's check our queued job + it('should get job status by jobId', async () => { + const statusComputeTask: ComputeGetStatusCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_GET_STATUS, + consumerAddress: null, + agreementId: null, + jobId: freeJobId + } + const response = await new ComputeGetStatusHandler(oceanNode).handle( + statusComputeTask + ) + assert(response, 'Failed to get response') + assert(response.status.httpStatus === 200, 'Failed to get 200 response') + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + const jobs = await streamToObject(response.stream as Readable) + console.log('Checking FREE job status...') + console.log(jobs[0]) + }) // algo and checksums related describe('C2D algo and checksums related', () => { it('should publish AlgoDDO', async () => { diff --git a/src/test/integration/getJobs.test.ts b/src/test/integration/getJobs.test.ts index cc8ca53e6..0f405a66b 100644 --- a/src/test/integration/getJobs.test.ts +++ b/src/test/integration/getJobs.test.ts @@ -55,7 +55,8 @@ function buildJob(overrides: Partial = {}): DBComputeJob { resources: overrides.resources || [], payment: overrides.payment, additionalViewers: overrides.additionalViewers || [], - algoDuration: overrides.algoDuration || 0 + algoDuration: overrides.algoDuration || 0, + queueMaxWaitTime: overrides.queueMaxWaitTime || 0 } } diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index c36d65f4b..29c49af43 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -97,7 +97,8 @@ describe('Compute Jobs Database', () => { isFree: false, algoStartTimestamp: '0', algoStopTimestamp: '0', - algoDuration: 0 + algoDuration: 0, + queueMaxWaitTime: 0 } jobId = await db.newJob(job) @@ -167,7 +168,8 @@ describe('Compute Jobs Database', () => { isFree: false, algoStartTimestamp: '0', algoStopTimestamp: '0', - algoDuration: 0 + algoDuration: 0, + queueMaxWaitTime: 0 } const jobId = await db.newJob(job)