diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 68c65a30b..89bdbd6a6 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -16,7 +16,6 @@ import type { DBComputeJob, DBComputeJobPayment, ComputeResult, - RunningPlatform, ComputeEnvFeesStructure, ComputeResourceRequest, ComputeEnvFees @@ -47,7 +46,6 @@ import { AssetUtils } from '../../utils/asset.js' import { FindDdoHandler } from '../core/handler/ddoHandler.js' import { OceanNode } from '../../OceanNode.js' import { decryptFilesObject, omitDBComputeFieldsFromComputeJob } from './index.js' -import { ValidateParams } from '../httpRoutes/validateCommands.js' import { Service } from '@oceanprotocol/ddo-js' import { getOceanTokenAddressForChain } from '../../utils/address.js' @@ -58,7 +56,6 @@ export class C2DEngineDocker extends C2DEngine { private cronTimer: any private cronTime: number = 2000 private jobImageSizes: Map = new Map() - private static DEFAULT_DOCKER_REGISTRY = 'https://registry-1.docker.io' public constructor(clusterConfig: C2DClusterInfo, db: C2DDatabase, escrow: Escrow) { super(clusterConfig, db, escrow) @@ -332,107 +329,6 @@ export class C2DEngineDocker extends C2DEngine { return filteredEnvs } - private static parseImage(image: string) { - let registry = C2DEngineDocker.DEFAULT_DOCKER_REGISTRY - let name = image - let ref = 'latest' - - const atIdx = name.indexOf('@') - const colonIdx = name.lastIndexOf(':') - - if (atIdx !== -1) { - ref = name.slice(atIdx + 1) - name = name.slice(0, atIdx) - } else if (colonIdx !== -1 && !name.slice(colonIdx).includes('/')) { - ref = name.slice(colonIdx + 1) - name = name.slice(0, colonIdx) - } - - const firstSlash = name.indexOf('/') - if (firstSlash !== -1) { - const potential = name.slice(0, firstSlash) - if (potential.includes('.') || potential.includes(':')) { - registry = potential.includes('localhost') - ? `http://${potential}` - : `https://${potential}` - name = name.slice(firstSlash + 1) - } - } - - if (registry === C2DEngineDocker.DEFAULT_DOCKER_REGISTRY && !name.includes('/')) { - name = `library/${name}` - } - - return { registry, name, ref } - } - - public static async getDockerManifest(image: string): Promise { - const { registry, name, ref } = C2DEngineDocker.parseImage(image) - const url = `${registry}/v2/${name}/manifests/${ref}` - let headers: Record = { - Accept: - 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.oci.image.index.v1+json' - } - let response = await fetch(url, { headers }) - - if (response.status === 401) { - const match = (response.headers.get('www-authenticate') || '').match( - /Bearer realm="([^"]+)",service="([^"]+)"/ - ) - if (match) { - const tokenUrl = new URL(match[1]) - tokenUrl.searchParams.set('service', match[2]) - tokenUrl.searchParams.set('scope', `repository:${name}:pull`) - const { token } = (await fetch(tokenUrl.toString()).then((r) => r.json())) as { - token: string - } - headers = { ...headers, Authorization: `Bearer ${token}` } - response = await fetch(url, { headers }) - } - } - - if (!response.ok) { - const body = await response.text() - throw new Error( - `Failed to get manifest: ${response.status} ${response.statusText} - ${body}` - ) - } - return await response.json() - } - - /** - * Checks the docker image by looking at the manifest - * @param image name or tag - * @returns boolean - */ - public static async checkDockerImage( - image: string, - platform?: RunningPlatform - ): Promise { - try { - const manifest = await C2DEngineDocker.getDockerManifest(image) - - const platforms = Array.isArray(manifest.manifests) - ? manifest.manifests.map((entry: any) => entry.platform) - : [manifest.platform] - - const isValidPlatform = platforms.some((entry: any) => - checkManifestPlatform(entry, platform) - ) - - return { valid: isValidPlatform } - } catch (err: any) { - CORE_LOGGER.error(`Unable to get Manifest for image ${image}: ${err.message}`) - if (err.errors?.length) CORE_LOGGER.error(JSON.stringify(err.errors)) - - return { - valid: false, - status: 404, - reason: err.errors?.length ? JSON.stringify(err.errors) : err.message - } - } - } - // eslint-disable-next-line require-await public override async startComputeJob( assets: ComputeAsset[], @@ -540,13 +436,6 @@ export class C2DEngineDocker extends C2DEngine { job.statusText = C2DStatusText.BuildImage } } else { - // already built, we need to validate it - const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) - console.log('Validation: ', validation) - if (!validation.valid) - throw new Error( - `Cannot find image ${image} for ${env.platform.architecture}. Maybe it does not exist or it's build for other arhitectures.` - ) if (queueMaxWaitTime === 0) { job.status = C2DStatusNumber.PullImage job.statusText = C2DStatusText.PullImage @@ -561,9 +450,15 @@ export class C2DEngineDocker extends C2DEngine { } if (queueMaxWaitTime === 0) { if (algorithm.meta.container && algorithm.meta.container.dockerfile) { - this.buildImage(job, additionalDockerFiles) + const buildResult = await this.buildImage(job, additionalDockerFiles) + if (!buildResult.success) { + throw new Error(buildResult.error) + } } else { - this.pullImage(job) + const pullResult = await this.pullImage(job) + if (!pullResult.success) { + throw new Error(pullResult.error) + } } } // only now set the timer @@ -796,36 +691,47 @@ export class C2DEngineDocker extends C2DEngine { } } - private async setNewTimer() { + private setNewTimer() { + // Prevent multiple timers from being created + if (this.cronTimer) { + return + } // don't set the cron if we don't have compute environments - if ((await this.getComputeEnvironments()).length > 0) - this.cronTimer = setInterval(this.InternalLoop.bind(this), this.cronTime) + if (this.envs.length > 0) { + this.cronTimer = setTimeout(this.InternalLoop.bind(this), this.cronTime) + } } private async InternalLoop() { // this is the internal loop of docker engine // gets list of all running jobs and process them one by one - clearInterval(this.cronTimer) - this.cronTimer = null - // get all running jobs - const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) - if (jobs.length === 0) { - CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash) - this.setNewTimer() - return - } else { - CORE_LOGGER.info(`Got ${jobs.length} jobs for engine ${this.getC2DConfig().hash}`) - CORE_LOGGER.debug(JSON.stringify(jobs)) + if (this.cronTimer) { + clearTimeout(this.cronTimer) + this.cronTimer = null } - const promises: any = [] - for (const job of jobs) { - promises.push(this.processJob(job)) + + try { + // get all running jobs + const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) + + if (jobs.length === 0) { + CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash) + } else { + CORE_LOGGER.info(`Got ${jobs.length} jobs for engine ${this.getC2DConfig().hash}`) + CORE_LOGGER.debug(JSON.stringify(jobs)) + + const promises: any = [] + for (const job of jobs) { + promises.push(this.processJob(job)) + } + await Promise.all(promises) + } + } catch (err) { + CORE_LOGGER.error(`Error in C2D InternalLoop: ${err.message}`) + } finally { + this.setNewTimer() } - // wait for all promises, there is no return - await Promise.all(promises) - // set the cron again - this.setNewTimer() } private async createDockerContainer( @@ -940,13 +846,16 @@ export class C2DEngineDocker extends C2DEngine { if (algorithm.meta.container && algorithm.meta.container.dockerfile) { job.status = C2DStatusNumber.BuildImage job.statusText = C2DStatusText.BuildImage - this.buildImage(job, null) + await this.db.updateJob(job) + await this.buildImage(job, null) } else { job.status = C2DStatusNumber.PullImage job.statusText = C2DStatusText.PullImage - this.pullImage(job) + await this.db.updateJob(job) + await this.pullImage(job) } - await this.db.updateJob(job) + // Return here - the next iteration will pick up the job with updated status from DB + return } if (job.status === C2DStatusNumber.ConfiguringVolumes) { @@ -1519,7 +1428,9 @@ export class C2DEngineDocker extends C2DEngine { return true } - private async pullImage(originaljob: DBComputeJob) { + private async pullImage( + originaljob: DBComputeJob + ): Promise<{ success: boolean; error?: string }> { const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob const imageLogFile = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log' @@ -1558,7 +1469,8 @@ export class C2DEngineDocker extends C2DEngine { }) job.status = C2DStatusNumber.ConfiguringVolumes job.statusText = C2DStatusText.ConfiguringVolumes - this.db.updateJob(job) + await this.db.updateJob(job) + return { success: true } } catch (err) { const logText = `Unable to pull docker image: ${job.containerImage}: ${err.message}` CORE_LOGGER.error(logText) @@ -1569,13 +1481,14 @@ export class C2DEngineDocker extends C2DEngine { job.dateFinished = String(Date.now() / 1000) await this.db.updateJob(job) await this.cleanupJob(job) + return { success: false, error: logText } } } private async buildImage( originaljob: DBComputeJob, additionalDockerFiles: { [key: string]: any } - ) { + ): Promise<{ success: boolean; error?: string }> { const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob const imageLogFile = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log' @@ -1624,11 +1537,11 @@ export class C2DEngineDocker extends C2DEngine { }) job.status = C2DStatusNumber.ConfiguringVolumes job.statusText = C2DStatusText.ConfiguringVolumes - this.db.updateJob(job) + await this.db.updateJob(job) + return { success: true } } catch (err) { - CORE_LOGGER.error( - `Unable to build docker image: ${job.containerImage}: ${err.message}` - ) + const logText = `Unable to build docker image: ${job.containerImage}: ${err.message}` + CORE_LOGGER.error(logText) appendFileSync(imageLogFile, String(err.message)) job.status = C2DStatusNumber.BuildImageFailed job.statusText = C2DStatusText.BuildImageFailed @@ -1636,6 +1549,7 @@ export class C2DEngineDocker extends C2DEngine { job.dateFinished = String(Date.now() / 1000) await this.db.updateJob(job) await this.cleanupJob(job) + return { success: false, error: logText } } } @@ -2046,19 +1960,3 @@ export function getAlgorithmImage(algorithm: ComputeAlgorithm, jobId: string): s // console.log('Using image: ' + image) return image } - -export function checkManifestPlatform( - manifestPlatform: any, - envPlatform?: RunningPlatform -): boolean { - if (!manifestPlatform || !envPlatform) return true // skips if not present - if (envPlatform.architecture === 'amd64') envPlatform.architecture = 'x86_64' // x86_64 is compatible with amd64 - if (manifestPlatform.architecture === 'amd64') manifestPlatform.architecture = 'x86_64' // x86_64 is compatible with amd64 - - if ( - envPlatform.architecture !== manifestPlatform.architecture || - envPlatform.os !== manifestPlatform.os - ) - return false - return true -} diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index 11d92f595..b032f5b47 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -1,6 +1,5 @@ import { Readable } from 'stream' import { P2PCommandResponse } from '../../../@types/OceanNode.js' -import { C2DClusterType } from '../../../@types/C2D/C2D.js' import { CORE_LOGGER } from '../../../utils/logging/common.js' import { CommandHandler } from '../handler/handler.js' import { ComputeInitializeCommand } from '../../../@types/commands.js' @@ -29,11 +28,10 @@ import { sanitizeServiceFiles } from '../../../utils/util.js' import { FindDdoHandler } from '../handler/ddoHandler.js' import { isOrderingAllowedForAsset } from '../handler/downloadHandler.js' import { getNonceAsNumber } from '../utils/nonceHandler.js' -import { C2DEngineDocker, getAlgorithmImage } from '../../c2d/compute_engine_docker.js' import { Credentials, DDOManager } from '@oceanprotocol/ddo-js' import { areKnownCredentialTypes, checkCredentials } from '../../../utils/credentials.js' import { PolicyServer } from '../../policyServer/index.js' -import { generateUniqueID, getAlgoChecksums, validateAlgoForDataset } from './utils.js' +import { getAlgoChecksums, validateAlgoForDataset } from './utils.js' export class ComputeInitializeHandler extends CommandHandler { validate(command: ComputeInitializeCommand): ValidateParams { @@ -363,34 +361,6 @@ export class ComputeInitializeHandler extends CommandHandler { } } - // docker images? - const clusters = config.c2dClusters - let hasDockerImages = false - for (const cluster of clusters) { - if (cluster.type === C2DClusterType.DOCKER) { - hasDockerImages = true - break - } - } - if (hasDockerImages) { - const algoImage = getAlgorithmImage(task.algorithm, generateUniqueID(task)) - if (algoImage) { - const validation: ValidateParams = await C2DEngineDocker.checkDockerImage( - algoImage, - env.platform - ) - if (!validation.valid) { - return { - stream: null, - status: { - httpStatus: validation.status, - error: `Initialize Compute failed for image ${algoImage} :${validation.reason}` - } - } - } - } - } - const signer = blockchain.getSigner() // check if oasis evm or similar diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index f818ac545..e6916875b 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -574,10 +574,15 @@ export class PaidComputeStartHandler extends CommandHandler { } } catch (error) { CORE_LOGGER.error(error.message) + + const isImageError = + error.message?.includes('Unable to pull docker image') || + error.message?.includes('Unable to build docker image') + return { stream: null, status: { - httpStatus: 500, + httpStatus: isImageError ? 400 : 500, error: error.message } } @@ -855,10 +860,15 @@ export class FreeComputeStartHandler extends CommandHandler { } } catch (error) { CORE_LOGGER.error(error.message) + + const isImageError = + error.message?.includes('Unable to pull docker image') || + error.message?.includes('Unable to build docker image') + return { stream: null, status: { - httpStatus: 500, + httpStatus: isImageError ? 400 : 500, error: error.message } } diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 2ac26381d..e4910223e 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -1123,7 +1123,7 @@ describe('Compute', () => { const command: FreeComputeStartCommand = freeComputeStartPayload const handler = new FreeComputeStartHandler(oceanNode) const response = await handler.handle(command) - assert(response.status.httpStatus === 500, 'Failed to get 500 response') + assert(response.status.httpStatus === 400, 'Failed to get 400 response for bad image') assert(response.stream === null, 'Should not get stream') }) // let's check our queued job diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index 36e4e37c3..7ae4db385 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -9,8 +9,7 @@ import { ComputeAsset, // ComputeEnvironment, ComputeJob, - DBComputeJob, - RunningPlatform + DBComputeJob } from '../../@types/C2D/C2D.js' // import { computeAsset } from '../data/assets' import { assert, expect } from 'chai' @@ -19,7 +18,6 @@ import { convertStringToArray, STRING_SEPARATOR } from '../../components/database/sqliteCompute.js' -import os from 'os' import { buildEnvOverrideConfig, OverrideEnvConfig, @@ -29,9 +27,8 @@ import { } from '../utils/utils.js' import { OceanNodeConfig } from '../../@types/OceanNode.js' import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js' -import { completeDBComputeJob, dockerImageManifest } from '../data/assets.js' +import { completeDBComputeJob } from '../data/assets.js' import { omitDBComputeFieldsFromComputeJob } from '../../components/c2d/index.js' -import { checkManifestPlatform } from '../../components/c2d/compute_engine_docker.js' describe('Compute Jobs Database', () => { let envOverrides: OverrideEnvConfig[] @@ -225,29 +222,6 @@ describe('Compute Jobs Database', () => { ) }) - it('should check manifest platform against local platform env', () => { - const arch = os.machine() // ex: arm - const platform = os.platform() // ex: linux - const env: RunningPlatform = { - architecture: arch, - os: platform - } - const result: boolean = checkManifestPlatform(dockerImageManifest.platform, env) - // if all defined and a match its OK - if ( - dockerImageManifest.platform.os === env.os && - dockerImageManifest.platform.architecture === env.architecture - ) { - expect(result).to.be.equal(true) - } else { - // oterwise its NOT - expect(result).to.be.equal(false) - } - - // all good anyway, nothing on the manifest - expect(checkManifestPlatform(null, env)).to.be.equal(true) - }) - it('testing checkAndFillMissingResources', async function () { // TO DO })