Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 58 additions & 160 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import type {
DBComputeJob,
DBComputeJobPayment,
ComputeResult,
RunningPlatform,
ComputeEnvFeesStructure,
ComputeResourceRequest,
ComputeEnvFees
Expand Down Expand Up @@ -47,7 +46,6 @@ import { AssetUtils } from '../../utils/asset.js'
import { FindDdoHandler } from '../core/handler/ddoHandler.js'
import { OceanNode } from '../../OceanNode.js'
import { decryptFilesObject, omitDBComputeFieldsFromComputeJob } from './index.js'
import { ValidateParams } from '../httpRoutes/validateCommands.js'
import { Service } from '@oceanprotocol/ddo-js'
import { getOceanTokenAddressForChain } from '../../utils/address.js'

Expand All @@ -58,7 +56,6 @@ export class C2DEngineDocker extends C2DEngine {
private cronTimer: any
private cronTime: number = 2000
private jobImageSizes: Map<string, number> = new Map()
private static DEFAULT_DOCKER_REGISTRY = 'https://registry-1.docker.io'

public constructor(clusterConfig: C2DClusterInfo, db: C2DDatabase, escrow: Escrow) {
super(clusterConfig, db, escrow)
Expand Down Expand Up @@ -332,107 +329,6 @@ export class C2DEngineDocker extends C2DEngine {
return filteredEnvs
}

/**
 * Splits a Docker image reference into the pieces needed to query a registry.
 *
 * Handles `name`, `name:tag`, `name@digest`, `name:tag@digest` and references
 * prefixed with a registry host (`host[:port]/name...`).
 *
 * @param image image reference as written by the user
 * @returns `registry` (base URL including scheme), `name` (repository path) and
 *          `ref` (digest if present, otherwise tag, otherwise `latest`)
 */
private static parseImage(image: string) {
  let registry = C2DEngineDocker.DEFAULT_DOCKER_REGISTRY
  let name = image
  let ref = 'latest'

  // A digest always wins over a tag, so peel it off first.
  const atIdx = name.indexOf('@')
  if (atIdx !== -1) {
    ref = name.slice(atIdx + 1)
    name = name.slice(0, atIdx)
  }

  // A trailing `:tag`. A colon that is still followed by '/' is a registry port,
  // not a tag. The tag is stripped from the name even when a digest is present
  // (`repo:tag@sha256:...`), otherwise it would leak into the repository path.
  const colonIdx = name.lastIndexOf(':')
  if (colonIdx !== -1 && !name.slice(colonIdx).includes('/')) {
    if (atIdx === -1) {
      ref = name.slice(colonIdx + 1)
    }
    name = name.slice(0, colonIdx)
  }

  const firstSlash = name.indexOf('/')
  if (firstSlash !== -1) {
    const potential = name.slice(0, firstSlash)
    if (potential === 'docker.io' || potential === 'index.docker.io') {
      // Docker Hub aliases: the registry API lives on the default registry host,
      // so only drop the prefix and keep the default registry.
      name = name.slice(firstSlash + 1)
    } else if (
      potential.includes('.') ||
      potential.includes(':') ||
      potential === 'localhost'
    ) {
      // First path component looks like a host (has a dot, a port, or is localhost).
      registry = potential.includes('localhost')
        ? `http://${potential}`
        : `https://${potential}`
      name = name.slice(firstSlash + 1)
    }
  }

  // Official Docker Hub images live under the implicit `library/` namespace.
  if (registry === C2DEngineDocker.DEFAULT_DOCKER_REGISTRY && !name.includes('/')) {
    name = `library/${name}`
  }

  return { registry, name, ref }
}

/**
 * Fetches the manifest (or manifest list / OCI index) of an image from its registry.
 *
 * The request is first sent anonymously. If the registry answers 401 with a
 * `Bearer` challenge, a pull-scoped token is requested from the advertised realm
 * and the manifest request is retried once with that token.
 *
 * @param image image reference, parsed with `parseImage`
 * @returns the parsed JSON manifest as returned by the registry
 * @throws Error when the token endpoint or the manifest request does not succeed
 */
public static async getDockerManifest(image: string): Promise<any> {
  const { registry, name, ref } = C2DEngineDocker.parseImage(image)
  const url = `${registry}/v2/${name}/manifests/${ref}`
  let headers: Record<string, string> = {
    Accept:
      'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.oci.image.index.v1+json'
  }
  let response = await fetch(url, { headers })

  if (response.status === 401) {
    const challenge = response.headers.get('www-authenticate') || ''
    // Challenge parameters may come in any order and `service` is optional,
    // so each one is extracted independently.
    const realm = challenge.match(/realm="([^"]+)"/i)
    const service = challenge.match(/service="([^"]+)"/i)
    if (/^\s*Bearer\s/i.test(challenge) && realm) {
      const tokenUrl = new URL(realm[1])
      if (service) {
        tokenUrl.searchParams.set('service', service[1])
      }
      tokenUrl.searchParams.set('scope', `repository:${name}:pull`)

      const tokenResponse = await fetch(tokenUrl.toString())
      if (!tokenResponse.ok) {
        throw new Error(
          `Failed to get registry token: ${tokenResponse.status} ${tokenResponse.statusText}`
        )
      }
      // The token may be returned as `token` or, for OAuth2 compatibility, `access_token`.
      const tokenBody = (await tokenResponse.json()) as {
        token?: string
        access_token?: string
      }
      const token = tokenBody.token ?? tokenBody.access_token
      if (token) {
        headers = { ...headers, Authorization: `Bearer ${token}` }
        response = await fetch(url, { headers })
      }
    }
  }

  if (!response.ok) {
    const body = await response.text()
    throw new Error(
      `Failed to get manifest: ${response.status} ${response.statusText} - ${body}`
    )
  }
  return await response.json()
}

/**
 * Checks a docker image by looking at its registry manifest.
 *
 * For a manifest list / index every entry's `platform` is tested; for a single
 * manifest the top-level `platform` field (possibly undefined) is tested.
 *
 * @param image image reference (name, name:tag or name@digest)
 * @param platform optional platform to match, passed to `checkManifestPlatform`
 * @returns `{ valid: true }` when a matching platform is found; otherwise
 *          `valid: false` together with a `status` and a `reason`
 */
public static async checkDockerImage(
  image: string,
  platform?: RunningPlatform
): Promise<ValidateParams> {
  try {
    const manifest = await C2DEngineDocker.getDockerManifest(image)

    const platforms = Array.isArray(manifest.manifests)
      ? manifest.manifests.map((entry: any) => entry.platform)
      : [manifest.platform]

    const isValidPlatform = platforms.some((entry: any) =>
      checkManifestPlatform(entry, platform)
    )

    if (!isValidPlatform) {
      // Always give callers a status and reason; they are used to build the error response.
      return {
        valid: false,
        status: 400,
        reason: `Image ${image} is not available for platform ${platform?.os}/${platform?.architecture}`
      }
    }
    return { valid: true }
  } catch (err: any) {
    CORE_LOGGER.error(`Unable to get Manifest for image ${image}: ${err.message}`)
    if (err.errors?.length) CORE_LOGGER.error(JSON.stringify(err.errors))

    return {
      valid: false,
      status: 404,
      reason: err.errors?.length ? JSON.stringify(err.errors) : err.message
    }
  }
}

// eslint-disable-next-line require-await
public override async startComputeJob(
assets: ComputeAsset[],
Expand Down Expand Up @@ -540,13 +436,6 @@ export class C2DEngineDocker extends C2DEngine {
job.statusText = C2DStatusText.BuildImage
}
} else {
// already built, we need to validate it
const validation = await C2DEngineDocker.checkDockerImage(image, env.platform)
console.log('Validation: ', validation)
if (!validation.valid)
throw new Error(
`Cannot find image ${image} for ${env.platform.architecture}. Maybe it does not exist or it's build for other arhitectures.`
)
if (queueMaxWaitTime === 0) {
job.status = C2DStatusNumber.PullImage
job.statusText = C2DStatusText.PullImage
Expand All @@ -561,9 +450,15 @@ export class C2DEngineDocker extends C2DEngine {
}
if (queueMaxWaitTime === 0) {
if (algorithm.meta.container && algorithm.meta.container.dockerfile) {
this.buildImage(job, additionalDockerFiles)
const buildResult = await this.buildImage(job, additionalDockerFiles)
Copy link
Member

@alexcos20 alexcos20 Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm.. if building the image takes 30 mins, this means that the entire compute queue goes unprocessed. As a result:

  • compute jobs running more than allowed
  • new compute jobs are not started

if (!buildResult.success) {
throw new Error(buildResult.error)
}
} else {
this.pullImage(job)
const pullResult = await this.pullImage(job)
if (!pullResult.success) {
throw new Error(pullResult.error)
}
}
}
// only now set the timer
Expand Down Expand Up @@ -796,36 +691,47 @@ export class C2DEngineDocker extends C2DEngine {
}
}

private async setNewTimer() {
private setNewTimer() {
// Prevent multiple timers from being created
if (this.cronTimer) {
return
}
// don't set the cron if we don't have compute environments
if ((await this.getComputeEnvironments()).length > 0)
this.cronTimer = setInterval(this.InternalLoop.bind(this), this.cronTime)
if (this.envs.length > 0) {
this.cronTimer = setTimeout(this.InternalLoop.bind(this), this.cronTime)
}
}

private async InternalLoop() {
// this is the internal loop of docker engine
// gets list of all running jobs and process them one by one
clearInterval(this.cronTimer)
this.cronTimer = null
// get all running jobs
const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash)

if (jobs.length === 0) {
CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash)
this.setNewTimer()
return
} else {
CORE_LOGGER.info(`Got ${jobs.length} jobs for engine ${this.getC2DConfig().hash}`)
CORE_LOGGER.debug(JSON.stringify(jobs))
if (this.cronTimer) {
clearTimeout(this.cronTimer)
this.cronTimer = null
}
const promises: any = []
for (const job of jobs) {
promises.push(this.processJob(job))

try {
// get all running jobs
const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash)

if (jobs.length === 0) {
CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash)
} else {
CORE_LOGGER.info(`Got ${jobs.length} jobs for engine ${this.getC2DConfig().hash}`)
CORE_LOGGER.debug(JSON.stringify(jobs))

const promises: any = []
for (const job of jobs) {
promises.push(this.processJob(job))
}
await Promise.all(promises)
}
} catch (err) {
CORE_LOGGER.error(`Error in C2D InternalLoop: ${err.message}`)
} finally {
this.setNewTimer()
}
// wait for all promises, there is no return
await Promise.all(promises)
// set the cron again
this.setNewTimer()
}

private async createDockerContainer(
Expand Down Expand Up @@ -940,13 +846,16 @@ export class C2DEngineDocker extends C2DEngine {
if (algorithm.meta.container && algorithm.meta.container.dockerfile) {
job.status = C2DStatusNumber.BuildImage
job.statusText = C2DStatusText.BuildImage
this.buildImage(job, null)
await this.db.updateJob(job)
await this.buildImage(job, null)
} else {
job.status = C2DStatusNumber.PullImage
job.statusText = C2DStatusText.PullImage
this.pullImage(job)
await this.db.updateJob(job)
await this.pullImage(job)
}
await this.db.updateJob(job)
// Return here - the next iteration will pick up the job with updated status from DB
return
}

if (job.status === C2DStatusNumber.ConfiguringVolumes) {
Expand Down Expand Up @@ -1519,7 +1428,9 @@ export class C2DEngineDocker extends C2DEngine {
return true
}

private async pullImage(originaljob: DBComputeJob) {
private async pullImage(
originaljob: DBComputeJob
): Promise<{ success: boolean; error?: string }> {
const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob
const imageLogFile =
this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log'
Expand Down Expand Up @@ -1558,7 +1469,8 @@ export class C2DEngineDocker extends C2DEngine {
})
job.status = C2DStatusNumber.ConfiguringVolumes
job.statusText = C2DStatusText.ConfiguringVolumes
this.db.updateJob(job)
await this.db.updateJob(job)
return { success: true }
} catch (err) {
const logText = `Unable to pull docker image: ${job.containerImage}: ${err.message}`
CORE_LOGGER.error(logText)
Expand All @@ -1569,13 +1481,14 @@ export class C2DEngineDocker extends C2DEngine {
job.dateFinished = String(Date.now() / 1000)
await this.db.updateJob(job)
await this.cleanupJob(job)
return { success: false, error: logText }
}
}

private async buildImage(
originaljob: DBComputeJob,
additionalDockerFiles: { [key: string]: any }
) {
): Promise<{ success: boolean; error?: string }> {
const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob
const imageLogFile =
this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log'
Expand Down Expand Up @@ -1624,18 +1537,19 @@ export class C2DEngineDocker extends C2DEngine {
})
job.status = C2DStatusNumber.ConfiguringVolumes
job.statusText = C2DStatusText.ConfiguringVolumes
this.db.updateJob(job)
await this.db.updateJob(job)
return { success: true }
} catch (err) {
CORE_LOGGER.error(
`Unable to build docker image: ${job.containerImage}: ${err.message}`
)
const logText = `Unable to build docker image: ${job.containerImage}: ${err.message}`
CORE_LOGGER.error(logText)
appendFileSync(imageLogFile, String(err.message))
job.status = C2DStatusNumber.BuildImageFailed
job.statusText = C2DStatusText.BuildImageFailed
job.isRunning = false
job.dateFinished = String(Date.now() / 1000)
await this.db.updateJob(job)
await this.cleanupJob(job)
return { success: false, error: logText }
}
}

Expand Down Expand Up @@ -2046,19 +1960,3 @@ export function getAlgorithmImage(algorithm: ComputeAlgorithm, jobId: string): s
// console.log('Using image: ' + image)
return image
}

