diff --git a/docs/API.md b/docs/API.md
index 954d99eed..455b4aeaa 100644
--- a/docs/API.md
+++ b/docs/API.md
@@ -1338,7 +1338,7 @@ starts a free compute job and returns jobId if succesfull
     "algorithm": { "meta": { "container": { "image": "ubuntu", "entrypoint": "/bin/bash'" } } },
-    "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687",
+    "consumerAddress": "0x00",
     "signature": "123",
     "nonce": 1,
     "environment": "0x7d187e4c751367be694497ead35e2937ece3c7f3b325dcb4f7571e5972d092bd-0xbeaf12703d708f39ef98c3d8939ce458553254176dbb69fe83d535883c4cee38",
@@ -1352,7 +1352,7 @@ starts a free compute job and returns jobId if succesfull
 ```json
 [
   {
-    "owner": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687",
+    "owner": "0x00",
     "jobId": "0x7d187e4c751367be694497ead35e2937ece3c7f3b325dcb4f7571e5972d092bd-a4ad237d-dfd8-404c-a5d6-b8fc3a1f66d3",
     "dateCreated": "1742291065.119",
     "dateFinished": null,
@@ -1396,7 +1396,7 @@ Required at least one of the following parameters:
 ```json
 [
   {
-    "owner": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687",
+    "owner": "0x00",
     "did": null,
     "jobId": "a4ad237d-dfd8-404c-a5d6-b8fc3a1f66d3",
     "dateCreated": "1742291065.119",
diff --git a/docs/GPU.md b/docs/GPU.md
index a83c742ef..0f961630e 100644
--- a/docs/GPU.md
+++ b/docs/GPU.md
@@ -122,7 +122,7 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp
 {
   "id": "0xd6b10b27aab01a72070a5164c07d0517755838b9cb9857e2d5649287ec3aaaa2-0x66073c81f833deaa2f8e2a508f69cf78f8a99b17ba1a64f369af921750f93914",
   "runningJobs": 0,
-  "consumerAddress": "0x4fb80776C8eb4cAbe7730dcBCdb1fa6ecD3c460E",
+  "consumerAddress": "0x00",
   "platform": { "architecture": "x86_64", "os": "Ubuntu 22.04.3 LTS" },
   "fees": {
     "1": [
@@ -194,7 +194,7 @@ Start a free job using:
         "rawcode": "import tensorflow as tf\nsess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))\nprint(\"Num GPUs Available: \", len(tf.config.list_physical_devices('GPU')))\ngpus = tf.config.list_physical_devices('GPU')\nfor gpu in gpus:\n\tprint('Name:', gpu.name, ' Type:', gpu.device_type)"
       }
     },
-    "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687",
+    "consumerAddress": "0x00",
     "signature": "123",
     "nonce": 1,
     "environment": "0xd6b10b27aab01a72070a5164c07d0517755838b9cb9857e2d5649287ec3aaaa2-0x66073c81f833deaa2f8e2a508f69cf78f8a99b17ba1a64f369af921750f93914",
@@ -325,7 +325,7 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp
 {
   "id": "0xbb5773e734e1b188165dac88d9a3dc8ac28bc9f5624b45fa8bbd8fca043de7c1-0x2c2761f938cf186eeb81f71dee06ad7edb299493e39c316c390d0c0691e6585c",
   "runningJobs": 0,
-  "consumerAddress": "0x4fb80776C8eb4cAbe7730dcBCdb1fa6ecD3c460E",
+  "consumerAddress": "0x00",
   "platform": {
     "architecture": "x86_64",
     "os": "Ubuntu 24.04.2 LTS"
@@ -450,7 +450,7 @@ Start a free job with
         "rawcode": "import tensorflow as tf\nsess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))\nprint(\"Num GPUs Available: \", len(tf.config.list_physical_devices('GPU')))\ngpus = tf.config.list_physical_devices('GPU')\nfor gpu in gpus:\n\tprint('Name:', gpu.name, ' Type:', gpu.device_type)"
      }
     },
-    "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687",
+    "consumerAddress": "0x00",
     "signature": "123",
     "nonce": 1,
     "environment": "0xbb5773e734e1b188165dac88d9a3dc8ac28bc9f5624b45fa8bbd8fca043de7c1-0x2c2761f938cf186eeb81f71dee06ad7edb299493e39c316c390d0c0691e6585c",
diff --git a/package-lock.json b/package-lock.json
index 1ea69aab0..f9c324fef 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -80,6 +80,7 @@
         "@types/node": "^20.14.2",
         "@types/node-cron": "^3.0.11",
         "@types/sinon": "^17.0.4",
+        "@types/tar-stream": "^3.1.4",
         "@typescript-eslint/eslint-plugin": "^6.8.0",
         "@typescript-eslint/parser": "^6.8.0",
         "auto-changelog": "^2.4.0",
@@ -5729,6 +5730,16 @@
         "undici-types": "~5.26.4"
       }
     },
+    "node_modules/@types/tar-stream": {
+      "version": "3.1.4",
+      "resolved": "https://registry.npmjs.org/@types/tar-stream/-/tar-stream-3.1.4.tgz",
+      "integrity": "sha512-921gW0+g29mCJX0fRvqeHzBlE/XclDaAG0Ousy1LCghsOhvaKacDeRGEVzQP9IPfKn8Vysy7FEXAIxycpc/CMg==",
+      "dev": true,
+      "license": "MIT",
+      "dependencies": {
+        "@types/node": "*"
+      }
+    },
     "node_modules/@types/triple-beam": {
       "version": "1.3.4",
       "license": "MIT"
@@ -23319,6 +23330,15 @@
         }
       }
     },
+    "@types/tar-stream": {
+      "version": "3.1.4",
+      "resolved": "https://registry.npmjs.org/@types/tar-stream/-/tar-stream-3.1.4.tgz",
+      "integrity": "sha512-921gW0+g29mCJX0fRvqeHzBlE/XclDaAG0Ousy1LCghsOhvaKacDeRGEVzQP9IPfKn8Vysy7FEXAIxycpc/CMg==",
+      "dev": true,
+      "requires": {
+        "@types/node": "*"
+      }
+    },
     "@types/triple-beam": {
       "version": "1.3.4"
     },
diff --git a/package.json b/package.json
index 82e63deb8..354ce2ce7 100644
--- a/package.json
+++ b/package.json
@@ -119,6 +119,7 @@
     "@types/node": "^20.14.2",
     "@types/node-cron": "^3.0.11",
     "@types/sinon": "^17.0.4",
+    "@types/tar-stream": "^3.1.4",
     "@typescript-eslint/eslint-plugin": "^6.8.0",
     "@typescript-eslint/parser": "^6.8.0",
     "auto-changelog": "^2.4.0",
diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts
index 2171f7d06..32cff43ee 100644
--- a/src/@types/C2D/C2D.ts
+++ b/src/@types/C2D/C2D.ts
@@ -1,4 +1,4 @@
-import { MetadataAlgorithm } from '@oceanprotocol/ddo-js'
+import { MetadataAlgorithm, ConsumerParameter } from '@oceanprotocol/ddo-js'
 import type { BaseFileObject } from '../fileObject.js'
 export enum C2DClusterType {
   // eslint-disable-next-line no-unused-vars
@@ -129,6 +129,7 @@ export interface C2DDockerConfig {
 }
 
 export type ComputeResultType =
+  | 'imageLog'
   | 'algorithmLog'
   | 'output'
   | 'configurationLog'
@@ -180,12 +181,23 @@ export interface ComputeAsset {
   transferTxId?: string
   userdata?: { [key: string]: any }
 }
-
+export interface ExtendedMetadataAlgorithm extends MetadataAlgorithm {
+  container: {
+    // retain existing properties
+    entrypoint: string
+    image: string
+    tag: string
+    checksum: string
+    dockerfile?: string // optional
+    additionalDockerFiles?: { [key: string]: any }
+    consumerParameters?: ConsumerParameter[]
+  }
+}
 export interface ComputeAlgorithm {
   documentId?: string
   serviceId?: string
   fileObject?: BaseFileObject
-  meta?: MetadataAlgorithm
+  meta?: ExtendedMetadataAlgorithm
   transferTxId?: string
   algocustomdata?: { [key: string]: any }
   userdata?: { [key: string]: any }
 }
@@ -235,6 +247,10 @@ export enum C2DStatusNumber {
   // eslint-disable-next-line no-unused-vars
   PullImageFailed = 11,
   // eslint-disable-next-line no-unused-vars
+  BuildImage = 12,
+  // eslint-disable-next-line no-unused-vars
+  BuildImageFailed = 13,
+  // eslint-disable-next-line no-unused-vars
   ConfiguringVolumes = 20,
   // eslint-disable-next-line no-unused-vars
   VolumeCreationFailed = 21,
@@ -271,6 +287,10 @@ export enum C2DStatusText {
   // eslint-disable-next-line no-unused-vars
   PullImageFailed = 'Pulling algorithm image failed',
   // eslint-disable-next-line no-unused-vars
+  BuildImage = 'Building algorithm image',
+  // eslint-disable-next-line no-unused-vars
+  BuildImageFailed = 'Building algorithm image failed',
+  // eslint-disable-next-line no-unused-vars
   ConfiguringVolumes = 'Configuring volumes',
   // eslint-disable-next-line no-unused-vars
   VolumeCreationFailed = 'Volume creation failed',
diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts
index 4204bae36..6e82d6205 100644
--- a/src/components/c2d/compute_engine_docker.ts
+++ b/src/components/c2d/compute_engine_docker.ts
@@ -30,12 +30,14 @@ import { Storage } from '../storage/index.js'
 import Dockerode from 'dockerode'
 import type { ContainerCreateOptions, HostConfig, VolumeCreateOptions } from 'dockerode'
 import * as tar from 'tar'
+import * as tarStream from 'tar-stream'
 import {
   createWriteStream,
   existsSync,
   mkdirSync,
   rmSync,
   writeFileSync,
+  appendFileSync,
   statSync,
   createReadStream
 } from 'fs'
@@ -108,10 +110,11 @@ export class C2DEngineDocker extends C2DEngine {
     }
     // console.log(sysinfo)
     let fees: ComputeEnvFeesStructure = null
     const supportedChains: number[] = []
-    for (const chain of Object.keys(config.supportedNetworks)) {
-      supportedChains.push(parseInt(chain))
+    if (config.supportedNetworks) {
+      for (const chain of Object.keys(config.supportedNetworks)) {
+        supportedChains.push(parseInt(chain))
+      }
     }
     for (const feeChain of Object.keys(envConfig.fees)) {
       // for (const feeConfig of envConfig.fees) {
@@ -370,18 +373,6 @@
     // TO DO - iterate over resources and get default runtime
     const isFree: boolean = !(payment && payment.lockTx)
-    // C2D - Check image, check arhitecture, etc
-    const image = getAlgorithmImage(algorithm)
-    // ex: node@sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36
-    if (!image) {
-      // send a 500 with the error message
-      throw new Error(
-        `Unable to extract docker image ${image} from algoritm: ${JSON.stringify(
-          algorithm
-        )}`
-      )
-    }
-
     if (metadata && Object.keys(metadata).length > 0) {
       const metadataSize = JSON.stringify(metadata).length
       if (metadataSize > 1024) {
@@ -398,9 +389,29 @@
     if (!env) {
       throw new Error(`Invalid environment ${environment}`)
     }
-    const validation = await C2DEngineDocker.checkDockerImage(image, env.platform)
-    if (!validation.valid)
-      throw new Error(`Unable to validate docker image ${image}: ${validation.reason}`)
+    // C2D - Check image, check architecture, etc
+    const image = getAlgorithmImage(algorithm, jobId)
+    // ex: node@sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36
+    if (!image) {
+      // send a 500 with the error message
+      throw new Error(
+        `Unable to extract docker image ${image} from algorithm: ${JSON.stringify(
+          algorithm
+        )}`
+      )
+    }
+    let additionalDockerFiles: { [key: string]: any } = null
+    if (
+      algorithm.meta &&
+      algorithm.meta.container &&
+      algorithm.meta.container.additionalDockerFiles
+    ) {
+      additionalDockerFiles = JSON.parse(
+        JSON.stringify(algorithm.meta.container.additionalDockerFiles)
+      )
+      // make sure that we don't keep them in the db structure
+      algorithm.meta.container.additionalDockerFiles = null
+    }
     const job: DBComputeJob = {
       clusterHash: this.getC2DConfig().hash,
       containerImage: image,
@@ -430,13 +441,31 @@
       metadata,
       additionalViewers
     }
+
+    if (algorithm.meta.container && algorithm.meta.container.dockerfile) {
+      // we need to build the image
+      job.status = C2DStatusNumber.BuildImage
+      job.statusText = C2DStatusText.BuildImage
+    } else {
+      // already built, we need to validate it
+      const validation = await C2DEngineDocker.checkDockerImage(image, env.platform)
+      if (!validation.valid)
+        throw new Error(`Unable to validate docker image ${image}: ${validation.reason}`)
+      job.status = C2DStatusNumber.PullImage
+      job.statusText = C2DStatusText.PullImage
+    }
+
     await this.makeJobFolders(job)
     // make sure we actually were able to insert on DB
     const addedId = await this.db.newJob(job)
     if (!addedId) {
       return []
     }
-
+    if (algorithm.meta.container && algorithm.meta.container.dockerfile) {
+      this.buildImage(job, additionalDockerFiles)
+    } else {
+      this.pullImage(job)
+    }
     // only now set the timer
     if (!this.cronTimer) {
       this.setNewTimer()
@@ -473,6 +502,20 @@
   protected async getResults(jobId: string): Promise<ComputeResult[]> {
     const res: ComputeResult[] = []
     let index = 0
+    try {
+      const logStat = statSync(
+        this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/image.log'
+      )
+      if (logStat) {
+        res.push({
+          filename: 'image.log',
+          filesize: logStat.size,
+          type: 'imageLog',
+          index
+        })
+        index = index + 1
+      }
+    } catch (e) {}
     try {
       const logStat = statSync(
         this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/algorithm.log'
@@ -558,6 +601,16 @@
         }
       }
     }
+    if (i.type === 'imageLog') {
+      return {
+        stream: createReadStream(
+          this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/image.log'
+        ),
+        headers: {
+          'Content-Type': 'text/plain'
+        }
+      }
+    }
     if (i.type === 'output') {
       return {
         stream: createReadStream(
@@ -686,7 +739,7 @@
     // - monitor running containers and stop them if over limits
     // - monitor disc space and clean up
     /* steps:
-      - instruct docker to pull image
+      - wait until image is ready
       - create volume
       - after image is ready, create the container
       - download assets & algo into temp folder
@@ -699,65 +752,6 @@
       - delete the container
      - delete the volume
    */
-    if (job.status === C2DStatusNumber.JobStarted) {
-      // pull docker image
-      try {
-        const pullStream = await this.docker.pull(job.containerImage)
-        await new Promise((resolve, reject) => {
-          let wroteStatusBanner = false
-          this.docker.modem.followProgress(
-            pullStream,
-            (err: any, res: any) => {
-              // onFinished
-              if (err) return reject(err)
-              CORE_LOGGER.info('############# Pull docker image complete ##############')
-              resolve(res)
-            },
-            (progress: any) => {
-              // onProgress
-              if (!wroteStatusBanner) {
-                wroteStatusBanner = true
-                CORE_LOGGER.info('############# Pull docker image status: ##############')
-              }
-              // only write the status banner once, its cleaner
-              CORE_LOGGER.info(progress.status)
-            }
-          )
-        })
-      } catch (err) {
-        CORE_LOGGER.error(
-          `Unable to pull docker image: ${job.containerImage}: ${err.message}`
-        )
-        job.status = C2DStatusNumber.PullImageFailed
-        job.statusText = C2DStatusText.PullImageFailed
-        job.isRunning = false
-        job.dateFinished = String(Date.now() / 1000)
-        await this.db.updateJob(job)
-        await this.cleanupJob(job)
-        return
-      }
-
-      job.status = C2DStatusNumber.PullImage
-      job.statusText = C2DStatusText.PullImage
-      await this.db.updateJob(job)
-      return // now we wait until image is ready
-    }
-    if (job.status === C2DStatusNumber.PullImage) {
-      try {
-        const imageInfo = await this.docker.getImage(job.containerImage)
-        console.log('imageInfo', imageInfo)
-        const details = await imageInfo.inspect()
-        console.log('details:', details)
-        job.status = C2DStatusNumber.ConfiguringVolumes
-        job.statusText = C2DStatusText.ConfiguringVolumes
-        await this.db.updateJob(job)
-        // now we can move forward
-      } catch (e) {
-        // not ready yet
-        CORE_LOGGER.error(`Unable to inspect docker image: ${e.message}`)
-      }
-      return
-    }
     if (job.status === C2DStatusNumber.ConfiguringVolumes) {
       // create the volume & create container
       // TO DO C2D: Choose driver & size
@@ -1112,7 +1106,7 @@
         await container.remove()
       }
     } catch (e) {
-      console.error('Container not found! ' + e.message)
+      // console.error('Container not found! ' + e.message)
     }
     try {
       const volume = await this.docker.getVolume(job.jobId + '-volume')
@@ -1124,7 +1118,17 @@
         }
       }
     } catch (e) {
-      console.error('Container volume not found! ' + e.message)
+      // console.error('Container volume not found! ' + e.message)
+    }
+    if (job.algorithm.meta.container && job.algorithm.meta.container.dockerfile) {
+      const image = getAlgorithmImage(job.algorithm, job.jobId)
+      if (image) {
+        try {
+          await this.docker.getImage(image).remove({ force: true })
+        } catch (e) {
+          console.log('Could not delete image: ' + image + ' : ' + e.message)
+        }
+      }
     }
     try {
       // remove folders
@@ -1160,6 +1164,124 @@
     })
   }
 
+  private async pullImage(originaljob: DBComputeJob) {
+    const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob
+    const imageLogFile =
+      this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log'
+    try {
+      const pullStream = await this.docker.pull(job.containerImage)
+      await new Promise((resolve, reject) => {
+        let wroteStatusBanner = false
+        this.docker.modem.followProgress(
+          pullStream,
+          (err: any, res: any) => {
+            // onFinished
+            if (err) {
+              appendFileSync(imageLogFile, String(err.message))
+              return reject(err)
+            }
+            CORE_LOGGER.debug('############# Pull docker image complete ##############')
+            resolve(res)
+          },
+          (progress: any) => {
+            // onProgress
+            if (!wroteStatusBanner) {
+              wroteStatusBanner = true
+              CORE_LOGGER.debug('############# Pull docker image status: ##############')
+            }
+            // only write the status banner once, it's cleaner
+            let logText = ''
+            if (progress.id) logText += progress.id + ' : ' + progress.status
+            else logText = progress.status
+            CORE_LOGGER.debug("Pulling image for jobId '" + job.jobId + "': " + logText)
+            console.log(progress)
+            appendFileSync(imageLogFile, logText + '\n')
+          }
+        )
+      })
+      job.status = C2DStatusNumber.ConfiguringVolumes
+      job.statusText = C2DStatusText.ConfiguringVolumes
+      await this.db.updateJob(job)
+    } catch (err) {
+      CORE_LOGGER.error(
+        `Unable to pull docker image: ${job.containerImage}: ${err.message}`
+      )
+      job.status = C2DStatusNumber.PullImageFailed
+      job.statusText = C2DStatusText.PullImageFailed
+      job.isRunning = false
+      job.dateFinished = String(Date.now() / 1000)
+      await this.db.updateJob(job)
+      await this.cleanupJob(job)
+    }
+  }
+
+  private async buildImage(
+    originaljob: DBComputeJob,
+    additionalDockerFiles: { [key: string]: any }
+  ) {
+    const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob
+    const imageLogFile =
+      this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log'
+    try {
+      const pack = tarStream.pack()
+
+      // Append the Dockerfile to the tar archive
+      pack.entry({ name: 'Dockerfile' }, job.algorithm.meta.container.dockerfile)
+      // Append any additional files to the tar archive
+      if (additionalDockerFiles) {
+        for (const filePath of Object.keys(additionalDockerFiles)) {
+          pack.entry({ name: filePath }, additionalDockerFiles[filePath])
+        }
+      }
+      pack.finalize()
+
+      // Build the image using the tar stream as context
+      const buildStream = await this.docker.buildImage(pack, {
+        t: job.containerImage
+      })
+
+      // Optional: listen to build output
+      buildStream.on('data', (data) => {
+        try {
+          const text = JSON.parse(data.toString('utf8'))
+          CORE_LOGGER.debug(
+            "Building image for jobId '" + job.jobId + "': " + text.stream.trim()
+          )
+          appendFileSync(imageLogFile, String(text.stream))
+        } catch (e) {
+          // console.log('non json build data: ', data.toString('utf8'))
+        }
+      })
+
+      await new Promise<void>((resolve, reject) => {
+        buildStream.on('end', () => {
+          CORE_LOGGER.debug(`Image '${job.containerImage}' built successfully.`)
+          resolve()
+        })
+        buildStream.on('error', (err) => {
+          CORE_LOGGER.debug(`Error building image '${job.containerImage}':` + err.message)
+          appendFileSync(imageLogFile, String(err.message))
+          reject(err)
+        })
+      })
+      job.status = C2DStatusNumber.ConfiguringVolumes
+      job.statusText = C2DStatusText.ConfiguringVolumes
+      await this.db.updateJob(job)
+    } catch (err) {
+      CORE_LOGGER.error(
+        `Unable to build docker image: ${job.containerImage}: ${err.message}`
+      )
+      appendFileSync(imageLogFile, String(err.message))
+      job.status = C2DStatusNumber.BuildImageFailed
+      job.statusText = C2DStatusText.BuildImageFailed
+      job.isRunning = false
+      job.dateFinished = String(Date.now() / 1000)
+      await this.db.updateJob(job)
+      await this.cleanupJob(job)
+    }
+  }
+
   private async uploadData(
     job: DBComputeJob
   ): Promise<{ status: C2DStatusNumber; statusText: C2DStatusText }> {
@@ -1445,10 +1567,13 @@
 // this uses the docker engine, but exposes only one env, the free one
-export function getAlgorithmImage(algorithm: ComputeAlgorithm): string {
+export function getAlgorithmImage(algorithm: ComputeAlgorithm, jobId: string): string {
   if (!algorithm.meta || !algorithm.meta.container) {
     return null
   }
+  if (algorithm.meta.container.dockerfile) {
+    return jobId.toLowerCase() + '-image:latest'
+  }
   let { image } = algorithm.meta.container
   if (algorithm.meta.container.checksum)
     image = image + '@' + algorithm.meta.container.checksum
diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts
index c2a223b33..f80402547 100644
--- a/src/components/core/compute/initialize.ts
+++ b/src/components/core/compute/initialize.ts
@@ -33,7 +33,7 @@ import { C2DEngineDocker, getAlgorithmImage } from '../../c2d/compute_engine_doc
 import { Credentials, DDOManager } from '@oceanprotocol/ddo-js'
 import { areKnownCredentialTypes, checkCredentials } from '../../../utils/credentials.js'
 import { PolicyServer } from '../../policyServer/index.js'
-import { getAlgoChecksums, validateAlgoForDataset } from './utils.js'
+import { generateUniqueID, getAlgoChecksums, validateAlgoForDataset } from './utils.js'
 
 export class ComputeInitializeHandler extends CommandHandler {
   validate(command: ComputeInitializeCommand): ValidateParams {
@@ -366,7 +366,7 @@ export class ComputeInitializeHandler extends CommandHandler {
       }
     }
     if (hasDockerImages) {
-      const algoImage = getAlgorithmImage(task.algorithm)
+      const algoImage = getAlgorithmImage(task.algorithm, generateUniqueID(task))
       if (algoImage) {
         const validation: ValidateParams = await C2DEngineDocker.checkDockerImage(
           algoImage,
diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts
index 31a960ec1..aa302cb60 100644
--- a/src/components/core/compute/startCompute.ts
+++ b/src/components/core/compute/startCompute.ts
@@ -6,7 +6,7 @@
   PaidComputeStartCommand
 } from '../../../@types/commands.js'
 import { CommandHandler } from '../handler/handler.js'
-import { getAlgoChecksums, validateAlgoForDataset } from './utils.js'
+import { generateUniqueID, getAlgoChecksums, validateAlgoForDataset } from './utils.js'
 import {
   ValidateParams,
   buildInvalidRequestMessage,
@@ -35,7 +35,6 @@
 import { Credentials, DDOManager } from '@oceanprotocol/ddo-js'
 import { getNonceAsNumber } from '../utils/nonceHandler.js'
 import { PolicyServer } from '../../policyServer/index.js'
 import { areKnownCredentialTypes, checkCredentials } from '../../../utils/credentials.js'
-import { generateUniqueID } from '../../database/sqliteCompute.js'
 
 export class PaidComputeStartHandler extends CommandHandler {
   validate(command: PaidComputeStartCommand): ValidateParams {
diff --git a/src/components/core/compute/utils.ts b/src/components/core/compute/utils.ts
index 98065ee54..bfbfd43ac 100644
--- a/src/components/core/compute/utils.ts
+++ b/src/components/core/compute/utils.ts
@@ -14,6 +14,16 @@ import { createHash } from 'crypto'
 import { FindDdoHandler } from '../../core/handler/ddoHandler.js'
 import { DDOManager, VersionedDDO } from '@oceanprotocol/ddo-js'
 
+export function generateUniqueID(jobStructure: any): string {
+  const timestamp =
+    BigInt(Date.now()) * 1_000_000n + (process.hrtime.bigint() % 1_000_000n)
+  const random = Math.random()
+  const jobId = createHash('sha256')
+    .update(JSON.stringify(jobStructure) + timestamp.toString() + random.toString())
+    .digest('hex')
+  return jobId
+}
+
 export async function getAlgoChecksums(
   algoDID: string,
   algoServiceId: string,
diff --git a/src/components/database/C2DDatabase.ts b/src/components/database/C2DDatabase.ts
index 842c9a29f..b7b57c897 100644
--- a/src/components/database/C2DDatabase.ts
+++ b/src/components/database/C2DDatabase.ts
@@ -9,7 +9,7 @@ import { AbstractDatabase } from './BaseDatabase.js'
 import { OceanNode } from '../../OceanNode.js'
 import { getDatabase } from '../../utils/database.js'
 import { getConfiguration } from '../../utils/index.js'
-
+import { generateUniqueID } from '../core/compute/utils.js'
 export class C2DDatabase extends AbstractDatabase {
   private provider: SQLiteCompute
@@ -32,6 +32,7 @@
   }
 
   async newJob(job: DBComputeJob): Promise<string> {
+    if (!job.jobId) job.jobId = generateUniqueID(job)
     const jobId = await this.provider.newJob(job)
     return jobId
   }
diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts
index c668e03e0..c0ec4d1d1 100644
--- a/src/components/database/sqliteCompute.ts
+++ b/src/components/database/sqliteCompute.ts
@@ -7,7 +7,6 @@
 } from '../../@types/C2D/C2D.js'
 import sqlite3, { RunResult } from 'sqlite3'
 import { DATABASE_LOGGER } from '../../utils/logging/common.js'
-import { createHash } from 'crypto'
 
 interface ComputeDatabaseProvider {
   newJob(job: DBComputeJob): Promise<string>
   getFinishedJobs(): Promise<DBComputeJob[]>
 }
 
-export function generateUniqueID(jobStructure: any): string {
-  const timestamp =
-    BigInt(Date.now()) * 1_000_000n + (process.hrtime.bigint() % 1_000_000n)
-  const random = Math.random()
-  const jobId = createHash('sha256')
-    .update(JSON.stringify(jobStructure) + timestamp.toString() + random.toString())
-    .digest('hex')
-  return jobId
-}
-
 function getInternalStructure(job: DBComputeJob): any {
   const internalBlob = {
     clusterHash: job.clusterHash,
@@ -146,25 +135,6 @@ export class SQLiteCompute implements ComputeDatabaseProvider {
       ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
     `
-    let jobId: string
-    if (!job.jobId) {
-      const jobStructure = {
-        assets: job.assets,
-        algorithm: job.algorithm,
-        output: {},
-        environment: job.environment,
-        owner: job.owner,
-        maxJobDuration: job.maxJobDuration,
-        chainId: job.payment?.chainId || null,
-        agreementId: job.agreementId,
-        resources: job.resources,
-        metadata: job.metadata
-      }
-      jobId = generateUniqueID(jobStructure)
-      job.jobId = jobId
-    } else {
-      jobId = job.jobId
-    }
 
     return new Promise((resolve, reject) => {
       this.db.run(
@@ -172,7 +142,7 @@
       [
         job.owner,
         job.did,
-        jobId,
+        job.jobId,
         job.dateCreated || String(Date.now() / 1000), // seconds from epoch,
         job.status || C2DStatusNumber.JobStarted,
         job.statusText || C2DStatusText.JobStarted,
@@ -188,8 +158,8 @@
           DATABASE_LOGGER.error('Could not insert C2D job on DB: ' + err.message)
           reject(err)
         } else {
-          DATABASE_LOGGER.info('Successfully inserted job with id:' + jobId)
-          resolve(jobId)
+          DATABASE_LOGGER.info('Successfully inserted job with id: ' + job.jobId)
+          resolve(job.jobId)
         }
       }
     )
diff --git a/src/test/data/commands.ts b/src/test/data/commands.ts
index b9a0156da..af0ab7a38 100644
--- a/src/test/data/commands.ts
+++ b/src/test/data/commands.ts
@@ -1,6 +1,6 @@
 export const freeComputeStartPayload = {
   command: 'freeStartCompute',
-  consumerAddress: '0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687',
+  consumerAddress: '0xeB5ae11175008E8f178d57d0152678a863FbB887',
   environment: '',
   nonce: '1',
   signature: '0x123',
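For reviewers who want to exercise the new build path in isolation: `buildImage` above feeds an in-memory tar archive to the Docker daemon as the build context. A minimal sketch of that technique, assuming only a locally reachable Docker daemon; the `buildFromDockerfile` helper and the `myjob-image:latest` tag are illustrative names, not part of this change:

```ts
import Dockerode from 'dockerode'
import * as tarStream from 'tar-stream'

async function buildFromDockerfile(
  docker: Dockerode,
  dockerfile: string,
  extraFiles: { [path: string]: string } = {},
  tag = 'myjob-image:latest'
): Promise<void> {
  // Docker expects a tar archive as the build context; tar-stream assembles
  // one in memory, so no temp directory or filesystem writes are needed.
  const pack = tarStream.pack()
  pack.entry({ name: 'Dockerfile' }, dockerfile)
  for (const [name, content] of Object.entries(extraFiles)) {
    pack.entry({ name }, content)
  }
  pack.finalize()

  const stream = await docker.buildImage(pack, { t: tag })
  // followProgress resolves when the daemon reports the build finished and
  // also surfaces error payloads embedded in the JSON progress stream.
  await new Promise<void>((resolve, reject) => {
    docker.modem.followProgress(stream, (err) => (err ? reject(err) : resolve()))
  })
}

// Usage: await buildFromDockerfile(new Dockerode(), 'FROM alpine\nCMD ["echo", "hi"]')
```

Note that `followProgress` is the same docker-modem helper `pullImage` already uses; waiting on it, rather than on a raw `'end'` event, also catches build failures the daemon reports inside the stream.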
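Likewise, the relocated `generateUniqueID` (now in `src/components/core/compute/utils.ts`, with `C2DDatabase.newJob` filling in `job.jobId` only when the caller did not supply one) can be sanity-checked on its own. A small sketch reusing the function body exactly as added above; the sample payload is made up:

```ts
import { createHash } from 'crypto'

// Same body as the generateUniqueID added above: a nanosecond-resolution
// timestamp plus a random salt, hashed together with the job payload.
function generateUniqueID(jobStructure: any): string {
  const timestamp =
    BigInt(Date.now()) * 1_000_000n + (process.hrtime.bigint() % 1_000_000n)
  const random = Math.random()
  return createHash('sha256')
    .update(JSON.stringify(jobStructure) + timestamp.toString() + random.toString())
    .digest('hex')
}

// Two calls with an identical payload still yield different ids, so a
// pre-assigned jobId is never accidentally reused:
const payload = { owner: '0x00', environment: 'env-1' }
console.log(generateUniqueID(payload) !== generateUniqueID(payload)) // true
```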