diff --git a/README.md b/README.md index ca2d3c255..701b7f856 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,7 @@ Your node is now running, the control panel will be available at `http://localho - [Logging & accessing logs](docs/networking.md) - [Control Panel: Local development](controlpanel/README.md) - [Docker Deployment Guide](docs/dockerDeployment.md) +- [C2D GPU Guide](docs/GPU.md) ## Control Panel diff --git a/docs/GPU.md b/docs/GPU.md new file mode 100644 index 000000000..a83c742ef --- /dev/null +++ b/docs/GPU.md @@ -0,0 +1,484 @@ +Supporting GPUs for c2d jobs comes down to: + +- define gpu list for each c2d env +- pass docker args for each gpu +- set a price for each gpu + +## Nvidia GPU Example + +Start by installing nvidia cuda drivers (ie:https://docs.nvidia.com/cuda/cuda-installation-guide-linux/), then install nvidia container toolkit (https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html) + +Once that is done, check if you can get gpu details by running 'nvidia-smi': + +``` +root@gpu-1:/repos/ocean/ocean-node# nvidia-smi +Fri Apr 25 06:00:34 2025 ++-----------------------------------------------------------------------------------------+ +| NVIDIA-SMI 550.163.01 Driver Version: 550.163.01 CUDA Version: 12.4 | +|-----------------------------------------+------------------------+----------------------+ +| GPU Name Persistence-M | Bus-Id Disp.A | Volatile Uncorr. ECC | +| Fan Temp Perf Pwr:Usage/Cap | Memory-Usage | GPU-Util Compute M. | +| | | MIG M. 
| +|=========================================+========================+======================| +| 0 NVIDIA GeForce GTX 1060 3GB Off | 00000000:01:00.0 Off | N/A | +| 0% 39C P8 6W / 120W | 2MiB / 3072MiB | 0% Default | +| | | N/A | ++-----------------------------------------+------------------------+----------------------+ + ++-----------------------------------------------------------------------------------------+ +| Processes: | +| GPU GI CI PID Type Process name GPU Memory | +| ID ID Usage | +|=========================================================================================| +| No running processes found | ++-----------------------------------------------------------------------------------------+ +``` + +Now, time to get the id of the gpu: + +```bash +root@gpu-1:/repos/ocean/ocean-node# nvidia-smi --query-gpu=name,uuid --format=csv +name, uuid +NVIDIA GeForce GTX 1060 3GB, GPU-294c6802-bb2f-fedb-f9e0-a26b9142dd81 +``` + +Now, we can define the gpu for node: + +```json +{ + "id": "myGPU", + "description": "NVIDIA GeForce GTX 1060 3GB", + "type": "gpu", + "total": 1, + "init": { + "deviceRequests": { + "Driver": "nvidia", + "DeviceIDs": ["GPU-294c6802-bb2f-fedb-f9e0-a26b9142dd81"], + "Capabilities": [["gpu"]] + } + } +} +``` + +Don't forget to add it to fees definition and free definition (if desired). 
+ +Here is the full definition of DOCKER_COMPUTE_ENVIRONMENTS: + +```json +[ + { + "socketPath": "/var/run/docker.sock", + "resources": [ + { + "id": "myGPU", + "description": "NVIDIA GeForce GTX 1060 3GB", + "type": "gpu", + "total": 1, + "init": { + "deviceRequests": { + "Driver": "nvidia", + "DeviceIDs": ["GPU-294c6802-bb2f-fedb-f9e0-a26b9142dd81"], + "Capabilities": [["gpu"]] + } + } + }, + { "id": "disk", "total": 1000000000 } + ], + "storageExpiry": 604800, + "maxJobDuration": 3600, + "fees": { + "1": [ + { + "feeToken": "0x123", + "prices": [ + { "id": "cpu", "price": 1 }, + { "id": "myGPU", "price": 3 } + ] + } + ] + }, + "free": { + "maxJobDuration": 60, + "maxJobs": 3, + "resources": [ + { "id": "cpu", "max": 1 }, + { "id": "ram", "max": 1000000000 }, + { "id": "disk", "max": 1000000000 }, + { "id": "myGPU", "max": 1 } + ] + } + } +] +``` + +And you should have it in your compute envs: + +```bash +root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/computeEnvironments +``` + +```json +[ + { + "id": "0xd6b10b27aab01a72070a5164c07d0517755838b9cb9857e2d5649287ec3aaaa2-0x66073c81f833deaa2f8e2a508f69cf78f8a99b17ba1a64f369af921750f93914", + "runningJobs": 0, + "consumerAddress": "0x4fb80776C8eb4cAbe7730dcBCdb1fa6ecD3c460E", + "platform": { "architecture": "x86_64", "os": "Ubuntu 22.04.3 LTS" }, + "fees": { + "1": [ + { + "feeToken": "0x123", + "prices": [ + { "id": "cpu", "price": 1 }, + { "id": "myGPU", "price": 3 } + ] + } + ] + }, + "storageExpiry": 604800, + "maxJobDuration": 3600, + "resources": [ + { "id": "cpu", "total": 8, "max": 8, "min": 1, "inUse": 0 }, + { + "id": "ram", + "total": 24888963072, + "max": 24888963072, + "min": 1000000000, + "inUse": 0 + }, + { + "id": "myGPU", + "description": "NVIDIA GeForce GTX 1060 3GB", + "type": "gpu", + "total": 1, + "init": { + "deviceRequests": { + "Driver": "nvidia", + "DeviceIDs": ["GPU-294c6802-bb2f-fedb-f9e0-a26b9142dd81"], + "Capabilities": [["gpu"]] + } + }, + "max": 1, + "min": 
0, + "inUse": 0 + }, + { "id": "disk", "total": 1000000000, "max": 1000000000, "min": 0, "inUse": 0 } + ], + "free": { + "maxJobDuration": 60, + "maxJobs": 3, + "resources": [ + { "id": "cpu", "max": 1, "inUse": 0 }, + { "id": "ram", "max": 1000000000, "inUse": 0 }, + { "id": "disk", "max": 1000000000, "inUse": 0 }, + { "id": "myGPU", "max": 1, "inUse": 0 } + ] + }, + "runningfreeJobs": 0 + } +] +``` + +Start a free job using: + +```json +{ + "command": "freeStartCompute", + "algorithm": { + "meta": { + "container": { + "image": "tensorflow/tensorflow", + "tag": "2.17.0-gpu", + "entrypoint": "python $ALGO" + }, + "rawcode": "import tensorflow as tf\nsess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))\nprint(\"Num GPUs Available: \", len(tf.config.list_physical_devices('GPU')))\ngpus = tf.config.list_physical_devices('GPU')\nfor gpu in gpus:\n\tprint('Name:', gpu.name, ' Type:', gpu.device_type)" + } + }, + "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687", + "signature": "123", + "nonce": 1, + "environment": "0xd6b10b27aab01a72070a5164c07d0517755838b9cb9857e2d5649287ec3aaaa2-0x66073c81f833deaa2f8e2a508f69cf78f8a99b17ba1a64f369af921750f93914", + "resources": [ + { + "id": "cpu", + "amount": 1 + }, + { + "id": "myGPU", + "amount": 1 + } + ] +} +``` + +And the output of `getComputeResult` should look like: + +```bash +2025-04-25 06:18:20.890217: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered +2025-04-25 06:18:21.192330: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered +2025-04-25 06:18:21.292230: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has 
already been registered +WARNING: All log messages before absl::InitializeLog() is called are written to STDERR +I0000 00:00:1745561915.985558 1 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355 +I0000 00:00:1745561915.993514 1 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355 +I0000 00:00:1745561915.993799 1 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355 +Num GPUs Available: 1 +Name: /physical_device:GPU:0 Type: GPU +``` + +## AMD Radeon 9070 XT ON WSL2 + +First, install ROCm (https://rocm.docs.amd.com/projects/radeon/en/latest/docs/install/wsl/install-radeon.html) + +Then define DOCKER_COMPUTE_ENVIRONMENTS with + +```json +[ + { + "socketPath": "/var/run/docker.sock", + "resources": [ + { + "id": "myGPU", + "description": "AMD Radeon RX 9070 XT", + "type": "gpu", + "total": 1, + "init": { + "advanced": { + "IpcMode": "host", + "ShmSize": 8589934592, + "CapAdd": ["SYS_PTRACE"], + "Devices": ["/dev/dxg", "/dev/dri/card0"], + "Binds": [ + "/usr/lib/wsl/lib/libdxcore.so:/usr/lib/libdxcore.so", + "/opt/rocm/lib/libhsa-runtime64.so.1:/opt/rocm/lib/libhsa-runtime64.so.1" + ], + "SecurityOpt": { + "seccomp": "unconfined" + } + } + } + }, + { + "id": "disk", + "total": 1000000000 + } + ], + "storageExpiry": 604800, + "maxJobDuration": 3600, + "fees": { + "1": [ + { + "feeToken": "0x123", + "prices": [ + { + "id": "cpu", + "price": 1 + }, + { + 
"id": "myGPU", + "price": 3 + } + ] + } + ] + }, + "free": { + "maxJobDuration": 60, + "maxJobs": 3, + "resources": [ + { + "id": "cpu", + "max": 1 + }, + { + "id": "ram", + "max": 1000000000 + }, + { + "id": "disk", + "max": 1000000000 + }, + { + "id": "myGPU", + "max": 1 + } + ] + } + } +] +``` + +aka + +```bash +export DOCKER_COMPUTE_ENVIRONMENTS="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"myGPU\",\"description\":\"AMD Radeon RX 9070 XT\",\"type\":\"gpu\",\"total\":1,\"init\":{\"advanced\":{ +\"IpcMode\":\"host\",\"CapAdd\":[\"CAP_SYS_PTRACE\"],\"Devices\":[\"/dev/dxg\",\"/dev/dri/card0\"],\"Binds\":[\"/usr/lib/wsl/lib/libdxcore.so:/usr/lib/libdxcore.so\",\"/opt/rocm/lib/libhsa-runtime64.so.1:/opt/rocm/lib/libhsa-runtime64.so.1\"],\"SecurityOpt\":{\"seccomp\":\"unconfined\"}}}},{\"id\":\"disk\",\"total\":1000000000}],\"storageExpiry\":604800,\"maxJobDuration\":3600,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1},{\"id\":\"myGPU\",\"price\":3}]}]},\"free\":{\"maxJobDuration\":60,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1000000000},{\"id\":\"disk\",\"max\":1000000000},{\"id\":\"myGPU\",\"max\":1}]}}]" +``` + +you should have it in your compute envs: + +```bash +root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/computeEnvironments +``` + +```json +[ + { + "id": "0xbb5773e734e1b188165dac88d9a3dc8ac28bc9f5624b45fa8bbd8fca043de7c1-0x2c2761f938cf186eeb81f71dee06ad7edb299493e39c316c390d0c0691e6585c", + "runningJobs": 0, + "consumerAddress": "0x4fb80776C8eb4cAbe7730dcBCdb1fa6ecD3c460E", + "platform": { + "architecture": "x86_64", + "os": "Ubuntu 24.04.2 LTS" + }, + "fees": { + "1": [ + { + "feeToken": "0x123", + "prices": [ + { + "id": "cpu", + "price": 1 + }, + { + "id": "myGPU", + "price": 3 + } + ] + } + ] + }, + "storageExpiry": 604800, + "maxJobDuration": 3600, + "resources": [ + { + "id": "cpu", + "total": 16, + "max": 16, + "min": 1, + 
"inUse": 0 + }, + { + "id": "ram", + "total": 33617674240, + "max": 33617674240, + "min": 1000000000, + "inUse": 0 + }, + { + "id": "myGPU", + "description": "AMD Radeon RX 9070 XT", + "type": "gpu", + "total": 1, + "init": { + "advanced": { + "IpcMode": "host", + "CapAdd": ["CAP_SYS_PTRACE"], + "Devices": ["/dev/dxg", "/dev/dri/card0"], + "Binds": [ + "/usr/lib/wsl/lib/libdxcore.so:/usr/lib/libdxcore.so", + "/opt/rocm/lib/libhsa-runtime64.so.1:/opt/rocm/lib/libhsa-runtime64.so.1" + ], + "SecurityOpt": { + "seccomp": "unconfined" + } + } + }, + "max": 1, + "min": 0, + "inUse": 0 + }, + { + "id": "disk", + "total": 1000000000, + "max": 1000000000, + "min": 0, + "inUse": 0 + } + ], + "free": { + "maxJobDuration": 60, + "maxJobs": 3, + "resources": [ + { + "id": "cpu", + "max": 1, + "inUse": 0 + }, + { + "id": "ram", + "max": 1000000000, + "inUse": 0 + }, + { + "id": "disk", + "max": 1000000000, + "inUse": 0 + }, + { + "id": "myGPU", + "max": 1, + "inUse": 0 + } + ] + }, + "runningfreeJobs": 0 + } +] +``` + +Start a free job with + +```json +{ + "command": "freeStartCompute", + "datasets": [ + { + "fileObject": { + "type": "url", + "url": "https://raw.githubusercontent.com/oceanprotocol/test-algorithm/master/javascript/algo.js", + "method": "get" + } + } + ], + "algorithm": { + "meta": { + "container": { + "image": "rocm/tensorflow", + "tag": "rocm6.4-py3.12-tf2.18-dev", + "entrypoint": "python $ALGO" + }, + "rawcode": "import tensorflow as tf\nsess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))\nprint(\"Num GPUs Available: \", len(tf.config.list_physical_devices('GPU')))\ngpus = tf.config.list_physical_devices('GPU')\nfor gpu in gpus:\n\tprint('Name:', gpu.name, ' Type:', gpu.device_type)" + } + }, + "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687", + "signature": "123", + "nonce": 1, + "environment": 
"0xbb5773e734e1b188165dac88d9a3dc8ac28bc9f5624b45fa8bbd8fca043de7c1-0x2c2761f938cf186eeb81f71dee06ad7edb299493e39c316c390d0c0691e6585c", + "resources": [ + { + "id": "cpu", + "amount": 1 + }, + { + "id": "myGPU", + "amount": 1 + } + ] +} +``` + +and get the results + +```bash +2025-04-25 15:16:15.218050: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations. +To enable the following instructions: SSE3 SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags. +WARNING: All log messages before absl::InitializeLog() is called are written to STDERR +I0000 00:00:1745594260.720023 1 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 2874 MB memory: -> device: 0, name: AMD Radeon RX 9070 XT, pci bus id: 0000:0d:00.0 +2025-04-25 15:17:44.018225: I tensorflow/core/common_runtime/direct_session.cc:378] Device mapping: +/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: AMD Radeon RX 9070 XT, pci bus id: 0000:0d:00.0 + +Num GPUs Available: 1 +Name: /physical_device:GPU:0 Type: GPU +Warning: Resource leak detected by SharedSignalPool, 385 Signals leaked. +pid:1 tid:0x7f4476ac1740 [~VaMgr] frag_map_ size is not 1. 
+``` diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 578c21c15..e7c6e30af 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -27,14 +27,36 @@ export interface ComputeResourcesPricingInfo { price: number // price per unit per minute } +export interface ArgumentValues { + [key: string]: string | number | boolean | any[] // Supports multiple value types +} + +export interface dockerDeviceRequest { + Driver: string + Count?: number + DeviceIDs: string[] + Capabilities?: any + Options?: any +} + +// docker hw can be defined with either deviceRequests (simpler, if you have a driver), or in advanced way +// advanced way means you have to define different params like devices, cgroups, caps, etc +export interface dockerHwInit { + deviceRequests?: dockerDeviceRequest + advanced?: ArgumentValues + runtime?: string +} + export interface ComputeResource { id: ComputeResourceType + description?: string type?: string kind?: string total: number // total number of specific resource min: number // min number of resource needed for a job max: number // max number of resource for a job inUse?: number // for display purposes + init?: dockerHwInit } export interface ComputeResourceRequest { id: string @@ -77,6 +99,12 @@ export interface ComputeEnvironmentBaseConfig { platform: RunningPlatform } +export interface ComputeRuntimes { + [key: string]: { + path?: string + runtimeArgs?: string[] // Optional runtime arguments + } +} export interface ComputeEnvironment extends ComputeEnvironmentBaseConfig { id: string // v1 runningJobs: number diff --git a/src/components/Auth/index.ts b/src/components/Auth/index.ts index fbc8f726f..dee25ebf1 100644 --- a/src/components/Auth/index.ts +++ b/src/components/Auth/index.ts @@ -9,6 +9,14 @@ export interface CommonValidation { error: string } +export interface AuthValidation { + token?: string + address?: string + nonce?: string + signature?: string + message?: string +} + export class Auth { private authTokenDatabase: 
AuthTokenDatabase @@ -21,10 +29,6 @@ export class Auth { return config.jwtSecret } - public getMessage(address: string, nonce: string): string { - return address + nonce - } - async getJWTToken(address: string, nonce: string, createdAt: number): Promise { const jwtToken = jwt.sign( { @@ -68,17 +72,10 @@ export class Auth { * @param {string} message - The message to validate * @returns The validation result */ - async validateAuthenticationOrToken({ - token, - address, - nonce, - signature - }: { - token?: string - address?: string - nonce?: string - signature?: string - }): Promise { + async validateAuthenticationOrToken( + authValidation: AuthValidation + ): Promise { + const { token, address, nonce, signature, message } = authValidation try { if (signature && address && nonce) { const oceanNode = OceanNode.getInstance() @@ -87,7 +84,7 @@ export class Auth { address, parseInt(nonce), signature, - this.getMessage(address, nonce) + message ) if (!nonceCheckResult.valid) { diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index c7611c8e3..3bda17305 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -12,7 +12,8 @@ import type { ComputeResource, ComputeResourcesPricingInfo, DBComputeJobPayment, - DBComputeJob + DBComputeJob, + dockerDeviceRequest } from '../../@types/C2D/C2D.js' import { C2DClusterType } from '../../@types/C2D/C2D.js' import { C2DDatabase } from '../database/C2DDatabase.js' @@ -145,6 +146,14 @@ export abstract class C2DEngine { isFree: boolean ): ComputeResource { const paid = this.getResource(env.resources, id) + if (!paid) { + return { + id, + total: 0, + max: 0, + min: 0 + } + } let free = null if (isFree && 'free' in env && 'resources' in env.free) { free = this.getResource(env.free.resources, id) @@ -192,7 +201,7 @@ export abstract class C2DEngine { for (const device of elements) { let desired = this.getResourceRequest(resources, device) const 
minMax = this.getMaxMinResource(device, env, isFree) - if (!desired && minMax.min > 0) { + if (!desired && minMax.min >= 0) { // it's required desired = minMax.min } else { @@ -208,7 +217,7 @@ export abstract class C2DEngine { ) } } - properResources.push({ id: device, amount: minMax.min }) + properResources.push({ id: device, amount: desired }) } return properResources @@ -295,6 +304,88 @@ export abstract class C2DEngine { return null } + public getDockerDeviceRequest( + requests: ComputeResourceRequest[], + resources: ComputeResource[] + ) { + if (!resources) return null + const ret: dockerDeviceRequest[] = [] + for (const resource of requests) { + const res = this.getResource(resources, resource.id) + if (res.init && res.init.deviceRequests) { + ret.push(res.init.deviceRequests) + } + } + return ret + } + + public getDockerAdvancedConfig( + requests: ComputeResourceRequest[], + resources: ComputeResource[] + ) { + const ret = { + Devices: [] as any[], + GroupAdd: [] as string[], + SecurityOpt: [] as string[], + Binds: [] as string[], + CapAdd: [] as string[], + CapDrop: [] as string[], + IpcMode: null as string, + ShmSize: 0 as number + } + for (const resource of requests) { + const res = this.getResource(resources, resource.id) + if (res.init && res.init.advanced) { + for (const [key, value] of Object.entries(res.init.advanced)) { + switch (key) { + case 'IpcMode': + ret.IpcMode = value as string + break + case 'ShmSize': + ret.ShmSize = value as number + break + case 'GroupAdd': + for (const grp of value as string[]) { + if (!ret.GroupAdd.includes(grp)) ret.GroupAdd.push(grp) + } + break + case 'CapAdd': + for (const grp of value as string[]) { + if (!ret.CapAdd.includes(grp)) ret.CapAdd.push(grp) + } + break + case 'CapDrop': + for (const grp of value as string[]) { + if (!ret.CapDrop.includes(grp)) ret.CapDrop.push(grp) + } + break + case 'Devices': + for (const device of value as string[]) { + if (!ret.Devices.find((d) => d.PathOnHost === device)) + 
ret.Devices.push({ + PathOnHost: device, + PathInContainer: device, + CgroupPermissions: 'rwm' + }) + } + break + case 'SecurityOpt': + for (const [secKeys, secValues] of Object.entries(value)) + if (!ret.SecurityOpt.includes(secKeys + '=' + secValues)) + ret.SecurityOpt.push(secKeys + '=' + secValues) + break + case 'Binds': + for (const grp of value as string[]) { + if (!ret.Binds.includes(grp)) ret.Binds.push(grp) + } + break + } + } + } + } + return ret + } + public getEnvPricesForToken( env: ComputeEnvironment, chainId: number, diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 423ed9e93..20da6c9ec 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -178,6 +178,7 @@ export class C2DEngineDocker extends C2DEngine { max: sysinfo.MemTotal, min: 1e9 }) + if (envConfig.resources) { for (const res of envConfig.resources) { // allow user to add other resources @@ -188,6 +189,36 @@ export class C2DEngineDocker extends C2DEngine { } } } + /* TODO - get namedresources & discreete one + if (sysinfo.GenericResources) { + for (const [key, value] of Object.entries(sysinfo.GenericResources)) { + for (const [type, val] of Object.entries(value)) { + // for (const resType in sysinfo.GenericResources) { + if (type === 'NamedResourceSpec') { + // if we have it, ignore it + const resourceId = val.Value + const resourceType = val.Kind + let found = false + for (const res of this.envs[0].resources) { + if (res.id === resourceId) { + found = true + break + } + } + if (!found) { + this.envs[0].resources.push({ + id: resourceId, + kind: resourceType, + total: 1, + max: 1, + min: 0 + }) + } + } + } + } + } + */ // limits for free env if ('free' in envConfig) { this.envs[0].free = {} @@ -203,6 +234,11 @@ export class C2DEngineDocker extends C2DEngine { } this.envs[0].id = this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[0])) + + // only now set the 
timer + if (!this.cronTimer) { + this.setNewTimer() + } } // eslint-disable-next-line require-await @@ -214,11 +250,19 @@ export class C2DEngineDocker extends C2DEngine { */ if (!this.docker) return [] const filteredEnvs = [] + // const systemInfo = this.docker ? await this.docker.info() : null for (const computeEnv of this.envs) { if ( !chainId || (computeEnv.fees && Object.hasOwn(computeEnv.fees, String(chainId))) ) { + // TO DO - At some point in time we need to handle multiple runtimes + // console.log('********************************') + // console.log(systemInfo.GenericResources) + // console.log('********************************') + // if (systemInfo.Runtimes) computeEnv.runtimes = systemInfo.Runtimes + // if (systemInfo.DefaultRuntime) + // computeEnv.defaultRuntime = systemInfo.DefaultRuntime const { totalJobs, totalFreeJobs, usedResources, usedFreeResources } = await this.getUsedResources(computeEnv) computeEnv.runningJobs = totalJobs @@ -313,6 +357,7 @@ export class C2DEngineDocker extends C2DEngine { jobId: string ): Promise { if (!this.docker) return [] + // TO DO - iterate over resources and get default runtime const isFree: boolean = !(payment && payment.lockTx) // C2D - Check image, check arhitecture, etc @@ -521,7 +566,8 @@ export class C2DEngineDocker extends C2DEngine { const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) if (jobs.length === 0) { - CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash) + // CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash) + this.setNewTimer() return } else { CORE_LOGGER.info(`Got ${jobs.length} jobs for engine ${this.getC2DConfig().hash}`) @@ -577,7 +623,14 @@ export class C2DEngineDocker extends C2DEngine { ) { delete volume.DriverOpts CORE_LOGGER.info('Retrying again without DriverOpts options...') - return this.createDockerVolume(volume) + try { + return this.createDockerVolume(volume) + } catch (e) { + CORE_LOGGER.error( + `Unable to create 
docker volume without DriverOpts: ${e.message}` + ) + return false + } } return false } @@ -667,19 +720,19 @@ export class C2DEngineDocker extends C2DEngine { // create the volume & create container // TO DO C2D: Choose driver & size // get env info - // const environment = await this.getJobEnvironment(job) - + const envResource = this.envs[0].resources const volume: VolumeCreateOptions = { Name: job.jobId + '-volume' } // volume - const diskSize = this.getResourceRequest(job.resources, 'disk') - if (diskSize && diskSize > 0) { + /* const diskSize = this.getResourceRequest(job.resources, 'disk') + if (diskSize && diskSize > 0) { volume.DriverOpts = { - o: 'size=' + String(diskSize) + o: 'size=' + String(diskSize), + device: 'local', + type: 'local' } - } - + } */ const volumeCreated = await this.createDockerVolume(volume, true) if (!volumeCreated) { job.status = C2DStatusNumber.VolumeCreationFailed @@ -704,11 +757,11 @@ export class C2DEngineDocker extends C2DEngine { ] } // disk - if (diskSize && diskSize > 0) { - hostConfig.StorageOpt = { - size: String(diskSize) - } - } + // if (diskSize && diskSize > 0) { + // hostConfig.StorageOpt = { + // size: String(diskSize) + // } + // } // ram const ramSize = this.getResourceRequest(job.resources, 'ram') if (ramSize && ramSize > 0) { @@ -734,7 +787,27 @@ export class C2DEngineDocker extends C2DEngine { Volumes: mountVols, HostConfig: hostConfig } - + // TO DO - iterate over resources and get default runtime + // TO DO - check resources and pass devices + const dockerDeviceRequest = this.getDockerDeviceRequest(job.resources, envResource) + if (dockerDeviceRequest) { + containerInfo.HostConfig.DeviceRequests = dockerDeviceRequest + } + const advancedConfig = this.getDockerAdvancedConfig(job.resources, envResource) + if (advancedConfig.Devices) + containerInfo.HostConfig.Devices = advancedConfig.Devices + if (advancedConfig.GroupAdd) + containerInfo.HostConfig.GroupAdd = advancedConfig.GroupAdd + if 
(advancedConfig.SecurityOpt) + containerInfo.HostConfig.SecurityOpt = advancedConfig.SecurityOpt + if (advancedConfig.Binds) containerInfo.HostConfig.Binds = advancedConfig.Binds + if (advancedConfig.CapAdd) containerInfo.HostConfig.CapAdd = advancedConfig.CapAdd + if (advancedConfig.CapDrop) + containerInfo.HostConfig.CapDrop = advancedConfig.CapDrop + if (advancedConfig.IpcMode) + containerInfo.HostConfig.IpcMode = advancedConfig.IpcMode + if (advancedConfig.ShmSize) + containerInfo.HostConfig.ShmSize = advancedConfig.ShmSize if (job.algorithm.meta.container.entrypoint) { const newEntrypoint = job.algorithm.meta.container.entrypoint.replace( '$ALGO', @@ -742,11 +815,9 @@ export class C2DEngineDocker extends C2DEngine { ) containerInfo.Entrypoint = newEntrypoint.split(' ') } - console.log('CREATING CONTAINER') - console.log(containerInfo) const container = await this.createDockerContainer(containerInfo, true) if (container) { - console.log('container: ', container) + console.log('Container created: ', container) job.status = C2DStatusNumber.Provisioning job.statusText = C2DStatusText.Provisioning await this.db.updateJob(job) @@ -1004,7 +1075,7 @@ export class C2DEngineDocker extends C2DEngine { // So we cannot test this from the CLI for instance... 
Only Option is to actually send it encrypted // OR extract the files object from the passed DDO, decrypt it and use it - console.log(job.algorithm.fileObject) + // console.log(job.algorithm.fileObject) const fullAlgoPath = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/transformations/algorithm' try { diff --git a/src/components/core/compute/getResults.ts b/src/components/core/compute/getResults.ts index f7555cae0..682278f07 100644 --- a/src/components/core/compute/getResults.ts +++ b/src/components/core/compute/getResults.ts @@ -2,7 +2,6 @@ import { P2PCommandResponse } from '../../../@types/index.js' import { CORE_LOGGER } from '../../../utils/logging/common.js' import { CommandHandler } from '../handler/handler.js' import { ComputeGetResultCommand } from '../../../@types/commands.js' -import { checkNonce, NonceResponse } from '../utils/nonceHandler.js' import { buildInvalidRequestMessage, validateCommandParameters, @@ -12,13 +11,7 @@ import { isAddress } from 'ethers' export class ComputeGetResultHandler extends CommandHandler { validate(command: ComputeGetResultCommand): ValidateParams { - const validation = validateCommandParameters(command, [ - 'consumerAddress', - 'signature', - 'nonce', - 'jobId', - 'index' - ]) + const validation = validateCommandParameters(command, ['jobId', 'index']) if (validation.valid) { if (command.consumerAddress && !isAddress(command.consumerAddress)) { return buildInvalidRequestMessage( @@ -38,33 +31,17 @@ export class ComputeGetResultHandler extends CommandHandler { return validationResponse } - let error = null - - // signature message to check against - const message = task.consumerAddress + task.jobId + task.index.toString() + task.nonce - const nonceCheckResult: NonceResponse = await checkNonce( - this.getOceanNode().getDatabase().nonce, + const authValidationResponse = await this.validateTokenOrSignature( + task.authorization, task.consumerAddress, - parseInt(task.nonce), + task.nonce, task.signature, - message // 
task.jobId + task.index.toString() + String(task.consumerAddress + task.jobId + task.index.toString() + task.nonce) ) - - if (!nonceCheckResult.valid) { - // eslint-disable-next-line prefer-destructuring - error = nonceCheckResult.error + if (authValidationResponse.status.httpStatus !== 200) { + return authValidationResponse } - if (error) { - CORE_LOGGER.logMessage(error, true) - return { - stream: null, - status: { - httpStatus: 400, - error - } - } - } // split jobId (which is already in hash-jobId format) and get the hash // then get jobId which might contain dashes as well const index = task.jobId.indexOf('-') diff --git a/src/components/core/compute/getStreamableLogs.ts b/src/components/core/compute/getStreamableLogs.ts index 92ff46e96..45c701d28 100644 --- a/src/components/core/compute/getStreamableLogs.ts +++ b/src/components/core/compute/getStreamableLogs.ts @@ -2,7 +2,6 @@ import { P2PCommandResponse } from '../../../@types/index.js' import { CORE_LOGGER } from '../../../utils/logging/common.js' import { CommandHandler } from '../handler/handler.js' import { ComputeGetStreamableLogsCommand } from '../../../@types/commands.js' -import { checkNonce, NonceResponse } from '../utils/nonceHandler.js' import { Stream } from 'stream' import { buildInvalidRequestMessage, @@ -13,12 +12,7 @@ import { isAddress } from 'ethers' export class ComputeGetStreamableLogsHandler extends CommandHandler { validate(command: ComputeGetStreamableLogsCommand): ValidateParams { - const validation = validateCommandParameters(command, [ - 'consumerAddress', - 'signature', - 'nonce', - 'jobId' - ]) + const validation = validateCommandParameters(command, ['jobId']) if (validation.valid) { if (command.consumerAddress && !isAddress(command.consumerAddress)) { return buildInvalidRequestMessage( @@ -30,37 +24,22 @@ export class ComputeGetStreamableLogsHandler extends CommandHandler { } async handle(task: ComputeGetStreamableLogsCommand): Promise { + const oceanNode = this.getOceanNode() + 
const validationResponse = await this.verifyParamsAndRateLimits(task) if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } - const oceanNode = this.getOceanNode() - let error = null - // signature message to check against - const message = task.consumerAddress + task.jobId + task.nonce - const nonceCheckResult: NonceResponse = await checkNonce( - oceanNode.getDatabase().nonce, + const authValidationResponse = await this.validateTokenOrSignature( + task.authorization, task.consumerAddress, - parseInt(task.nonce), + task.nonce, task.signature, - message + String(task.consumerAddress + task.jobId + task.nonce) ) - - if (!nonceCheckResult.valid) { - // eslint-disable-next-line prefer-destructuring - error = nonceCheckResult.error - } - - if (error) { - CORE_LOGGER.logMessage(error, true) - return { - stream: null, - status: { - httpStatus: 400, - error - } - } + if (authValidationResponse.status.httpStatus !== 200) { + return authValidationResponse } // split jobId (which is already in hash-jobId format) and get the hash diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index 52a59e407..979b13d5e 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -336,7 +336,6 @@ export class ComputeInitializeHandler extends CommandHandler { message: false } result.consumerAddress = env.consumerAddress - CORE_LOGGER.info(`elem: ${JSON.stringify(elem)}`) if ('transferTxId' in elem && elem.transferTxId) { // search for that compute env and see if it has access to dataset const paymentValidation = await validateOrderTransaction( @@ -349,8 +348,6 @@ export class ComputeInitializeHandler extends CommandHandler { service.timeout, blockchain.getSigner() ) - CORE_LOGGER.info(`paymentValidation: ${JSON.stringify(paymentValidation)}`) - if (paymentValidation.isValid === true) { // order is valid, so let's check providerFees result.validOrder = elem.transferTxId 
diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index 0677ddad0..6a3be64a3 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -32,15 +32,12 @@ import { FindDdoHandler } from '../handler/ddoHandler.js' // import { ProviderFeeValidation } from '../../../@types/Fees.js' import { isOrderingAllowedForAsset } from '../handler/downloadHandler.js' import { DDOManager } from '@oceanprotocol/ddo-js' -import { getNonceAsNumber, checkNonce, NonceResponse } from '../utils/nonceHandler.js' +import { getNonceAsNumber } from '../utils/nonceHandler.js' import { generateUniqueID } from '../../database/sqliteCompute.js' export class PaidComputeStartHandler extends CommandHandler { validate(command: PaidComputeStartCommand): ValidateParams { const commandValidation = validateCommandParameters(command, [ - 'consumerAddress', - 'signature', - 'nonce', 'environment', 'algorithm', 'datasets', @@ -64,6 +61,19 @@ export class PaidComputeStartHandler extends CommandHandler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } + + const authValidationResponse = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + String(task.consumerAddress + task.datasets[0]?.documentId + task.nonce) + ) + + if (authValidationResponse.status.httpStatus !== 200) { + return authValidationResponse + } + try { const node = this.getOceanNode() // split compute env (which is already in hash-envId format) and get the hash @@ -452,9 +462,6 @@ export class FreeComputeStartHandler extends CommandHandler { const commandValidation = validateCommandParameters(command, [ 'algorithm', 'datasets', - 'consumerAddress', - 'signature', - 'nonce', 'environment' ]) if (commandValidation.valid) { @@ -468,34 +475,23 @@ export class FreeComputeStartHandler extends CommandHandler { } async handle(task: FreeComputeStartCommand): 
Promise { + const thisNode = this.getOceanNode() const validationResponse = await this.verifyParamsAndRateLimits(task) if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } - const thisNode = this.getOceanNode() - // Validate nonce and signature - const nonceCheckResult: NonceResponse = await checkNonce( - thisNode.getDatabase().nonce, + + const authValidationResponse = await this.validateTokenOrSignature( + task.authorization, task.consumerAddress, - parseInt(task.nonce), + task.nonce, task.signature, String(task.nonce) ) - - if (!nonceCheckResult.valid) { - CORE_LOGGER.logMessage( - 'Invalid nonce or signature, unable to proceed: ' + nonceCheckResult.error, - true - ) - return { - stream: null, - status: { - httpStatus: 500, - error: - 'Invalid nonce or signature, unable to proceed: ' + nonceCheckResult.error - } - } + if (authValidationResponse.status.httpStatus !== 200) { + return authValidationResponse } + let engine = null try { // split compute env (which is already in hash-envId format) and get the hash diff --git a/src/components/core/compute/stopCompute.ts b/src/components/core/compute/stopCompute.ts index 3352a923d..bed33cd9d 100644 --- a/src/components/core/compute/stopCompute.ts +++ b/src/components/core/compute/stopCompute.ts @@ -12,12 +12,7 @@ import { isAddress } from 'ethers' export class ComputeStopHandler extends CommandHandler { validate(command: ComputeStopCommand): ValidateParams { - const validation = validateCommandParameters(command, [ - 'consumerAddress', - 'signature', - 'nonce', - 'jobId' - ]) + const validation = validateCommandParameters(command, ['jobId']) if (validation.valid) { if (!isAddress(command.consumerAddress)) { return buildInvalidRequestMessage( @@ -33,6 +28,18 @@ export class ComputeStopHandler extends CommandHandler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } + + const authValidationResponse = await this.validateTokenOrSignature( + task.authorization, + 
task.consumerAddress, + task.nonce, + task.signature, + String(task.consumerAddress + (task.jobId || '')) + ) + if (authValidationResponse.status.httpStatus !== 200) { + return authValidationResponse + } + try { // split jobId (which is already in hash-jobId format) and get the hash // then get jobId which might contain dashes as well diff --git a/src/components/core/compute/utils.ts b/src/components/core/compute/utils.ts index 2cf973d22..5b92876b7 100644 --- a/src/components/core/compute/utils.ts +++ b/src/components/core/compute/utils.ts @@ -83,57 +83,44 @@ export async function validateAlgoForDataset( if (datasetService.type !== 'compute' || !compute) { throw new Error('Service not compute') } + const publishers = compute.publisherTrustedAlgorithmPublishers || [] + const algorithms = compute.publisherTrustedAlgorithms || [] + + // If no restrictions are set, deny by default + const hasTrustedPublishers = publishers.length > 0 + const hasTrustedAlgorithms = algorithms.length > 0 + if (!hasTrustedPublishers && !hasTrustedAlgorithms) return false if (algoDID) { - if ( - // if not set deny them all - (!Array.isArray(compute.publisherTrustedAlgorithms) || - compute.publisherTrustedAlgorithms.length === 0) && - (!Array.isArray(compute.publisherTrustedAlgorithmPublishers) || - compute.publisherTrustedAlgorithmPublishers.length === 0) - ) { - return false - } + // Check if algorithm is explicitly trusted + const isAlgoTrusted = + hasTrustedAlgorithms && + algorithms.some((algo: any) => { + const didMatch = algo.did === '*' || algo.did === algoDID + const filesMatch = + algo.filesChecksum === '*' || algo.filesChecksum === algoChecksums.files + const containerMatch = + algo.containerSectionChecksum === '*' || + algo.containerSectionChecksum === algoChecksums.container + return didMatch && filesMatch && containerMatch + }) - if ( - compute.publisherTrustedAlgorithms.includes('*') && - compute.publisherTrustedAlgorithmPublishers.includes('*') - ) { - return true - } + // 
Check if algorithm publisher is trusted + let isPublisherTrusted = true + if (hasTrustedPublishers) { + if (!publishers.includes('*')) { + const algoDDO = await new FindDdoHandler(oceanNode).findAndFormatDdo(algoDID) + if (!algoDDO) return false + const algoInstance = DDOManager.getDDOClass(algoDDO) + const { nftAddress } = algoInstance.getDDOFields() - if ( - Array.isArray(compute.publisherTrustedAlgorithms) && - compute.publisherTrustedAlgorithms.length > 0 && - !compute.publisherTrustedAlgorithms.includes('*') - ) { - const trustedAlgo = compute.publisherTrustedAlgorithms.find( - (algo: any) => algo.did === algoDID - ) - if (trustedAlgo) { - return ( - trustedAlgo.filesChecksum === algoChecksums.files && - trustedAlgo.containerSectionChecksum === algoChecksums.container - ) - } - return false - } - if ( - Array.isArray(compute.publisherTrustedAlgorithmPublishers) && - compute.publisherTrustedAlgorithmPublishers.length > 0 && - !compute.publisherTrustedAlgorithmPublishers.includes('*') - ) { - const algoDDO = await new FindDdoHandler(oceanNode).findAndFormatDdo(algoDID) - const algoInstance = DDOManager.getDDOClass(algoDDO) - const { nftAddress } = algoInstance.getDDOFields() - if (algoDDO) { - return compute.publisherTrustedAlgorithmPublishers - .map((address: string) => address?.toLowerCase()) + isPublisherTrusted = publishers + .map((addr: string) => addr?.toLowerCase()) .includes(nftAddress?.toLowerCase()) } - return false } - return true + + return isAlgoTrusted && isPublisherTrusted } return compute.allowRawAlgorithm diff --git a/src/components/core/handler/authHandler.ts b/src/components/core/handler/authHandler.ts index b1400a10f..8e960c14f 100644 --- a/src/components/core/handler/authHandler.ts +++ b/src/components/core/handler/authHandler.ts @@ -31,7 +31,6 @@ export class CreateAuthTokenHandler extends CommandHandler { async handle(task: CreateAuthTokenCommand): Promise { const { address, nonce, signature } = task const nonceDb = 
this.getOceanNode().getDatabase().nonce - const auth = this.getOceanNode().getAuth() const validationResponse = await this.verifyParamsAndRateLimits(task) if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse @@ -43,7 +42,7 @@ export class CreateAuthTokenHandler extends CommandHandler { address, parseInt(nonce), signature, - auth.getMessage(address, nonce) + String(address + nonce) ) if (!nonceCheckResult.valid) { @@ -83,7 +82,6 @@ export class InvalidateAuthTokenHandler extends CommandHandler { async handle(task: InvalidateAuthTokenCommand): Promise { const { address, nonce, signature, token } = task const nonceDb = this.getOceanNode().getDatabase().nonce - const auth = this.getOceanNode().getAuth() const validationResponse = await this.verifyParamsAndRateLimits(task) if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse @@ -95,7 +93,7 @@ export class InvalidateAuthTokenHandler extends CommandHandler { address, parseInt(nonce), signature, - auth.getMessage(address, nonce) + String(address + nonce) ) if (!isValid) { return { diff --git a/src/components/core/handler/ddoHandler.ts b/src/components/core/handler/ddoHandler.ts index b7937f8bb..6b9a1851e 100644 --- a/src/components/core/handler/ddoHandler.ts +++ b/src/components/core/handler/ddoHandler.ts @@ -42,7 +42,6 @@ import { Asset, DDO, DDOManager } from '@oceanprotocol/ddo-js' import { checkCredentialOnAccessList } from '../../../utils/credentials.js' import { createHash } from 'crypto' import { Storage } from '../../../components/storage/index.js' -import { ValidateTokenOrSignature } from '../../../utils/decorators/validate-token.decorator.js' const MAX_NUM_PROVIDERS = 5 // after 60 seconds it returns whatever info we have available @@ -825,13 +824,26 @@ export class ValidateDDOHandler extends CommandHandler { return validation } - // Skip validation if allowed by env variable - @ValidateTokenOrSignature(skipValidation) async handle(task: ValidateDDOCommand): 
Promise { const validationResponse = await this.verifyParamsAndRateLimits(task) + const shouldSkipValidation = await skipValidation() + if (!shouldSkipValidation) { + const validationResponse = await this.validateTokenOrSignature( + task.authorization, + task.publisherAddress, + task.nonce, + task.signature, + String(task.publisherAddress + task.nonce) + ) + if (validationResponse.status.httpStatus !== 200) { + return validationResponse + } + } + if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } + try { const ddoInstance = DDOManager.getDDOClass(task.ddo) const validation = await ddoInstance.validate() diff --git a/src/components/core/handler/downloadHandler.ts b/src/components/core/handler/downloadHandler.ts index 2c3508ad6..118fabae0 100644 --- a/src/components/core/handler/downloadHandler.ts +++ b/src/components/core/handler/downloadHandler.ts @@ -1,5 +1,4 @@ import { CommandHandler } from './handler.js' -import { checkNonce, NonceResponse } from '../utils/nonceHandler.js' import { ENVIRONMENT_VARIABLES, MetadataStates, @@ -213,16 +212,24 @@ export class DownloadHandler extends CommandHandler { 'fileIndex', 'documentId', 'serviceId', - 'transferTxId', - 'nonce', - 'consumerAddress', - 'signature' + 'transferTxId' ]) } // No encryption here yet async handle(task: DownloadCommand): Promise { const validationResponse = await this.verifyParamsAndRateLimits(task) + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + String(task.documentId + task.nonce) + ) + if (isAuthRequestValid.status.httpStatus !== 200) { + return isAuthRequestValid + } + if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } @@ -314,29 +321,6 @@ export class DownloadHandler extends CommandHandler { } } - // 3. 
Validate nonce and signature - const nonceCheckResult: NonceResponse = await checkNonce( - this.getOceanNode().getDatabase().nonce, - task.consumerAddress, - parseInt(task.nonce), - task.signature, - String(ddo.id + task.nonce) - ) - - if (!nonceCheckResult.valid) { - CORE_LOGGER.logMessage( - 'Invalid nonce or signature, unable to proceed with download: ' + - nonceCheckResult.error, - true - ) - return { - stream: null, - status: { - httpStatus: 500, - error: nonceCheckResult.error - } - } - } // from now on, we need blockchain checks const config = await getConfiguration() const { rpc, network, chainId, fallbackRPCs } = config.supportedNetworks[ddoChainId] diff --git a/src/components/core/handler/handler.ts b/src/components/core/handler/handler.ts index 22def5572..762f22c15 100644 --- a/src/components/core/handler/handler.ts +++ b/src/components/core/handler/handler.ts @@ -167,4 +167,33 @@ export abstract class CommandHandler status: { httpStatus: 200, error: null } } } + + async validateTokenOrSignature( + authToken: string, + address: string, + nonce: string, + signature: string, + message: string + ): Promise { + const oceanNode = this.getOceanNode() + const auth = oceanNode.getAuth() + const isAuthRequestValid = await auth.validateAuthenticationOrToken({ + token: authToken, + address, + nonce, + signature, + message + }) + if (!isAuthRequestValid.valid) { + return { + stream: null, + status: { httpStatus: 401, error: isAuthRequestValid.error } + } + } + + return { + stream: null, + status: { httpStatus: 200 } + } + } } diff --git a/src/components/database/ElasticSearchDatabase.ts b/src/components/database/ElasticSearchDatabase.ts index 327dafced..4f93d2e24 100644 --- a/src/components/database/ElasticSearchDatabase.ts +++ b/src/components/database/ElasticSearchDatabase.ts @@ -367,7 +367,6 @@ export class ElasticsearchOrderDatabase extends AbstractOrderDatabase { did: string, startOrderId?: string ) { - DATABASE_LOGGER.info(`index: ${this.getSchema().index}`) 
try { const document = { orderId, @@ -380,8 +379,6 @@ export class ElasticsearchOrderDatabase extends AbstractOrderDatabase { did, startOrderId } - DATABASE_LOGGER.info(`document: ${JSON.stringify(document)}`) - DATABASE_LOGGER.info(`orderid: ${orderId}`) await this.provider.index({ index: this.getSchema().index, diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts index a4c6bc1dc..2eba47501 100644 --- a/src/components/database/sqliteCompute.ts +++ b/src/components/database/sqliteCompute.ts @@ -326,7 +326,7 @@ export class SQLiteCompute implements ComputeDatabaseProvider { }) resolve(filtered) } else { - DATABASE_LOGGER.info('Could not find any running C2D jobs!') + // DATABASE_LOGGER.info('Could not find any running C2D jobs!') resolve([]) } } diff --git a/src/components/httpRoutes/compute.ts b/src/components/httpRoutes/compute.ts index 553e1d1dd..da87c9519 100644 --- a/src/components/httpRoutes/compute.ts +++ b/src/components/httpRoutes/compute.ts @@ -76,7 +76,8 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/compute`, async (req, res) => { algorithm: (req.body.algorithm as ComputeAlgorithm) || null, datasets: (req.body.datasets as unknown as ComputeAsset[]) || null, payment: (req.body.payment as unknown as ComputePayment) || null, - resources: (req.body.resources as unknown as ComputeResourceRequest[]) || null + resources: (req.body.resources as unknown as ComputeResourceRequest[]) || null, + authorization: req.headers?.authorization } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput @@ -107,7 +108,6 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/freeCompute`, async (req, res) => )}`, true ) - const startComputeTask: FreeComputeStartCommand = { command: PROTOCOL_COMMANDS.FREE_COMPUTE_START, node: (req.body.node as string) || null, @@ -118,7 +118,8 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/freeCompute`, async (req, res) => algorithm: (req.body.algorithm as ComputeAlgorithm) || 
null, datasets: (req.body.datasets as unknown as ComputeAsset[]) || null, resources: (req.body.resources as unknown as ComputeResourceRequest[]) || null, - maxJobDuration: req.body.maxJobDuration || null + maxJobDuration: req.body.maxJobDuration || null, + authorization: req.headers?.authorization } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput @@ -207,7 +208,8 @@ computeRoutes.get(`${SERVICES_API_BASE_PATH}/computeResult`, async (req, res) => index: req.query.index ? Number(req.query.index) : null, // can't be parseInt() because that excludes index 0 jobId: (req.query.jobId as string) || null, signature: (req.query.signature as string) || null, - nonce: (req.query.nonce as string) || null + nonce: (req.query.nonce as string) || null, + authorization: req.headers?.authorization } const response = await new ComputeGetResultHandler(req.oceanNode).handle( @@ -235,13 +237,15 @@ computeRoutes.get(`${SERVICES_API_BASE_PATH}/computeStreamableLogs`, async (req, )}`, true ) + const resultComputeTask: ComputeGetStreamableLogsCommand = { command: PROTOCOL_COMMANDS.COMPUTE_GET_STREAMABLE_LOGS, node: (req.query.node as string) || null, consumerAddress: (req.query.consumerAddress as string) || null, jobId: (req.query.jobId as string) || null, signature: (req.query.signature as string) || null, - nonce: (req.query.nonce as string) || null + nonce: (req.query.nonce as string) || null, + authorization: req.headers?.authorization } const response = await new ComputeGetStreamableLogsHandler(req.oceanNode).handle( diff --git a/src/components/httpRoutes/provider.ts b/src/components/httpRoutes/provider.ts index b17bd9033..73830570d 100644 --- a/src/components/httpRoutes/provider.ts +++ b/src/components/httpRoutes/provider.ts @@ -200,6 +200,7 @@ providerRoutes.get( `Download request received: ${JSON.stringify(req.query)}`, true ) + const authorization = req.headers?.authorization try { const { fileIndex, @@ -220,7 +221,8 @@ providerRoutes.get( 
consumerAddress: consumerAddress as string, signature: signature as string, command: PROTOCOL_COMMANDS.DOWNLOAD, - policyServer: req.query.policyServer || null + policyServer: req.query.policyServer || null, + authorization: authorization as string } const response = await new DownloadHandler(req.oceanNode).handle(downloadTask) diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index d1b725629..f18c6a43b 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -270,7 +270,9 @@ describe('Trusted algorithms Flow', () => { it('should not start a compute job because algorithm is not trusted by dataset', async () => { let balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) const nonce = Date.now().toString() - const message = String(nonce) + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) // sign message/nonce const consumerMessage = ethers.solidityPackedKeccak256( ['bytes'], @@ -405,8 +407,9 @@ describe('Trusted algorithms Flow', () => { } } const nonce = Date.now().toString() - const message = String(nonce) - // sign message/nonce + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) const consumerMessage = ethers.solidityPackedKeccak256( ['bytes'], [ethers.hexlify(ethers.toUtf8Bytes(message))] diff --git a/src/test/integration/auth.test.ts b/src/test/integration/auth.test.ts index 417247ca1..2df34ae08 100644 --- a/src/test/integration/auth.test.ts +++ b/src/test/integration/auth.test.ts @@ -1,6 +1,5 @@ import { JsonRpcProvider, Signer, Wallet } from 'ethers' import { Database } from '../../components/database/index.js' -import { Auth } from '../../components/Auth/index.js' import { getConfiguration, getMessageHash } from '../../utils/index.js' import { DEFAULT_TEST_TIMEOUT, @@ -27,7 +26,6 @@ import { 
ValidateDDOHandler } from '../../components/core/handler/ddoHandler.js' describe('Auth Token Integration Tests', () => { let config: OceanNodeConfig let database: Database - let auth: Auth let provider: JsonRpcProvider let consumerAccount: Signer let previousConfiguration: OverrideEnvConfig[] @@ -50,7 +48,6 @@ describe('Auth Token Integration Tests', () => { config = await getConfiguration(true) database = await new Database(config.dbConfig) - auth = new Auth(database.authToken) oceanNode = await OceanNode.getInstance(config, database) provider = new JsonRpcProvider(mockSupportedNetworks['8996'].rpc) @@ -119,7 +116,7 @@ describe('Auth Token Integration Tests', () => { const consumerAddress = await consumerAccount.getAddress() const nonce = getRandomNonce() - const message = auth.getMessage(consumerAddress, nonce) + const message = String(consumerAddress + nonce) const messageHash = getMessageHash(message) const signature = await consumerAccount.signMessage(messageHash) @@ -140,7 +137,7 @@ describe('Auth Token Integration Tests', () => { const consumerAddress = await consumerAccount.getAddress() const nonce = getRandomNonce() - const message = auth.getMessage(consumerAddress, nonce) + const message = String(consumerAddress + nonce) const messageHash = getMessageHash(message) const signature = await consumerAccount.signMessage(messageHash) @@ -166,7 +163,7 @@ describe('Auth Token Integration Tests', () => { const consumerAddress = await consumerAccount.getAddress() const nonce = getRandomNonce() - const message = auth.getMessage(consumerAddress, nonce) + const message = String(consumerAddress + nonce) const messageHash = getMessageHash(message) const signature = await consumerAccount.signMessage(messageHash) @@ -228,7 +225,7 @@ describe('Auth Token Integration Tests', () => { // Missing address const nonce = getRandomNonce() - const message = auth.getMessage(await consumerAccount.getAddress(), nonce) + const message = String((await consumerAccount.getAddress()) + 
nonce) const messageHash = getMessageHash(message) const signature = await consumerAccount.signMessage(messageHash) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index a9a9ab6fb..bf2d3599d 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -583,7 +583,9 @@ describe('Compute', () => { it('should fail to start a compute job', async () => { const nonce = Date.now().toString() - const message = String(nonce) + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) // sign message/nonce const consumerMessage = ethers.solidityPackedKeccak256( ['bytes'], @@ -679,7 +681,9 @@ describe('Compute', () => { } const locksBefore = locks.length const nonce = Date.now().toString() - const message = String(nonce) + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) // sign message/nonce const consumerMessage = ethers.solidityPackedKeccak256( ['bytes'], @@ -759,7 +763,21 @@ describe('Compute', () => { BigInt(auth[0].maxLockCounts.toString()) > BigInt(0), ' Should have maxLockCounts in auth' ) - response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask) + const nonce2 = Date.now().toString() + const message2 = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce2 + ) + const consumerMessage2 = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(message2))] + ) + const messageHashBytes2 = ethers.toBeArray(consumerMessage2) + const signature2 = await wallet.signMessage(messageHashBytes2) + response = await new PaidComputeStartHandler(oceanNode).handle({ + ...startComputeTask, + nonce: nonce2, + signature: signature2 + }) assert(response, 'Failed to get response') assert(response.status.httpStatus === 200, 'Failed to get 200 response') assert(response.stream, 'Failed to get stream') @@ -927,7 +945,7 @@ 
describe('Compute', () => { }) it('should stop a compute job', async () => { const nonce = Date.now().toString() - const message = String(nonce) + const message = String((await consumerAccount.getAddress()) + (jobId || '')) // sign message/nonce const consumerMessage = ethers.solidityPackedKeccak256( ['bytes'], @@ -953,12 +971,8 @@ describe('Compute', () => { const command: FreeComputeStartCommand = freeComputeStartPayload const handler = new FreeComputeStartHandler(oceanNode) const response = await handler.handle(command) - assert(response.status.httpStatus === 500, 'Failed to get 500 response') + assert(response.status.httpStatus === 401, 'Failed to get 401 response') assert(response.stream === null, 'Should not get stream') - assert( - response.status.error.includes('Invalid nonce or signature'), - 'Should have signature error' - ) }) it('should deny the Free job due to bad container image (directCommand payload)', async function () { const nonce = Date.now().toString() diff --git a/src/test/integration/download.test.ts b/src/test/integration/download.test.ts index d1f148a2d..64b33c78c 100644 --- a/src/test/integration/download.test.ts +++ b/src/test/integration/download.test.ts @@ -292,15 +292,24 @@ describe('Should run a complete node flow.', () => { }) it('should not allow to download the asset with different consumer address', async function () { const assetDID = publishedDataset.ddo.id + const nonce = Date.now().toString() + const message = String(assetDID + nonce) + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(message))] + ) + const messageHashBytes = ethers.toBeArray(consumerMessage) + const signature = await anotherConsumer.signMessage(messageHashBytes) + const doCheck = async () => { const downloadTask = { fileIndex: 0, documentId: assetDID, serviceId, transferTxId: orderTxId, - nonce: Date.now().toString(), + nonce, consumerAddress: await anotherConsumer.getAddress(), - signature: 
'0xBE5449a6', + signature, command: PROTOCOL_COMMANDS.DOWNLOAD } const response = await new DownloadHandler(oceanNode).handle(downloadTask) diff --git a/src/test/unit/commands.test.ts b/src/test/unit/commands.test.ts index 66c659194..d0c79cf23 100644 --- a/src/test/unit/commands.test.ts +++ b/src/test/unit/commands.test.ts @@ -86,7 +86,7 @@ describe('Commands and handlers', () => { command: PROTOCOL_COMMANDS.DOWNLOAD } expect(downloadHandler.validate(downloadCommand).valid).to.be.equal(true) - downloadCommand.nonce = null + downloadCommand.documentId = undefined expect(downloadHandler.validate(downloadCommand).valid).to.be.equal(false) // ----------------------------------------- // DecryptDDOHandler diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index 9583efac8..5cb42c1fd 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -8,7 +8,7 @@ import { ComputeAlgorithm, ComputeAsset, // ComputeEnvironment, - ComputeJob, + // ComputeJob, DBComputeJob, RunningPlatform } from '../../@types/C2D/C2D.js' @@ -28,7 +28,9 @@ import { } from '../utils/utils.js' import { OceanNodeConfig } from '../../@types/OceanNode.js' import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js' +// eslint-disable-next-line no-unused-vars import { completeDBComputeJob, dockerImageManifest } from '../data/assets.js' +// eslint-disable-next-line no-unused-vars import { omitDBComputeFieldsFromComputeJob } from '../../components/c2d/index.js' import { checkManifestPlatform } from '../../components/c2d/compute_engine_docker.js' @@ -196,30 +198,30 @@ describe('Compute Jobs Database', () => { expect(convertStringToArray(str)).to.deep.equal(expectedArray) }) - it('should convert DBComputeJob to ComputeJob and omit internal DB data', () => { - const source: any = completeDBComputeJob - const output: ComputeJob = omitDBComputeFieldsFromComputeJob(source as DBComputeJob) + // it('should convert DBComputeJob to ComputeJob and omit internal DB 
data', () => { + // const source: any = completeDBComputeJob + // const output: ComputeJob = omitDBComputeFieldsFromComputeJob(source as DBComputeJob) - expect(Object.prototype.hasOwnProperty.call(output, 'clusterHash')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'configlogURL')).to.be.equal( - false - ) - expect(Object.prototype.hasOwnProperty.call(output, 'publishlogURL')).to.be.equal( - false - ) - expect(Object.prototype.hasOwnProperty.call(output, 'algologURL')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'outputsURL')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'stopRequested')).to.be.equal( - false - ) - expect(Object.prototype.hasOwnProperty.call(output, 'algorithm')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'assets')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'isRunning')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'isStarted')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'containerImage')).to.be.equal( - false - ) - }) + // expect(Object.prototype.hasOwnProperty.call(output, 'clusterHash')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'configlogURL')).to.be.equal( + // false + // ) + // expect(Object.prototype.hasOwnProperty.call(output, 'publishlogURL')).to.be.equal( + // false + // ) + // expect(Object.prototype.hasOwnProperty.call(output, 'algologURL')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'outputsURL')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'stopRequested')).to.be.equal( + // false + // ) + // expect(Object.prototype.hasOwnProperty.call(output, 'algorithm')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'assets')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 
'isRunning')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'isStarted')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'containerImage')).to.be.equal( + // false + // ) + // }) it('should check manifest platform against local platform env', () => { const arch = os.machine() // ex: arm diff --git a/src/test/unit/indexer/validation.test.ts b/src/test/unit/indexer/validation.test.ts index aa7edbb21..fb309141e 100644 --- a/src/test/unit/indexer/validation.test.ts +++ b/src/test/unit/indexer/validation.test.ts @@ -158,10 +158,9 @@ describe('Schema validation tests', () => { const handler = new ValidateDDOHandler(oceanNode) const ddoInstance = DDOManager.getDDOClass(ddoValidationSignature) const ddo = ddoInstance.getDDOData() as DDO - const auth = oceanNode.getAuth() const publisherAddress = await wallet.getAddress() const nonce = Date.now().toString() - const message = auth.getMessage(publisherAddress, nonce) + const message = String(publisherAddress + nonce) const messageHash = ethers.solidityPackedKeccak256( ['bytes'], [ethers.hexlify(ethers.toUtf8Bytes(message))] diff --git a/src/utils/decorators/validate-token.decorator.ts b/src/utils/decorators/validate-token.decorator.ts deleted file mode 100644 index 16b0b60a1..000000000 --- a/src/utils/decorators/validate-token.decorator.ts +++ /dev/null @@ -1,61 +0,0 @@ -import { P2PCommandResponse } from '../../@types' - -/** - * This decorator validates the token or signature of the request - * You can use it by adding @ValidateTokenOrSignature above the handler method - * @param skipValidation - If true, the validation will be skipped. You can also pass a function that returns a boolean. 
- */ -export function ValidateTokenOrSignature( - skipValidation?: boolean | (() => Promise) -) { - return function ( - _target: Object, - _propertyKey: string | symbol, - descriptor: TypedPropertyDescriptor<(...args: any[]) => Promise> - ) { - const originalMethod = descriptor.value - - descriptor.value = async function (...args: any[]): Promise { - let shouldSkip = skipValidation - if (typeof skipValidation === 'function') { - shouldSkip = await skipValidation() - } - - if (shouldSkip) { - return originalMethod.apply(this, args) - } - - const task = args[0] - const { authorization, signature, nonce } = task - const address = task.address || task.publisherAddress - const jwt = authorization?.includes('Bearer') - ? authorization.split(' ')[1] - : authorization - const oceanNode = this.getOceanNode() - - const auth = oceanNode.getAuth() - const isAuthRequestValid = await auth.validateAuthenticationOrToken({ - token: jwt, - signature, - nonce, - address - }) - if (!isAuthRequestValid.valid) { - console.log( - `Error validating token or signature while executing command: ${task.command}` - ) - return { - status: { - httpStatus: 401, - error: 'Invalid token or signature' - }, - stream: null - } - } - - return await originalMethod.apply(this, args) - } - - return descriptor - } -}