/**
 * Tells whether an image manifest platform is compatible with an environment platform.
 *
 * Architectures are compared after folding equivalent names together
 * (`amd64` = `x86_64`, `arm64` = `aarch64`); the OS must match exactly.
 * Neither argument is modified.
 *
 * @param manifestPlatform `platform` object taken from a manifest (may be missing)
 * @param envPlatform platform of the compute environment (may be missing)
 * @returns `true` when either platform is missing (check skipped) or when both
 *          architecture and OS match; `false` otherwise
 */
export function checkManifestPlatform(
  manifestPlatform: any,
  envPlatform?: RunningPlatform
): boolean {
  if (!manifestPlatform || !envPlatform) return true // skips if not present

  // Normalize into locals instead of rewriting the caller's objects.
  const canonicalArch = (arch: any): any => {
    if (arch === 'amd64') return 'x86_64' // x86_64 is compatible with amd64
    if (arch === 'arm64') return 'aarch64' // aarch64 is compatible with arm64
    return arch
  }

  const sameArchitecture =
    canonicalArch(envPlatform.architecture) ===
    canonicalArch(manifestPlatform.architecture)
  const sameOs = envPlatform.os === manifestPlatform.os

  return sameArchitecture && sameOs
}
32 changes: 1 addition & 31 deletions src/components/core/compute/initialize.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Readable } from 'stream'
import { P2PCommandResponse } from '../../../@types/OceanNode.js'
import { C2DClusterType } from '../../../@types/C2D/C2D.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'
import { CommandHandler } from '../handler/handler.js'
import { ComputeInitializeCommand } from '../../../@types/commands.js'
Expand Down Expand Up @@ -29,11 +28,10 @@ import { sanitizeServiceFiles } from '../../../utils/util.js'
import { FindDdoHandler } from '../handler/ddoHandler.js'
import { isOrderingAllowedForAsset } from '../handler/downloadHandler.js'
import { getNonceAsNumber } from '../utils/nonceHandler.js'
import { C2DEngineDocker, getAlgorithmImage } from '../../c2d/compute_engine_docker.js'
import { Credentials, DDOManager } from '@oceanprotocol/ddo-js'
import { areKnownCredentialTypes, checkCredentials } from '../../../utils/credentials.js'
import { PolicyServer } from '../../policyServer/index.js'
import { generateUniqueID, getAlgoChecksums, validateAlgoForDataset } from './utils.js'
import { getAlgoChecksums, validateAlgoForDataset } from './utils.js'

export class ComputeInitializeHandler extends CommandHandler {
validate(command: ComputeInitializeCommand): ValidateParams {
Expand Down Expand Up @@ -363,34 +361,6 @@ export class ComputeInitializeHandler extends CommandHandler {
}
}

// docker images?
const clusters = config.c2dClusters
let hasDockerImages = false
for (const cluster of clusters) {
if (cluster.type === C2DClusterType.DOCKER) {
hasDockerImages = true
break
}
}
if (hasDockerImages) {
const algoImage = getAlgorithmImage(task.algorithm, generateUniqueID(task))
if (algoImage) {
const validation: ValidateParams = await C2DEngineDocker.checkDockerImage(
algoImage,
env.platform
)
if (!validation.valid) {
return {
stream: null,
status: {
httpStatus: validation.status,
error: `Initialize Compute failed for image ${algoImage} :${validation.reason}`
}
}
}
}
}

const signer = blockchain.getSigner()

// check if oasis evm or similar
Expand Down
Loading
Loading