diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a8445603..e655fa42b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -260,7 +260,7 @@ jobs: DB_TYPE: 'elasticsearch' MAX_REQ_PER_MINUTE: 320 MAX_CONNECTIONS_PER_MINUTE: 320 - DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' - name: Check Ocean Node is running run: | for i in $(seq 1 90); do diff --git a/docs/GPU.md b/docs/GPU.md index a83c742ef..bfcb01df7 100644 --- a/docs/GPU.md +++ b/docs/GPU.md @@ -82,7 +82,7 @@ Here is the full definition of DOCKER_COMPUTE_ENVIRONMENTS: } } }, - { "id": "disk", "total": 1000000000 } + { "id": "disk", "total": 1 } ], "storageExpiry": 604800, "maxJobDuration": 3600, @@ -102,8 +102,8 @@ Here is the full definition of DOCKER_COMPUTE_ENVIRONMENTS: "maxJobs": 3, "resources": [ { "id": "cpu", "max": 1 }, - { "id": "ram", "max": 1000000000 }, - { "id": "disk", "max": 1000000000 }, + { "id": "ram", "max": 1 }, + { "id": "disk", "max": 1 }, { "id": "myGPU", "max": 1 } ] } @@ -122,7 +122,7 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp { "id": "0xd6b10b27aab01a72070a5164c07d0517755838b9cb9857e2d5649287ec3aaaa2-0x66073c81f833deaa2f8e2a508f69cf78f8a99b17ba1a64f369af921750f93914", "runningJobs": 0, - "consumerAddress": "0x4fb80776C8eb4cAbe7730dcBCdb1fa6ecD3c460E", + "consumerAddress": "0x00", "platform": { "architecture": "x86_64", "os": "Ubuntu 22.04.3 LTS" }, "fees": { "1": [ @@ -141,9 +141,9 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp { "id": "cpu", "total": 8, "max": 8, "min": 1, "inUse": 0 }, { "id": "ram", - "total": 24888963072, - "max": 24888963072, - "min": 1000000000, + "total": 23, + "max": 23, + "min": 1, "inUse": 0 }, { @@ -162,15 +162,15 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp "min": 0, "inUse": 0 }, - { "id": "disk", "total": 1000000000, "max": 1000000000, "min": 0, "inUse": 0 } + { "id": "disk", "total": 1, "max": 1, "min": 0, "inUse": 0 } ], "free": { "maxJobDuration": 60, "maxJobs": 3, "resources": [ { "id": "cpu", "max": 1, "inUse": 0 }, - { "id": "ram", "max": 1000000000, "inUse": 0 }, - { "id": "disk", "max": 1000000000, "inUse": 0 }, + { "id": "ram", "max": 1, "inUse": 0 }, + { "id": "disk", "max": 1, "inUse": 0 }, { "id": "myGPU", "max": 1, "inUse": 0 } ] }, @@ -194,7 +194,7 @@ Start a free job using: "rawcode": "import tensorflow as tf\nsess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))\nprint(\"Num GPUs Available: \", len(tf.config.list_physical_devices('GPU')))\ngpus = tf.config.list_physical_devices('GPU')\nfor gpu in gpus:\n\tprint('Name:', gpu.name, ' Type:', gpu.device_type)" } }, - "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687", + "consumerAddress": "0x00", "signature": "123", "nonce": 1, "environment": 
"0xd6b10b27aab01a72070a5164c07d0517755838b9cb9857e2d5649287ec3aaaa2-0x66073c81f833deaa2f8e2a508f69cf78f8a99b17ba1a64f369af921750f93914", @@ -259,7 +259,7 @@ Then define DOCKER_COMPUTE_ENVIRONMENTS with }, { "id": "disk", - "total": 1000000000 + "total": 1 } ], "storageExpiry": 604800, @@ -291,11 +291,11 @@ Then define DOCKER_COMPUTE_ENVIRONMENTS with }, { "id": "ram", - "max": 1000000000 + "max": 1 }, { "id": "disk", - "max": 1000000000 + "max": 1 }, { "id": "myGPU", @@ -311,7 +311,7 @@ aka ```bash export DOCKER_COMPUTE_ENVIRONMENTS="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"myGPU\",\"description\":\"AMD Radeon RX 9070 XT\",\"type\":\"gpu\",\"total\":1,\"init\":{\"advanced\":{ -\"IpcMode\":\"host\",\"CapAdd\":[\"CAP_SYS_PTRACE\"],\"Devices\":[\"/dev/dxg\",\"/dev/dri/card0\"],\"Binds\":[\"/usr/lib/wsl/lib/libdxcore.so:/usr/lib/libdxcore.so\",\"/opt/rocm/lib/libhsa-runtime64.so.1:/opt/rocm/lib/libhsa-runtime64.so.1\"],\"SecurityOpt\":{\"seccomp\":\"unconfined\"}}}},{\"id\":\"disk\",\"total\":1000000000}],\"storageExpiry\":604800,\"maxJobDuration\":3600,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1},{\"id\":\"nyGPU\",\"price\":3}]}]},\"free\":{\"maxJobDuration\":60,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1000000000},{\"id\":\"disk\",\"max\":1000000000},{\"id\":\"myGPU\",\"max\":1}]}}]" +\"IpcMode\":\"host\",\"CapAdd\":[\"CAP_SYS_PTRACE\"],\"Devices\":[\"/dev/dxg\",\"/dev/dri/card0\"],\"Binds\":[\"/usr/lib/wsl/lib/libdxcore.so:/usr/lib/libdxcore.so\",\"/opt/rocm/lib/libhsa-runtime64.so.1:/opt/rocm/lib/libhsa-runtime64.so.1\"],\"SecurityOpt\":{\"seccomp\":\"unconfined\"}}}},{\"id\":\"disk\",\"total\":10}],\"storageExpiry\":604800,\"maxJobDuration\":3600,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1},{\"id\":\"nyGPU\",\"price\":3}]}]},\"free\":{\"maxJobDuration\":60,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1},{\"id\":\"disk\",\"max\":1},{\"id\":\"myGPU\",\"max\":1}]}}]" ``` you should have it in your compute envs: @@ -325,7 +325,7 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp { "id": "0xbb5773e734e1b188165dac88d9a3dc8ac28bc9f5624b45fa8bbd8fca043de7c1-0x2c2761f938cf186eeb81f71dee06ad7edb299493e39c316c390d0c0691e6585c", "runningJobs": 0, - "consumerAddress": "0x4fb80776C8eb4cAbe7730dcBCdb1fa6ecD3c460E", + "consumerAddress": "0x00", "platform": { "architecture": "x86_64", "os": "Ubuntu 24.04.2 LTS" @@ -359,9 +359,9 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp }, { "id": "ram", - "total": 33617674240, - "max": 33617674240, - "min": 1000000000, + "total": 31, + "max": 31, + "min": 1, "inUse": 0 }, { @@ -389,8 +389,8 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp }, { "id": "disk", - "total": 1000000000, - "max": 1000000000, + "total": 10, + "max": 10, "min": 0, "inUse": 0 } @@ -406,12 +406,12 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp }, { "id": "ram", - "max": 1000000000, + "max": 1, "inUse": 0 }, { "id": "disk", - "max": 1000000000, + "max": 1, "inUse": 0 }, { @@ -450,7 +450,7 @@ Start a free job with "rawcode": "import tensorflow as tf\nsess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))\nprint(\"Num GPUs Available: \", len(tf.config.list_physical_devices('GPU')))\ngpus = tf.config.list_physical_devices('GPU')\nfor gpu in 
gpus:\n\tprint('Name:', gpu.name, ' Type:', gpu.device_type)" } }, - "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687", + "consumerAddress": "0x00", "signature": "123", "nonce": 1, "environment": "0xbb5773e734e1b188165dac88d9a3dc8ac28bc9f5624b45fa8bbd8fca043de7c1-0x2c2761f938cf186eeb81f71dee06ad7edb299493e39c316c390d0c0691e6585c", diff --git a/docs/env.md b/docs/env.md index 70e35ad35..bf55199c9 100644 --- a/docs/env.md +++ b/docs/env.md @@ -130,7 +130,7 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of "resources": [ { "id": "disk", - "total": 1000000000 + "total": 10 } ], "storageExpiry": 604800, @@ -158,11 +158,11 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of }, { "id": "ram", - "max": 1000000000 + "max": 1 }, { "id": "disk", - "max": 1000000000 + "max": 1 } ] } diff --git a/scripts/ocean-node-quickstart.sh b/scripts/ocean-node-quickstart.sh index 4993d993f..13dc4b3cf 100755 --- a/scripts/ocean-node-quickstart.sh +++ b/scripts/ocean-node-quickstart.sh @@ -142,7 +142,7 @@ fi # Set default compute environments if not already defined if [ -z "$DOCKER_COMPUTE_ENVIRONMENTS" ]; then echo "Setting default DOCKER_COMPUTE_ENVIRONMENTS configuration" - export DOCKER_COMPUTE_ENVIRONMENTS="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"disk\",\"total\":1000000000}],\"storageExpiry\":604800,\"maxJobDuration\":36000,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1}]}]},\"free\":{\"maxJobDuration\":360000,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1000000000},{\"id\":\"disk\",\"max\":1000000000}]}}]" + export DOCKER_COMPUTE_ENVIRONMENTS="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"disk\",\"total\":10}],\"storageExpiry\":604800,\"maxJobDuration\":36000,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1}]}]},\"free\":{\"maxJobDuration\":360000,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1},{\"id\":\"disk\",\"max\":1}]}}]" fi cat < docker-compose.yml diff --git a/scripts/ocean-node-update.sh b/scripts/ocean-node-update.sh index b72804d9a..fa44b5337 100755 --- a/scripts/ocean-node-update.sh +++ b/scripts/ocean-node-update.sh @@ -1,6 +1,6 @@ #!/bin/bash -DEFAULT_DOCKER_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":36000,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":360000,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' +DEFAULT_DOCKER_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":36000,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":360000,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' check_prerequisites() { if [ ! 
-f "docker-compose.yml" ]; then diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 28f8f5ace..f6cdb9ed6 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -1,4 +1,4 @@ -import { MetadataAlgorithm } from '@oceanprotocol/ddo-js' +import { MetadataAlgorithm, ConsumerParameter } from '@oceanprotocol/ddo-js' import type { BaseFileObject } from '../fileObject.js' export enum C2DClusterType { // eslint-disable-next-line no-unused-vars @@ -145,6 +145,11 @@ export interface ComputeResult { export type DBComputeJobMetadata = { [key: string]: string | number | boolean } + +export interface ComputeJobTerminationDetails { + OOMKilled: boolean + exitCode: number +} export interface ComputeJob { owner: string did?: string @@ -160,6 +165,7 @@ export interface ComputeJob { agreementId?: string environment?: string metadata?: DBComputeJobMetadata + terminationDetails?: ComputeJobTerminationDetails } export interface ComputeOutput { @@ -181,15 +187,27 @@ export interface ComputeAsset { transferTxId?: string userdata?: { [key: string]: any } } - +export interface ExtendedMetadataAlgorithm extends MetadataAlgorithm { + container: { + // retain existing properties + entrypoint: string + image: string + tag: string + checksum: string + dockerfile?: string // optional + additionalDockerFiles?: { [key: string]: any } + consumerParameters?: ConsumerParameter[] + } +} export interface ComputeAlgorithm { documentId?: string serviceId?: string fileObject?: BaseFileObject - meta?: MetadataAlgorithm + meta?: ExtendedMetadataAlgorithm transferTxId?: string algocustomdata?: { [key: string]: any } userdata?: { [key: string]: any } + envs?: { [key: string]: any } } export interface AlgoChecksums { @@ -236,6 +254,10 @@ export enum C2DStatusNumber { // eslint-disable-next-line no-unused-vars PullImageFailed = 11, // eslint-disable-next-line no-unused-vars + BuildImage = 12, + // eslint-disable-next-line no-unused-vars + BuildImageFailed = 13, + // eslint-disable-next-line no-unused-vars ConfiguringVolumes = 20, // eslint-disable-next-line no-unused-vars VolumeCreationFailed = 21, @@ -254,6 +276,8 @@ export enum C2DStatusNumber { // eslint-disable-next-line no-unused-vars AlgorithmFailed = 41, // eslint-disable-next-line no-unused-vars + DiskQuotaExceeded = 42, + // eslint-disable-next-line no-unused-vars FilteringResults = 50, // eslint-disable-next-line no-unused-vars PublishingResults = 60, @@ -272,6 +296,10 @@ export enum C2DStatusText { // eslint-disable-next-line no-unused-vars PullImageFailed = 'Pulling algorithm image failed', // eslint-disable-next-line no-unused-vars + BuildImage = 'Building algorithm image', + // eslint-disable-next-line no-unused-vars + BuildImageFailed = 'Building algorithm image failed', + // eslint-disable-next-line no-unused-vars ConfiguringVolumes = 'Configuring volumes', // eslint-disable-next-line no-unused-vars VolumeCreationFailed = 'Volume creation failed', @@ -290,6 +318,8 @@ export enum C2DStatusText { // eslint-disable-next-line no-unused-vars AlgorithmFailed = 'Failed to run algorithm', // eslint-disable-next-line no-unused-vars + DiskQuotaExceeded = 'Error: disk quota exceeded', + // eslint-disable-next-line no-unused-vars FilteringResults = 'Filtering results', // eslint-disable-next-line no-unused-vars PublishingResults = 'Publishing results', diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index dc5620f1a..83fa0a1dd 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ 
b/src/components/c2d/compute_engine_docker.ts @@ -30,12 +30,14 @@ import { Storage } from '../storage/index.js' import Dockerode from 'dockerode' import type { ContainerCreateOptions, HostConfig, VolumeCreateOptions } from 'dockerode' import * as tar from 'tar' +import * as tarStream from 'tar-stream' import { createWriteStream, existsSync, mkdirSync, rmSync, writeFileSync, + appendFileSync, statSync, createReadStream } from 'fs' @@ -56,6 +58,7 @@ export class C2DEngineDocker extends C2DEngine { public docker: Dockerode private cronTimer: any private cronTime: number = 2000 + private jobImageSizes: Map = new Map() public constructor(clusterConfig: C2DClusterInfo, db: C2DDatabase, escrow: Escrow) { super(clusterConfig, db, escrow) @@ -98,8 +101,6 @@ export class C2DEngineDocker extends C2DEngine { // let's build the env. Swarm and k8 will build multiple envs, based on arhitecture const config = await getConfiguration() const envConfig = await this.getC2DConfig().connection - console.log(config) - console.log(envConfig) let sysinfo = null try { sysinfo = await this.docker.info() @@ -110,10 +111,11 @@ export class C2DEngineDocker extends C2DEngine { } // console.log(sysinfo) let fees: ComputeEnvFeesStructure = null - const supportedChains: number[] = [] - for (const chain of Object.keys(config.supportedNetworks)) { - supportedChains.push(parseInt(chain)) + if (config.supportedNetworks) { + for (const chain of Object.keys(config.supportedNetworks)) { + supportedChains.push(parseInt(chain)) + } } for (const feeChain of Object.keys(envConfig.fees)) { // for (const feeConfig of envConfig.fees) { @@ -163,7 +165,7 @@ export class C2DEngineDocker extends C2DEngine { consumerAddress: config.keys.ethAddress, platform: { architecture: sysinfo.Architecture, - os: sysinfo.OperatingSystem + os: sysinfo.OSType }, fees }) @@ -184,9 +186,9 @@ export class C2DEngineDocker extends C2DEngine { this.envs[0].resources.push({ id: 'ram', type: 'ram', - total: sysinfo.MemTotal, - max: sysinfo.MemTotal, - min: 1e9 + total: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), + max: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), + min: 1 }) if (envConfig.resources) { @@ -308,48 +310,33 @@ export class C2DEngineDocker extends C2DEngine { ): Promise { try { const info = drc.default.parseRepoAndRef(image) - /** - * info: { - index: { name: 'docker.io', official: true }, - official: true, - remoteName: 'library/node', - localName: 'node', - canonicalName: 'docker.io/node', - digest: 'sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36' - } - */ const client = drc.createClientV2({ name: info.localName }) - const tagOrDigest = info.tag || info.digest - - // try get manifest from registry - return await new Promise((resolve, reject) => { - client.getManifest( - { ref: tagOrDigest, maxSchemaVersion: 2 }, - function (err: any, manifest: any) { - client.close() - if (manifest) { - return resolve({ - valid: checkManifestPlatform(manifest.platform, platform) - }) - } + const ref = info.tag || info.digest - if (err) { - CORE_LOGGER.error( - `Unable to get Manifest for image ${image}: ${err.message}` - ) - reject(err) - } - } - ) + const manifest = await new Promise((resolve, reject) => { + client.getManifest({ ref, maxSchemaVersion: 2 }, (err: any, result: any) => { + client.close() + err ? reject(err) : resolve(result) + }) }) - } catch (err) { - // show all aggregated errors, if present - const aggregated = err.errors && err.errors.length > 0 - aggregated ? 
CORE_LOGGER.error(JSON.stringify(err.errors)) : CORE_LOGGER.error(err) + + const platforms = Array.isArray(manifest.manifests) + ? manifest.manifests.map((entry: any) => entry.platform) + : [manifest.platform] + + const isValidPlatform = platforms.some((entry: any) => + checkManifestPlatform(entry, platform) + ) + + return { valid: isValidPlatform } + } catch (err: any) { + CORE_LOGGER.error(`Unable to get Manifest for image ${image}: ${err.message}`) + if (err.errors?.length) CORE_LOGGER.error(JSON.stringify(err.errors)) + return { valid: false, status: 404, - reason: aggregated ? JSON.stringify(err.errors) : err.message + reason: err.errors?.length ? JSON.stringify(err.errors) : err.message } } } @@ -372,18 +359,6 @@ export class C2DEngineDocker extends C2DEngine { // TO DO - iterate over resources and get default runtime const isFree: boolean = !(payment && payment.lockTx) - // C2D - Check image, check arhitecture, etc - const image = getAlgorithmImage(algorithm) - // ex: node@sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36 - if (!image) { - // send a 500 with the error message - throw new Error( - `Unable to extract docker image ${image} from algoritm: ${JSON.stringify( - algorithm - )}` - ) - } - if (metadata && Object.keys(metadata).length > 0) { const metadataSize = JSON.stringify(metadata).length if (metadataSize > 1024) { @@ -400,9 +375,29 @@ export class C2DEngineDocker extends C2DEngine { if (!env) { throw new Error(`Invalid environment ${environment}`) } - const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) - if (!validation.valid) - throw new Error(`Unable to validate docker image ${image}: ${validation.reason}`) + // C2D - Check image, check arhitecture, etc + const image = getAlgorithmImage(algorithm, jobId) + // ex: node@sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36 + if (!image) { + // send a 500 with the error message + throw new Error( + `Unable to extract docker image ${image} from algoritm: ${JSON.stringify( + algorithm + )}` + ) + } + let additionalDockerFiles: { [key: string]: any } = null + if ( + algorithm.meta && + algorithm.meta.container && + algorithm.meta.container.additionalDockerFiles + ) { + additionalDockerFiles = JSON.parse( + JSON.stringify(algorithm.meta.container.additionalDockerFiles) + ) + // make sure that we don't keep them in the db structure + algorithm.meta.container.additionalDockerFiles = null + } const job: DBComputeJob = { clusterHash: this.getC2DConfig().hash, containerImage: image, @@ -430,15 +425,37 @@ export class C2DEngineDocker extends C2DEngine { algoStopTimestamp: '0', payment, metadata, - additionalViewers + additionalViewers, + terminationDetails: { exitCode: null, OOMKilled: null } + } + + if (algorithm.meta.container && algorithm.meta.container.dockerfile) { + // we need to build the image + job.status = C2DStatusNumber.BuildImage + job.statusText = C2DStatusText.BuildImage + } else { + // already built, we need to validate it + const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) + console.log('Validation: ', validation) + if (!validation.valid) + throw new Error( + `Cannot find image ${image} for ${env.platform.architecture}. 
Maybe it does not exist or it's build for other arhitectures.` + ) + job.status = C2DStatusNumber.PullImage + job.statusText = C2DStatusText.PullImage } + await this.makeJobFolders(job) // make sure we actually were able to insert on DB const addedId = await this.db.newJob(job) if (!addedId) { return [] } - + if (algorithm.meta.container && algorithm.meta.container.dockerfile) { + this.buildImage(job, additionalDockerFiles) + } else { + this.pullImage(job) + } // only now set the timer if (!this.cronTimer) { this.setNewTimer() @@ -489,20 +506,6 @@ export class C2DEngineDocker extends C2DEngine { index = index + 1 } } catch (e) {} - try { - const logStat = statSync( - this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/configuration.log' - ) - if (logStat) { - res.push({ - filename: 'configuration.log', - filesize: logStat.size, - type: 'configurationLog', - index - }) - index = index + 1 - } - } catch (e) {} try { const logStat = statSync( this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/algorithm.log' @@ -531,20 +534,6 @@ export class C2DEngineDocker extends C2DEngine { index = index + 1 } } catch (e) {} - try { - const logStat = statSync( - this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/publish.log' - ) - if (logStat) { - res.push({ - filename: 'publish.log', - filesize: logStat.size, - type: 'publishLog', - index - }) - index = index + 1 - } - } catch (e) {} return res } @@ -602,29 +591,6 @@ export class C2DEngineDocker extends C2DEngine { } } } - if (i.type === 'configurationLog') { - return { - stream: createReadStream( - this.getC2DConfig().tempFolder + - '/' + - jobId + - '/data/logs/configuration.log' - ), - headers: { - 'Content-Type': 'text/plain' - } - } - } - if (i.type === 'publishLog') { - return { - stream: createReadStream( - this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/publish.log' - ), - headers: { - 'Content-Type': 'text/plain' - } - } - } if (i.type === 'imageLog') { return { stream: createReadStream( @@ -685,7 +651,7 @@ export class C2DEngineDocker extends C2DEngine { const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) if (jobs.length === 0) { - // CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash) + CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash) this.setNewTimer() return } else { @@ -708,7 +674,6 @@ export class C2DEngineDocker extends C2DEngine { ): Promise | null { try { const container = await this.docker.createContainer(containerInfo) - console.log('container: ', container) return container } catch (e) { CORE_LOGGER.error(`Unable to create docker container: ${e.message}`) @@ -727,6 +692,16 @@ export class C2DEngineDocker extends C2DEngine { } } + private async inspectContainer(container: Dockerode.Container): Promise { + try { + const data = await container.inspect() + return data.State + } catch (e) { + CORE_LOGGER.error(`Unable to inspect docker container: ${e.message}`) + return null + } + } + private async createDockerVolume( volume: VolumeCreateOptions, retry: boolean = false @@ -763,7 +738,7 @@ export class C2DEngineDocker extends C2DEngine { // - monitor running containers and stop them if over limits // - monitor disc space and clean up /* steps: - - instruct docker to pull image + - wait until image is ready - create volume - after image is ready, create the container - download assets & algo into temp folder @@ -776,65 +751,6 @@ export class C2DEngineDocker extends C2DEngine { - delete the container - delete the volume */ - if (job.status 
=== C2DStatusNumber.JobStarted) { - // pull docker image - try { - const pullStream = await this.docker.pull(job.containerImage) - await new Promise((resolve, reject) => { - let wroteStatusBanner = false - this.docker.modem.followProgress( - pullStream, - (err: any, res: any) => { - // onFinished - if (err) return reject(err) - CORE_LOGGER.info('############# Pull docker image complete ##############') - resolve(res) - }, - (progress: any) => { - // onProgress - if (!wroteStatusBanner) { - wroteStatusBanner = true - CORE_LOGGER.info('############# Pull docker image status: ##############') - } - // only write the status banner once, its cleaner - CORE_LOGGER.info(progress.status) - } - ) - }) - } catch (err) { - CORE_LOGGER.error( - `Unable to pull docker image: ${job.containerImage}: ${err.message}` - ) - job.status = C2DStatusNumber.PullImageFailed - job.statusText = C2DStatusText.PullImageFailed - job.isRunning = false - job.dateFinished = String(Date.now() / 1000) - await this.db.updateJob(job) - await this.cleanupJob(job) - return - } - - job.status = C2DStatusNumber.PullImage - job.statusText = C2DStatusText.PullImage - await this.db.updateJob(job) - return // now we wait until image is ready - } - if (job.status === C2DStatusNumber.PullImage) { - try { - const imageInfo = await this.docker.getImage(job.containerImage) - console.log('imageInfo', imageInfo) - const details = await imageInfo.inspect() - console.log('details:', details) - job.status = C2DStatusNumber.ConfiguringVolumes - job.statusText = C2DStatusText.ConfiguringVolumes - await this.db.updateJob(job) - // now we can move forward - } catch (e) { - // not ready yet - CORE_LOGGER.error(`Unable to inspect docker image: ${e.message}`) - } - return - } if (job.status === C2DStatusNumber.ConfiguringVolumes) { // create the volume & create container // TO DO C2D: Choose driver & size @@ -884,7 +800,7 @@ export class C2DEngineDocker extends C2DEngine { // ram const ramSize = this.getResourceRequest(job.resources, 'ram') if (ramSize && ramSize > 0) { - hostConfig.Memory = ramSize + hostConfig.Memory = ramSize * 1024 * 1024 * 1024 // config is in GB, docker wants bytes // set swap to same memory value means no swap (otherwise it use like 2X mem) hostConfig.MemorySwap = hostConfig.Memory } @@ -934,6 +850,13 @@ export class C2DEngineDocker extends C2DEngine { ) containerInfo.Entrypoint = newEntrypoint.split(' ') } + if (job.algorithm.envs) { + const envVars: string[] = [] + for (const key of Object.keys(job.algorithm.envs)) { + envVars.push(`${key}=${job.algorithm.envs[key]}`) + } + containerInfo.Env = envVars + } const container = await this.createDockerContainer(containerInfo, true) if (container) { console.log('Container created: ', container) @@ -999,6 +922,9 @@ export class C2DEngineDocker extends C2DEngine { job.isStarted = true job.algoStartTimestamp = String(Date.now() / 1000) await this.db.updateJob(job) + CORE_LOGGER.info(`Container started successfully for job ${job.jobId}`) + + await this.measureContainerBaseSize(job, container) return } catch (e) { // container failed to start @@ -1028,7 +954,12 @@ export class C2DEngineDocker extends C2DEngine { } } } else { - // is running, we need to stop it.. 
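
The hunk above also wires the new `ComputeAlgorithm.envs` field (added in `src/@types/C2D/C2D.ts`) into the container definition: each key/value pair is flattened into a `KEY=value` string and handed to Docker through `containerInfo.Env`. A minimal sketch of that mapping outside the engine — the `buildEnvList` helper name and the sample payload are illustrative, not part of this PR:

```typescript
// Flatten a ComputeAlgorithm.envs object into Docker's Env array format,
// mirroring the loop added in compute_engine_docker.ts.
function buildEnvList(envs: { [key: string]: any } = {}): string[] {
  return Object.keys(envs).map((key) => `${key}=${envs[key]}`)
}

// Hypothetical consumer payload passing environment variables with the algorithm.
const algorithm = {
  meta: { rawcode: 'print("hello")' },
  envs: { MY_SETTING: '42', DEBUG: 'true' } // placeholder values
}

// ['MY_SETTING=42', 'DEBUG=true'] — this is what ends up in ContainerCreateOptions.Env
console.log(buildEnvList(algorithm.envs))
```
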
+ const canContinue = await this.monitorDiskUsage(job) + if (!canContinue) { + // Job was terminated due to disk quota exceeded + return + } + console.log('running, need to stop it?') const timeNow = Date.now() / 1000 const expiry = parseFloat(job.algoStartTimestamp) + job.maxJobDuration @@ -1091,6 +1022,15 @@ export class C2DEngineDocker extends C2DEngine { await this.cleanupJob(job) return } + const state = await this.inspectContainer(container) + if (state) { + job.terminationDetails.OOMKilled = state.OOMKilled + job.terminationDetails.exitCode = state.ExitCode + } else { + job.terminationDetails.OOMKilled = null + job.terminationDetails.exitCode = null + } + const outputsArchivePath = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/outputs/outputs.tar' try { @@ -1120,6 +1060,8 @@ export class C2DEngineDocker extends C2DEngine { // - delete volume // - delete container + this.jobImageSizes.delete(job.jobId) + // payments if (!job.isFree && job.payment) { let txId = null @@ -1182,7 +1124,7 @@ export class C2DEngineDocker extends C2DEngine { await container.remove() } } catch (e) { - console.error('Container not found! ' + e.message) + // console.error('Container not found! ' + e.message) } try { const volume = await this.docker.getVolume(job.jobId + '-volume') @@ -1194,7 +1136,17 @@ export class C2DEngineDocker extends C2DEngine { } } } catch (e) { - console.error('Container volume not found! ' + e.message) + // console.error('Container volume not found! ' + e.message) + } + if (job.algorithm.meta.container && job.algorithm.meta.container.dockerfile) { + const image = getAlgorithmImage(job.algorithm, job.jobId) + if (image) { + try { + await this.docker.getImage(image).remove({ force: true }) + } catch (e) { + console.log('Could not delete image: ' + image + ' : ' + e.message) + } + } } try { // remove folders @@ -1230,6 +1182,249 @@ export class C2DEngineDocker extends C2DEngine { }) } + private getDiskQuota(job: DBComputeJob): number { + if (!job.resources) return 0 + + const diskResource = job.resources.find((resource) => resource.id === 'disk') + return diskResource ? 
diskResource.amount : 0 + } + + // Inspect the real runtime size of the container + private async measureContainerBaseSize( + job: DBComputeJob, + container: Dockerode.Container + ): Promise { + try { + if (this.jobImageSizes.has(job.jobId)) { + CORE_LOGGER.debug(`Using cached base size for job ${job.jobId.slice(-8)}`) + return + } + + // Wait for container filesystem to stabilize + await new Promise((resolve) => setTimeout(resolve, 3000)) + + const actualBaseSize = await this.getContainerDiskUsage(container.id, '/') + this.jobImageSizes.set(job.jobId, actualBaseSize) + + CORE_LOGGER.info( + `Base container ${job.containerImage} runtime size: ${( + actualBaseSize / + 1024 / + 1024 / + 1024 + ).toFixed(2)}GB` + ) + } catch (error) { + CORE_LOGGER.error(`Failed to measure base container size: ${error.message}`) + this.jobImageSizes.set(job.jobId, 0) + } + } + + private async getContainerDiskUsage( + containerName: string, + path: string = '/data' + ): Promise { + try { + const container = this.docker.getContainer(containerName) + const containerInfo = await container.inspect() + if (!containerInfo.State.Running) { + CORE_LOGGER.debug( + `Container ${containerName} is not running, cannot check disk usage` + ) + return 0 + } + + const exec = await container.exec({ + Cmd: ['du', '-sb', path], + AttachStdout: true, + AttachStderr: true + }) + + const stream = await exec.start({ Detach: false, Tty: false }) + + const chunks: Buffer[] = [] + for await (const chunk of stream) { + chunks.push(chunk as Buffer) + } + + const output = Buffer.concat(chunks).toString() + + const match = output.match(/(\d+)\s/) + return match ? parseInt(match[1], 10) : 0 + } catch (error) { + CORE_LOGGER.error( + `Failed to get container disk usage for ${containerName}: ${error.message}` + ) + return 0 + } + } + + private async monitorDiskUsage(job: DBComputeJob): Promise { + const diskQuota = this.getDiskQuota(job) + if (diskQuota <= 0) return true + + const containerName = job.jobId + '-algoritm' + const totalUsage = await this.getContainerDiskUsage(containerName, '/') + const baseImageSize = this.jobImageSizes.get(job.jobId) || 0 + const algorithmUsage = Math.max(0, totalUsage - baseImageSize) + + const usageGB = (algorithmUsage / 1024 / 1024 / 1024).toFixed(2) + const quotaGB = diskQuota.toFixed(1) + const usagePercent = ((algorithmUsage / diskQuota) * 100).toFixed(1) + + CORE_LOGGER.info( + `Job ${job.jobId.slice(-8)} disk: ${usageGB}GB / ${quotaGB}GB (${usagePercent}%)` + ) + + if (algorithmUsage > diskQuota) { + CORE_LOGGER.warn( + `DISK QUOTA EXCEEDED - Stopping job ${job.jobId}: ${usageGB}GB used, ${quotaGB}GB allowed` + ) + + try { + const container = this.docker.getContainer(containerName) + await container.stop() + CORE_LOGGER.info(`Container stopped for job ${job.jobId}`) + } catch (e) { + CORE_LOGGER.warn(`Could not stop container: ${e.message}`) + } + + job.status = C2DStatusNumber.DiskQuotaExceeded + job.statusText = C2DStatusText.DiskQuotaExceeded + job.isRunning = false + job.isStarted = false + job.algoStopTimestamp = String(Date.now() / 1000) + job.dateFinished = String(Date.now() / 1000) + + await this.db.updateJob(job) + CORE_LOGGER.info(`Job ${job.jobId} terminated - DISK QUOTA EXCEEDED`) + + return false + } + + return true + } + + private async pullImage(originaljob: DBComputeJob) { + const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob + const imageLogFile = + this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log' + try { + const pullStream = await 
this.docker.pull(job.containerImage) + await new Promise((resolve, reject) => { + let wroteStatusBanner = false + this.docker.modem.followProgress( + pullStream, + (err: any, res: any) => { + // onFinished + if (err) { + appendFileSync(imageLogFile, String(err.message)) + return reject(err) + } + const logText = `Successfully pulled image: ${job.containerImage}` + CORE_LOGGER.debug(logText) + appendFileSync(imageLogFile, logText + '\n') + resolve(res) + }, + (progress: any) => { + // onProgress + if (!wroteStatusBanner) { + wroteStatusBanner = true + CORE_LOGGER.debug('############# Pull docker image status: ##############') + } + // only write the status banner once, its cleaner + let logText = '' + if (progress.id) logText += progress.id + ' : ' + progress.status + else logText = progress.status + CORE_LOGGER.debug("Pulling image for jobId '" + job.jobId + "': " + logText) + console.log(progress) + appendFileSync(imageLogFile, logText + '\n') + } + ) + }) + job.status = C2DStatusNumber.ConfiguringVolumes + job.statusText = C2DStatusText.ConfiguringVolumes + this.db.updateJob(job) + } catch (err) { + const logText = `Unable to pull docker image: ${job.containerImage}: ${err.message}` + CORE_LOGGER.error(logText) + appendFileSync(imageLogFile, logText) + job.status = C2DStatusNumber.PullImageFailed + job.statusText = C2DStatusText.PullImageFailed + job.isRunning = false + job.dateFinished = String(Date.now() / 1000) + await this.db.updateJob(job) + await this.cleanupJob(job) + } + } + + private async buildImage( + originaljob: DBComputeJob, + additionalDockerFiles: { [key: string]: any } + ) { + const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob + const imageLogFile = + this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log' + try { + const pack = tarStream.pack() + + // Append the Dockerfile to the tar archive + pack.entry({ name: 'Dockerfile' }, job.algorithm.meta.container.dockerfile) + // Append any additional files to the tar archive + if (additionalDockerFiles) { + for (const filePath of Object.keys(additionalDockerFiles)) { + pack.entry({ name: filePath }, additionalDockerFiles[filePath]) + } + } + pack.finalize() + + // Build the image using the tar stream as context + const buildStream = await this.docker.buildImage(pack, { + t: job.containerImage + }) + + // Optional: listen to build output + buildStream.on('data', (data) => { + try { + const text = JSON.parse(data.toString('utf8')) + CORE_LOGGER.debug( + "Building image for jobId '" + job.jobId + "': " + text.stream.trim() + ) + appendFileSync(imageLogFile, String(text.stream)) + } catch (e) { + // console.log('non json build data: ', data.toString('utf8')) + } + }) + + await new Promise((resolve, reject) => { + buildStream.on('end', () => { + CORE_LOGGER.debug(`Image '${job.containerImage}' built successfully.`) + + resolve() + }) + buildStream.on('error', (err) => { + CORE_LOGGER.debug(`Error building image '${job.containerImage}':` + err.message) + appendFileSync(imageLogFile, String(err.message)) + reject(err) + }) + }) + job.status = C2DStatusNumber.ConfiguringVolumes + job.statusText = C2DStatusText.ConfiguringVolumes + this.db.updateJob(job) + } catch (err) { + CORE_LOGGER.error( + `Unable to build docker image: ${job.containerImage}: ${err.message}` + ) + appendFileSync(imageLogFile, String(err.message)) + job.status = C2DStatusNumber.BuildImageFailed + job.statusText = C2DStatusText.BuildImageFailed + job.isRunning = false + job.dateFinished = String(Date.now() / 1000) + await 
this.db.updateJob(job) + await this.cleanupJob(job) + } + } + private async uploadData( job: DBComputeJob ): Promise<{ status: C2DStatusNumber; statusText: C2DStatusText }> { @@ -1238,15 +1433,25 @@ export class C2DEngineDocker extends C2DEngine { status: C2DStatusNumber.RunningAlgorithm, statusText: C2DStatusText.RunningAlgorithm } - const jobFolderPath = this.getC2DConfig().tempFolder + '/' + job.jobId - const fullAlgoPath = jobFolderPath + '/data/transformations/algorithm' - const configLogPath = jobFolderPath + '/data/logs/configuration.log' + // for testing purposes + // if (!job.algorithm.fileObject) { + // console.log('no file object') + // const file: UrlFileObject = { + // type: 'url', + // url: 'https://raw.githubusercontent.com/oceanprotocol/test-algorithm/master/javascript/algo.js', + // method: 'get' + // } + // job.algorithm.fileObject = file + // } + // download algo + // TODO: we currently DO NOT have a way to set this field unencrypted (once we publish the asset its encrypted) + // So we cannot test this from the CLI for instance... Only Option is to actually send it encrypted + // OR extract the files object from the passed DDO, decrypt it and use it + // console.log(job.algorithm.fileObject) + const fullAlgoPath = + this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/transformations/algorithm' try { - writeFileSync( - configLogPath, - "Writing algocustom data to '/data/inputs/algoCustomData.json'\n" - ) const customdataPath = this.getC2DConfig().tempFolder + '/' + @@ -1258,7 +1463,6 @@ export class C2DEngineDocker extends C2DEngine { if (job.algorithm.meta.rawcode && job.algorithm.meta.rawcode.length > 0) { // we have the code, just write it - writeFileSync(configLogPath, `Writing raw algo code to ${fullAlgoPath}\n`) writeFileSync(fullAlgoPath, job.algorithm.meta.rawcode) } else { // do we have a files object? @@ -1266,41 +1470,17 @@ export class C2DEngineDocker extends C2DEngine { // is it unencrypted? if (job.algorithm.fileObject.type) { // we can get the storage directly - try { - storage = Storage.getStorageClass(job.algorithm.fileObject, config) - } catch (e) { - CORE_LOGGER.error(`Unable to get storage class for algorithm: ${e.message}`) - writeFileSync( - configLogPath, - `Unable to get storage class for algorithm: ${e.message}\n` - ) - return { - status: C2DStatusNumber.AlgorithmProvisioningFailed, - statusText: C2DStatusText.AlgorithmProvisioningFailed - } - } + storage = Storage.getStorageClass(job.algorithm.fileObject, config) } else { // ok, maybe we have this encrypted instead CORE_LOGGER.info( 'algorithm file object seems to be encrypted, checking it...' ) // 1. Decrypt the files object - try { - const decryptedFileObject = await decryptFilesObject( - job.algorithm.fileObject - ) - storage = Storage.getStorageClass(decryptedFileObject, config) - } catch (e) { - CORE_LOGGER.error(`Unable to decrypt algorithm files object: ${e.message}`) - writeFileSync( - configLogPath, - `Unable to decrypt algorithm files object: ${e.message}\n` - ) - return { - status: C2DStatusNumber.AlgorithmProvisioningFailed, - statusText: C2DStatusText.AlgorithmProvisioningFailed - } - } + const decryptedFileObject = await decryptFilesObject(job.algorithm.fileObject) + console.log('decryptedFileObject: ', decryptedFileObject) + // 2. 
Get default storage settings + storage = Storage.getStorageClass(decryptedFileObject, config) } } else { // no files object, try to get information from documentId and serviceId @@ -1308,49 +1488,25 @@ export class C2DEngineDocker extends C2DEngine { 'algorithm file object seems to be missing, checking "serviceId" and "documentId"...' ) const { serviceId, documentId } = job.algorithm - writeFileSync( - configLogPath, - `Using ${documentId} and serviceId ${serviceId} to get algorithm files.\n` - ) // we can get it from this info if (serviceId && documentId) { const algoDdo = await new FindDdoHandler( OceanNode.getInstance() ).findAndFormatDdo(documentId) + console.log('algo ddo:', algoDdo) // 1. Get the service const service: Service = AssetUtils.getServiceById(algoDdo, serviceId) - if (!service) { - CORE_LOGGER.error( - `Could not find service with ID ${serviceId} in DDO ${documentId}` - ) - writeFileSync( - configLogPath, - `Could not find service with ID ${serviceId} in DDO ${documentId}\n` - ) - return { - status: C2DStatusNumber.AlgorithmProvisioningFailed, - statusText: C2DStatusText.AlgorithmProvisioningFailed - } - } - try { - // 2. Decrypt the files object - const decryptedFileObject = await decryptFilesObject(service.files) - storage = Storage.getStorageClass(decryptedFileObject, config) - } catch (e) { - CORE_LOGGER.error(`Unable to decrypt algorithm files object: ${e.message}`) - writeFileSync( - configLogPath, - `Unable to decrypt algorithm files object: ${e.message}\n` - ) - return { - status: C2DStatusNumber.AlgorithmProvisioningFailed, - statusText: C2DStatusText.AlgorithmProvisioningFailed - } - } + + // 2. Decrypt the files object + const decryptedFileObject = await decryptFilesObject(service.files) + console.log('decryptedFileObject: ', decryptedFileObject) + // 4. Get default storage settings + storage = Storage.getStorageClass(decryptedFileObject, config) } } if (storage) { + console.log('fullAlgoPath', fullAlgoPath) await pipeline( (await storage.getReadableStream()).stream, createWriteStream(fullAlgoPath) @@ -1359,20 +1515,12 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.info( 'Could not extract any files object from the compute algorithm, skipping...' 
) - writeFileSync( - configLogPath, - 'Could not extract any files object from the compute algorithm, skipping...\n' - ) } } } catch (e) { CORE_LOGGER.error( 'Unable to write algorithm to path: ' + fullAlgoPath + ': ' + e.message ) - writeFileSync( - configLogPath, - 'Unable to write algorithm to path: ' + fullAlgoPath + ': ' + e.message + '\n' - ) return { status: C2DStatusNumber.AlgorithmProvisioningFailed, statusText: C2DStatusText.AlgorithmProvisioningFailed @@ -1384,73 +1532,53 @@ export class C2DEngineDocker extends C2DEngine { const asset = job.assets[i] let storage = null let fileInfo = null - console.log('checking now asset: ', i) - writeFileSync(configLogPath, `Downloading asset ${i} to /data/inputs/\n`) + console.log('checking now asset: ', asset) // without this check it would break if no fileObject is present if (asset.fileObject) { - try { - if (asset.fileObject.type) { - storage = Storage.getStorageClass(asset.fileObject, config) - } else { - CORE_LOGGER.info('asset file object seems to be encrypted, checking it...') - // get the encrypted bytes - const filesObject: any = await decryptFilesObject(asset.fileObject) - storage = Storage.getStorageClass(filesObject, config) - } - - // we need the file info for the name (but could be something else here) - fileInfo = await storage.getFileInfo({ - type: storage.getStorageType(asset.fileObject) - }) - } catch (e) { - CORE_LOGGER.error(`Unable to get storage class for asset: ${e.message}`) - writeFileSync( - configLogPath, - `Unable to get storage class for asset: ${e.message}\n` - ) - return { - status: C2DStatusNumber.DataProvisioningFailed, - statusText: C2DStatusText.DataProvisioningFailed - } + if (asset.fileObject.type) { + storage = Storage.getStorageClass(asset.fileObject, config) + } else { + CORE_LOGGER.info('asset file object seems to be encrypted, checking it...') + // get the encrypted bytes + const filesObject: any = await decryptFilesObject(asset.fileObject) + storage = Storage.getStorageClass(filesObject, config) } + + // we need the file info for the name (but could be something else here) + fileInfo = await storage.getFileInfo({ + type: storage.getStorageType(asset.fileObject) + }) } else { // we need to go the hard way const { serviceId, documentId } = asset - writeFileSync( - configLogPath, - `Using ${documentId} and serviceId ${serviceId} for this asset.\n` - ) if (serviceId && documentId) { // need to get the file - try { - const ddo = await new FindDdoHandler( - OceanNode.getInstance() - ).findAndFormatDdo(documentId) - // 2. Get the service - const service: Service = AssetUtils.getServiceById(ddo, serviceId) - // 3. Decrypt the url - const decryptedFileObject = await decryptFilesObject(service.files) - storage = Storage.getStorageClass(decryptedFileObject, config) - fileInfo = await storage.getFileInfo({ - type: storage.getStorageType(decryptedFileObject) - }) - } catch (e) { - CORE_LOGGER.error(`Unable to get storage class for asset: ${e.message}`) - writeFileSync( - configLogPath, - `Unable to get storage class for asset: ${e.message}\n` - ) - return { - status: C2DStatusNumber.DataProvisioningFailed, - statusText: C2DStatusText.DataProvisioningFailed - } - } + const ddo = await new FindDdoHandler(OceanNode.getInstance()).findAndFormatDdo( + documentId + ) + + // 2. Get the service + const service: Service = AssetUtils.getServiceById(ddo, serviceId) + // 3. 
Decrypt the url + const decryptedFileObject = await decryptFilesObject(service.files) + console.log('decryptedFileObject: ', decryptedFileObject) + storage = Storage.getStorageClass(decryptedFileObject, config) + + fileInfo = await storage.getFileInfo({ + type: storage.getStorageType(decryptedFileObject) + }) } } if (storage && fileInfo) { - const fullPath = jobFolderPath + '/data/inputs/' + fileInfo[0].name - writeFileSync(configLogPath, `Downloading asset to ${fullPath}\n`) + const fullPath = + this.getC2DConfig().tempFolder + + '/' + + job.jobId + + '/data/inputs/' + + fileInfo[0].name + + console.log('asset full path: ' + fullPath) try { await pipeline( (await storage.getReadableStream()).stream, @@ -1460,10 +1588,6 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.error( 'Unable to write input data to path: ' + fullPath + ': ' + e.message ) - writeFileSync( - configLogPath, - 'Unable to write input data to path: ' + fullPath + ': ' + e.message + '\n' - ) return { status: C2DStatusNumber.DataProvisioningFailed, statusText: C2DStatusText.DataProvisioningFailed @@ -1473,20 +1597,13 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.info( 'Could not extract any files object from the compute asset, skipping...' ) - writeFileSync( - configLogPath, - 'Could not extract any files object from the compute asset, skipping...\n' - ) } } CORE_LOGGER.info('All good with data provisioning, will start uploading it...') - writeFileSync( - configLogPath, - 'All good with data provisioning, will start uploading it...\n' - ) // now, we have to create a tar arhive - const folderToTar = jobFolderPath + '/data' - const destination = jobFolderPath + '/tarData/upload.tar.gz' + const folderToTar = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data' + const destination = + this.getC2DConfig().tempFolder + '/' + job.jobId + '/tarData/upload.tar.gz' try { tar.create( { @@ -1498,6 +1615,7 @@ export class C2DEngineDocker extends C2DEngine { ['./'] ) // check if tar.gz actually exists + console.log('Start uploading') if (existsSync(destination)) { // now, upload it to the container @@ -1513,10 +1631,8 @@ export class C2DEngineDocker extends C2DEngine { console.log('Done uploading') } catch (e) { - writeFileSync( - configLogPath, - 'Data upload to container failed: ' + e.message + '\n' - ) + console.log('Data upload failed') + console.log(e) return { status: C2DStatusNumber.DataUploadFailed, statusText: C2DStatusText.DataUploadFailed @@ -1524,26 +1640,20 @@ export class C2DEngineDocker extends C2DEngine { } } else { CORE_LOGGER.debug('No data to upload, empty tar.gz') - writeFileSync(configLogPath, `No data to upload, empty tar.gz\n`) } } catch (e) { CORE_LOGGER.debug(e.message) - writeFileSync(configLogPath, `Error creating data archive: ${e.message}\n`) - return { - status: C2DStatusNumber.DataProvisioningFailed, - statusText: C2DStatusText.DataProvisioningFailed - } } - rmSync(jobFolderPath + '/data/inputs', { + rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/inputs', { recursive: true, force: true }) - rmSync(jobFolderPath + '/data/transformations', { + rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/transformations', { recursive: true, force: true }) - rmSync(jobFolderPath + '/tarData', { + rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/tarData', { recursive: true, force: true }) @@ -1600,10 +1710,13 @@ export class C2DEngineDocker extends C2DEngine { // this uses the docker engine, but exposes only one env, the free one -export 
function getAlgorithmImage(algorithm: ComputeAlgorithm): string { +export function getAlgorithmImage(algorithm: ComputeAlgorithm, jobId: string): string { if (!algorithm.meta || !algorithm.meta.container) { return null } + if (algorithm.meta.container.dockerfile) { + return jobId.toLowerCase() + '-image:latest' + } let { image } = algorithm.meta.container if (algorithm.meta.container.checksum) image = image + '@' + algorithm.meta.container.checksum @@ -1619,6 +1732,9 @@ export function checkManifestPlatform( envPlatform?: RunningPlatform ): boolean { if (!manifestPlatform || !envPlatform) return true // skips if not present + if (envPlatform.architecture === 'amd64') envPlatform.architecture = 'x86_64' // x86_64 is compatible with amd64 + if (manifestPlatform.architecture === 'amd64') manifestPlatform.architecture = 'x86_64' // x86_64 is compatible with amd64 + if ( envPlatform.architecture !== manifestPlatform.architecture || envPlatform.os !== manifestPlatform.os diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index b1bd8eb6d..f80402547 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -33,7 +33,7 @@ import { C2DEngineDocker, getAlgorithmImage } from '../../c2d/compute_engine_doc import { Credentials, DDOManager } from '@oceanprotocol/ddo-js' import { areKnownCredentialTypes, checkCredentials } from '../../../utils/credentials.js' import { PolicyServer } from '../../policyServer/index.js' -import { getAlgoChecksums, validateAlgoForDataset } from './utils.js' +import { generateUniqueID, getAlgoChecksums, validateAlgoForDataset } from './utils.js' export class ComputeInitializeHandler extends CommandHandler { validate(command: ComputeInitializeCommand): ValidateParams { @@ -257,7 +257,7 @@ export class ComputeInitializeHandler extends CommandHandler { httpStatus: 400, error: `Algorithm ${ task.algorithm.documentId - } with serviceId ${task.algorithm.serviceId} not allowed to run on the dataset: ${ddoInstance.getDid()} with serviceId: ${task.datasets[safeIndex].serviceId}` + } not allowed to run on the dataset: ${ddoInstance.getDid()}` } } } @@ -366,7 +366,7 @@ export class ComputeInitializeHandler extends CommandHandler { } } if (hasDockerImages) { - const algoImage = getAlgorithmImage(task.algorithm) + const algoImage = getAlgorithmImage(task.algorithm, generateUniqueID(task)) if (algoImage) { const validation: ValidateParams = await C2DEngineDocker.checkDockerImage( algoImage, @@ -387,7 +387,7 @@ export class ComputeInitializeHandler extends CommandHandler { const signer = blockchain.getSigner() // check if oasis evm or similar - const confidentialEVM = isConfidentialChainDDO(BigInt(ddoChainId), service) + const confidentialEVM = isConfidentialChainDDO(BigInt(ddo.chainId), service) // let's see if we can access this asset let canDecrypt = false try { diff --git a/src/components/core/compute/utils.ts b/src/components/core/compute/utils.ts index 55f664d53..716b03b68 100644 --- a/src/components/core/compute/utils.ts +++ b/src/components/core/compute/utils.ts @@ -14,6 +14,16 @@ import { createHash } from 'crypto' import { FindDdoHandler } from '../../core/handler/ddoHandler.js' import { DDOManager, VersionedDDO } from '@oceanprotocol/ddo-js' +export function generateUniqueID(jobStructure: any): string { + const timestamp = + BigInt(Date.now()) * 1_000_000n + (process.hrtime.bigint() % 1_000_000n) + const random = Math.random() + const jobId = createHash('sha256') + 
.update(JSON.stringify(jobStructure) + timestamp.toString() + random.toString()) + .digest('hex') + return jobId +} + export async function getAlgoChecksums( algoDID: string, algoServiceId: string, @@ -110,14 +120,9 @@ export async function validateAlgoForDataset( if ('serviceId' in algo) { const serviceIdMatch = algo.serviceId === '*' || algo.serviceId === algoChecksums.serviceId - CORE_LOGGER.info( - `didMatch: ${didMatch}, filesMatch: ${filesMatch}, containerMatch: ${containerMatch}, serviceIdMatch: ${serviceIdMatch}` - ) return didMatch && filesMatch && containerMatch && serviceIdMatch } - CORE_LOGGER.info( - `didMatch: ${didMatch}, filesMatch: ${filesMatch}, containerMatch: ${containerMatch}` - ) + return didMatch && filesMatch && containerMatch }) @@ -135,6 +140,7 @@ export async function validateAlgoForDataset( .includes(nftAddress?.toLowerCase()) } } + return isAlgoTrusted && isPublisherTrusted } diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts index f968e3c9c..279e14151 100644 --- a/src/components/database/sqliteCompute.ts +++ b/src/components/database/sqliteCompute.ts @@ -46,7 +46,8 @@ function getInternalStructure(job: DBComputeJob): any { algoStartTimestamp: job.algoStartTimestamp, algoStopTimestamp: job.algoStopTimestamp, metadata: job.metadata, - additionalViewers: job.additionalViewers + additionalViewers: job.additionalViewers, + terminationDetails: job.terminationDetails } return internalBlob } diff --git a/src/helpers/scripts/setupNodeEnv.sh b/src/helpers/scripts/setupNodeEnv.sh index 0e2bc572f..c42508994 100755 --- a/src/helpers/scripts/setupNodeEnv.sh +++ b/src/helpers/scripts/setupNodeEnv.sh @@ -255,8 +255,7 @@ if [ "$enable_compute" == 'y' ]; then echo " You can customize this in your .env file for production use." 
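
Note that the default written below now expresses `disk` and `ram` amounts in GB rather than bytes; the engine converts back to bytes when building the Docker `HostConfig` (see the `hostConfig.Memory = ramSize * 1024 * 1024 * 1024` change earlier in this diff). A small sketch of that convention, assuming plain integer GB values — the helper name is illustrative:

```typescript
// Resource amounts in DOCKER_COMPUTE_ENVIRONMENTS are now expressed in GB.
// Docker's HostConfig expects bytes, so the engine multiplies by 1024^3.
const GB = 1024 * 1024 * 1024

function gbToBytes(gb: number): number {
  return gb * GB
}

// e.g. the default free tier entry { id: 'ram', max: 1 } -> 1 GB memory limit
const ramLimitBytes = gbToBytes(1) // 1073741824
// MemorySwap is set to the same value, which effectively disables swap
const memorySwapBytes = ramLimitBytes
```
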
echo "" - DOCKER_COMPUTE_ENV="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"disk\",\"total\":1000000000}],\"storageExpiry\":604800,\"maxJobDuration\":36000,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1}]}]},\"free\":{\"maxJobDuration\":360000,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1000000000},{\"id\":\"disk\",\"max\":1000000000}]}}]" - + DOCKER_COMPUTE_ENV="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"disk\",\"total\":10}],\"storageExpiry\":604800,\"maxJobDuration\":36000,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1}]}]},\"free\":{\"maxJobDuration\":360000,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1},{\"id\":\"disk\",\"max\":1}]}}]" REPLACE_STR="DOCKER_COMPUTE_ENVIRONMENTS='$DOCKER_COMPUTE_ENV'" if [ "$(uname)" == "Darwin" ]; then sed -i '' -e "s;DOCKER_COMPUTE_ENVIRONMENTS=;$REPLACE_STR;" "$env_file_path" diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 8682d7957..90608f901 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -99,11 +99,11 @@ describe('Trusted algorithms Flow', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' ] ) ) @@ -235,6 +235,11 @@ describe('Trusted algorithms Flow', () => { console.log(resp) assert(resp, 'Failed to get response') assert(resp.status.httpStatus === 400, 'Failed to get 400 response') + assert( + resp.status.error === + `Algorithm ${publishedAlgoDataset.ddo.id} not allowed to run on the dataset: ${publishedComputeDataset.ddo.id}`, + 'Inconsistent error message' + ) assert(resp.stream === null, 'Failed to get stream') }) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index eab2a6fa4..71dddb14c 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -145,11 +145,11 @@ describe('Compute', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - 
'","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' ] ) ) @@ -257,25 +257,19 @@ describe('Compute', () => { publishedComputeDataset = await waitToIndex( publishedComputeDataset.ddo.id, EVENTS.METADATA_UPDATED, - DEFAULT_TEST_TIMEOUT * 3, + DEFAULT_TEST_TIMEOUT * 2, true ) - if (!publishedComputeDataset.ddo) { - expect(expectedTimeoutFailure(this.test.title)).to.be.equal( - publishedComputeDataset.wasTimeout - ) - } else { - assert( - publishedComputeDataset?.ddo?.services[0]?.compute?.publisherTrustedAlgorithms - .length > 0, - 'Trusted algorithms not updated' - ) - assert( - publishedComputeDataset?.ddo?.services[0]?.compute?.publisherTrustedAlgorithms[0] - .did === publishedAlgoDataset.ddo.id, - 'Algorithm DID mismatch in trusted algorithms' - ) - } + assert( + publishedComputeDataset?.ddo?.services[0]?.compute?.publisherTrustedAlgorithms + .length > 0, + 'Trusted algorithms not updated' + ) + assert( + publishedComputeDataset?.ddo?.services[0]?.compute?.publisherTrustedAlgorithms[0] + .did === publishedAlgoDataset.ddo.id, + 'Algorithm DID mismatch in trusted algorithms' + ) }) it('Get compute environments', async () => { @@ -327,7 +321,6 @@ describe('Compute', () => { const response = await new ComputeGetEnvironmentsHandler(oceanNode).handle( getEnvironmentsTask ) - console.log('firstEnv', firstEnv) computeEnvironments = await streamToObject(response.stream as Readable) firstEnv = computeEnvironments[0] @@ -1081,14 +1074,7 @@ describe('Compute', () => { const response = await handler.handle(command) assert(response.status.httpStatus === 500, 'Failed to get 500 response') assert(response.stream === null, 'Should not get stream') - assert( - response.status.error.includes( - freeComputeStartPayload.algorithm.meta.container.image - ), - 'Should have image error' - ) }) - // algo and checksums related describe('C2D algo and checksums related', () => { it('should publish AlgoDDO', async () => { @@ -1270,6 +1256,7 @@ describe('Compute', () => { expect(algoChecksums.container).to.equal( 'ba8885fcc7d366f058d6c3bb0b7bfe191c5f85cb6a4ee3858895342436c23504' ) + expect(algoChecksums.serviceId).to.equal(algoDDOTest.services[0].id) } else expect(expectedTimeoutFailure(this.test.title)).to.be.equal(wasTimeout) }) @@ -1298,7 +1285,6 @@ describe('Compute', () => { const datasetDDOTest = ddo const datasetInstance = DDOManager.getDDOClass(datasetDDO) - console.log('datasetDDOTest', datasetDDOTest) if (datasetDDOTest) { const result = await validateAlgoForDataset( algoDDOTest.id, diff --git a/src/test/integration/credentials.test.ts b/src/test/integration/credentials.test.ts index b1c18ab49..1ee0728f8 100644 --- a/src/test/integration/credentials.test.ts +++ b/src/test/integration/credentials.test.ts @@ -125,11 +125,11 @@ describe('[Credentials Flow] - Should run a complete node flow.', () => { await publisherAccount.getAddress() // signer 0 ]), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + DEVELOPMENT_CHAIN_ID + 
'":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' ] ) ) diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index 8ef3ffae3..a6a430ba5 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -52,7 +52,7 @@ describe('Compute Jobs Database', () => { envOverrides = buildEnvOverrideConfig( [ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS], [ - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' ] ) envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides)