diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a8445603..e655fa42b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -260,7 +260,7 @@ jobs: DB_TYPE: 'elasticsearch' MAX_REQ_PER_MINUTE: 320 MAX_CONNECTIONS_PER_MINUTE: 320 - DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' - name: Check Ocean Node is running run: | for i in $(seq 1 90); do diff --git a/README.md b/README.md index 701b7f856..705261d69 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,14 @@ export PRIVATE_KEY="0x_your_private_key_here" The `PRIVATE_KEY` is the only mandatory environmental variable, you must include the `0x` at the front of your private key. Additional configurations can be set as needed. For all available configurations, refer to the [Environment Variables](docs/env.md) documentation. +2.1. If the config is a JSON file, run: + +``` +export CONFIG_PATH='' +``` + +Config file should be absolute path. + 3. Quick start the Ocean Node with PM2 ```bash diff --git a/docs/API.md b/docs/API.md index dda61000b..455b4aeaa 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1242,6 +1242,7 @@ It can include information about the file object, document ID, service ID, trans - **transferTxId**: Optional. A string representing the transaction ID for the transfer of the compute algorithm. - **algocustomdata**: Optional. An object containing additional custom data related to the compute algorithm. - **userdata**: Optional. An object containing additional user-defined data related to the compute algorithm. +- **envs**: Optional. Array of keys:values to be used as environment variables for algo. 
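A minimal sketch of a compute request using the new optional `envs` field (the image, entrypoint, and variable names below are illustrative assumptions, not values taken from this patch); per the engine changes further down, each key/value pair is turned into a `KEY=value` entry on the algorithm container's `Env` list:

```typescript
// Sketch only: illustrative ComputeAlgorithm payload with the optional `envs` field.
const algorithm = {
  meta: {
    container: { image: 'ubuntu', tag: 'latest', entrypoint: '/bin/bash', checksum: '' }
  },
  // Each pair is injected into the algorithm container as an environment variable,
  // e.g. MODEL_NAME=example-model (names and values here are made up for illustration).
  envs: {
    MODEL_NAME: 'example-model',
    BATCH_SIZE: '32'
  }
}
```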
```typescript export interface ComputeAlgorithm { @@ -1337,7 +1338,7 @@ starts a free compute job and returns jobId if succesfull "algorithm": { "meta": { "container": { "image": "ubuntu", "entrypoint": "/bin/bash'" } } }, - "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687", + "consumerAddress": "0x00", "signature": "123", "nonce": 1, "environment": "0x7d187e4c751367be694497ead35e2937ece3c7f3b325dcb4f7571e5972d092bd-0xbeaf12703d708f39ef98c3d8939ce458553254176dbb69fe83d535883c4cee38", @@ -1351,7 +1352,7 @@ starts a free compute job and returns jobId if succesfull ```json [ { - "owner": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687", + "owner": "0x00", "jobId": "0x7d187e4c751367be694497ead35e2937ece3c7f3b325dcb4f7571e5972d092bd-a4ad237d-dfd8-404c-a5d6-b8fc3a1f66d3", "dateCreated": "1742291065.119", "dateFinished": null, @@ -1395,7 +1396,7 @@ Required at least one of the following parameters: ```json [ { - "owner": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687", + "owner": "0x00", "did": null, "jobId": "a4ad237d-dfd8-404c-a5d6-b8fc3a1f66d3", "dateCreated": "1742291065.119", diff --git a/docs/GPU.md b/docs/GPU.md index a83c742ef..bfcb01df7 100644 --- a/docs/GPU.md +++ b/docs/GPU.md @@ -82,7 +82,7 @@ Here is the full definition of DOCKER_COMPUTE_ENVIRONMENTS: } } }, - { "id": "disk", "total": 1000000000 } + { "id": "disk", "total": 1 } ], "storageExpiry": 604800, "maxJobDuration": 3600, @@ -102,8 +102,8 @@ Here is the full definition of DOCKER_COMPUTE_ENVIRONMENTS: "maxJobs": 3, "resources": [ { "id": "cpu", "max": 1 }, - { "id": "ram", "max": 1000000000 }, - { "id": "disk", "max": 1000000000 }, + { "id": "ram", "max": 1 }, + { "id": "disk", "max": 1 }, { "id": "myGPU", "max": 1 } ] } @@ -122,7 +122,7 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp { "id": "0xd6b10b27aab01a72070a5164c07d0517755838b9cb9857e2d5649287ec3aaaa2-0x66073c81f833deaa2f8e2a508f69cf78f8a99b17ba1a64f369af921750f93914", "runningJobs": 0, - "consumerAddress": "0x4fb80776C8eb4cAbe7730dcBCdb1fa6ecD3c460E", + "consumerAddress": "0x00", "platform": { "architecture": "x86_64", "os": "Ubuntu 22.04.3 LTS" }, "fees": { "1": [ @@ -141,9 +141,9 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp { "id": "cpu", "total": 8, "max": 8, "min": 1, "inUse": 0 }, { "id": "ram", - "total": 24888963072, - "max": 24888963072, - "min": 1000000000, + "total": 23, + "max": 23, + "min": 1, "inUse": 0 }, { @@ -162,15 +162,15 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp "min": 0, "inUse": 0 }, - { "id": "disk", "total": 1000000000, "max": 1000000000, "min": 0, "inUse": 0 } + { "id": "disk", "total": 1, "max": 1, "min": 0, "inUse": 0 } ], "free": { "maxJobDuration": 60, "maxJobs": 3, "resources": [ { "id": "cpu", "max": 1, "inUse": 0 }, - { "id": "ram", "max": 1000000000, "inUse": 0 }, - { "id": "disk", "max": 1000000000, "inUse": 0 }, + { "id": "ram", "max": 1, "inUse": 0 }, + { "id": "disk", "max": 1, "inUse": 0 }, { "id": "myGPU", "max": 1, "inUse": 0 } ] }, @@ -194,7 +194,7 @@ Start a free job using: "rawcode": "import tensorflow as tf\nsess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))\nprint(\"Num GPUs Available: \", len(tf.config.list_physical_devices('GPU')))\ngpus = tf.config.list_physical_devices('GPU')\nfor gpu in gpus:\n\tprint('Name:', gpu.name, ' Type:', gpu.device_type)" } }, - "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687", + "consumerAddress": "0x00", "signature": 
"123", "nonce": 1, "environment": "0xd6b10b27aab01a72070a5164c07d0517755838b9cb9857e2d5649287ec3aaaa2-0x66073c81f833deaa2f8e2a508f69cf78f8a99b17ba1a64f369af921750f93914", @@ -259,7 +259,7 @@ Then define DOCKER_COMPUTE_ENVIRONMENTS with }, { "id": "disk", - "total": 1000000000 + "total": 1 } ], "storageExpiry": 604800, @@ -291,11 +291,11 @@ Then define DOCKER_COMPUTE_ENVIRONMENTS with }, { "id": "ram", - "max": 1000000000 + "max": 1 }, { "id": "disk", - "max": 1000000000 + "max": 1 }, { "id": "myGPU", @@ -311,7 +311,7 @@ aka ```bash export DOCKER_COMPUTE_ENVIRONMENTS="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"myGPU\",\"description\":\"AMD Radeon RX 9070 XT\",\"type\":\"gpu\",\"total\":1,\"init\":{\"advanced\":{ -\"IpcMode\":\"host\",\"CapAdd\":[\"CAP_SYS_PTRACE\"],\"Devices\":[\"/dev/dxg\",\"/dev/dri/card0\"],\"Binds\":[\"/usr/lib/wsl/lib/libdxcore.so:/usr/lib/libdxcore.so\",\"/opt/rocm/lib/libhsa-runtime64.so.1:/opt/rocm/lib/libhsa-runtime64.so.1\"],\"SecurityOpt\":{\"seccomp\":\"unconfined\"}}}},{\"id\":\"disk\",\"total\":1000000000}],\"storageExpiry\":604800,\"maxJobDuration\":3600,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1},{\"id\":\"nyGPU\",\"price\":3}]}]},\"free\":{\"maxJobDuration\":60,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1000000000},{\"id\":\"disk\",\"max\":1000000000},{\"id\":\"myGPU\",\"max\":1}]}}]" +\"IpcMode\":\"host\",\"CapAdd\":[\"CAP_SYS_PTRACE\"],\"Devices\":[\"/dev/dxg\",\"/dev/dri/card0\"],\"Binds\":[\"/usr/lib/wsl/lib/libdxcore.so:/usr/lib/libdxcore.so\",\"/opt/rocm/lib/libhsa-runtime64.so.1:/opt/rocm/lib/libhsa-runtime64.so.1\"],\"SecurityOpt\":{\"seccomp\":\"unconfined\"}}}},{\"id\":\"disk\",\"total\":10}],\"storageExpiry\":604800,\"maxJobDuration\":3600,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1},{\"id\":\"nyGPU\",\"price\":3}]}]},\"free\":{\"maxJobDuration\":60,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1},{\"id\":\"disk\",\"max\":1},{\"id\":\"myGPU\",\"max\":1}]}}]" ``` you should have it in your compute envs: @@ -325,7 +325,7 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp { "id": "0xbb5773e734e1b188165dac88d9a3dc8ac28bc9f5624b45fa8bbd8fca043de7c1-0x2c2761f938cf186eeb81f71dee06ad7edb299493e39c316c390d0c0691e6585c", "runningJobs": 0, - "consumerAddress": "0x4fb80776C8eb4cAbe7730dcBCdb1fa6ecD3c460E", + "consumerAddress": "0x00", "platform": { "architecture": "x86_64", "os": "Ubuntu 24.04.2 LTS" @@ -359,9 +359,9 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp }, { "id": "ram", - "total": 33617674240, - "max": 33617674240, - "min": 1000000000, + "total": 31, + "max": 31, + "min": 1, "inUse": 0 }, { @@ -389,8 +389,8 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp }, { "id": "disk", - "total": 1000000000, - "max": 1000000000, + "total": 10, + "max": 10, "min": 0, "inUse": 0 } @@ -406,12 +406,12 @@ root@gpu-1:/repos/ocean/ocean-node# curl http://localhost:8000/api/services/comp }, { "id": "ram", - "max": 1000000000, + "max": 1, "inUse": 0 }, { "id": "disk", - "max": 1000000000, + "max": 1, "inUse": 0 }, { @@ -450,7 +450,7 @@ Start a free job with "rawcode": "import tensorflow as tf\nsess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))\nprint(\"Num GPUs Available: \", len(tf.config.list_physical_devices('GPU')))\ngpus = 
tf.config.list_physical_devices('GPU')\nfor gpu in gpus:\n\tprint('Name:', gpu.name, ' Type:', gpu.device_type)" } }, - "consumerAddress": "0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687", + "consumerAddress": "0x00", "signature": "123", "nonce": 1, "environment": "0xbb5773e734e1b188165dac88d9a3dc8ac28bc9f5624b45fa8bbd8fca043de7c1-0x2c2761f938cf186eeb81f71dee06ad7edb299493e39c316c390d0c0691e6585c", diff --git a/docs/env.md b/docs/env.md index 70e35ad35..6b1f59006 100644 --- a/docs/env.md +++ b/docs/env.md @@ -5,6 +5,7 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/ ## Core - `PRIVATE_KEY` (Required): The private key for the node, required for node operations. Example: `"0x1d751ded5a32226054cd2e71261039b65afb9ee1c746d055dd699b1150a5befc"` +- `CONFIG_PATH`: Absolute path to JSON config file - `RPCS`: JSON object defining RPC endpoints for various networks. Example: `"{ \"11155420\":{ \"rpc\":\"https://sepolia.optimism.io\", \"fallbackRPCs\": [\"https://public.stackup.sh/api/v1/node/optimism-sepolia\"], \"chainId\": 11155420, \"network\": \"optimism-sepolia\", \"chunkSize\": 1000 }}"` - `DB_URL`: URL for connecting to the database. Required for running a database with the node. Example: `"http://localhost:8108/?apiKey=xyz"` - `IPFS_GATEWAY`: The gateway URL for IPFS, used for downloading files from IPFS. Example: `"https://ipfs.io/"` @@ -123,6 +124,8 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable is used to configure Dock Example Configuration The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of objects, where each object represents a Docker compute environment configuration. Below is an example configuration: +`Disk` and `Ram` resources are always expressed in GB. + ```json [ { @@ -130,7 +133,7 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of "resources": [ { "id": "disk", - "total": 1000000000 + "total": 10 } ], "storageExpiry": 604800, @@ -158,11 +161,11 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of }, { "id": "ram", - "max": 1000000000 + "max": 1 }, { "id": "disk", - "max": 1000000000 + "max": 1 } ] } diff --git a/elasticsearch-compose.yml b/elasticsearch-compose.yml new file mode 100644 index 000000000..504eb2cf1 --- /dev/null +++ b/elasticsearch-compose.yml @@ -0,0 +1,17 @@ +services: + elasticsearch: + image: elasticsearch:8.5.1 + ports: + - 9200:9200 + - 9300:9300 + volumes: + - esdata:/usr/share/elasticsearch/data + environment: + ES_JAVA_OPTS: "-Xms512m -Xmx512m" + MAX_MAP_COUNT: "64000" + discovery.type: "single-node" + ELASTIC_PASSWORD: "changeme" + xpack.security.enabled: "false" + xpack.security.http.ssl.enabled: "false" +volumes: + esdata: diff --git a/package-lock.json b/package-lock.json index 3ef38b138..3f8f53996 100644 --- a/package-lock.json +++ b/package-lock.json @@ -38,7 +38,7 @@ "@oceanprotocol/ddo-js": "^0.1.4", "@types/lodash.clonedeep": "^4.5.7", "aws-sdk": "^2.1591.0", - "axios": "^1.8.4", + "axios": "^1.12.0", "base58-js": "^2.0.0", "cors": "^2.8.5", "delay": "^5.0.0", @@ -49,6 +49,7 @@ "eth-crypto": "^2.6.0", "ethers": "^6.14.1", "express": "^4.21.1", + "humanhash": "^1.0.4", "hyperdiff": "^2.0.16", "ip": "^2.0.1", "it-pipe": "^3.0.1", @@ -68,7 +69,8 @@ "uuid": "^11.1.0", "winston": "^3.11.0", "winston-daily-rotate-file": "^4.7.1", - "winston-transport": "^4.6.0" + "winston-transport": "^4.6.0", + "zod": "^3.25.76" }, "devDependencies": { "@types/chai": "^4.3.10", @@ -83,6 +85,7 @@ "@types/node-cron": "^3.0.11", 
"@types/rdfjs__formats-common": "^3.1.5", "@types/sinon": "^17.0.4", + "@types/tar-stream": "^3.1.4", "@typescript-eslint/eslint-plugin": "^6.8.0", "@typescript-eslint/parser": "^6.8.0", "auto-changelog": "^2.4.0", @@ -6235,6 +6238,16 @@ "undici-types": "~5.26.4" } }, + "node_modules/@types/tar-stream": { + "version": "3.1.4", + "resolved": "https://registry.npmjs.org/@types/tar-stream/-/tar-stream-3.1.4.tgz", + "integrity": "sha512-921gW0+g29mCJX0fRvqeHzBlE/XclDaAG0Ousy1LCghsOhvaKacDeRGEVzQP9IPfKn8Vysy7FEXAIxycpc/CMg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/triple-beam": { "version": "1.3.5", "resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.5.tgz", @@ -7132,6 +7145,24 @@ "node": ">=4" } }, + "node_modules/async-function": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/async-function/-/async-function-1.0.0.tgz", + "integrity": "sha512-hsU18Ae8CDTR6Kgu9DYf0EbCr/a5iGL0rytQDobUcdpYOKokk8LEjVphnXkDkgpi0wYVsqrXuP0bZxJaTqdgoA==", + "license": "MIT", + "engines": { + "node": ">= 0.4" + } + }, + "node_modules/async-generator-function": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/async-generator-function/-/async-generator-function-1.0.0.tgz", + "integrity": "sha512-+NAXNqgCrB95ya4Sr66i1CL2hqLVckAk7xwRYWdcm39/ELQ6YNn1aw5r0bdQtqNZgQpEWzc5yc/igXc7aL5SLA==", + "license": "MIT", + "engines": { + "node": ">= 0.4" + } + }, "node_modules/async-retry": { "version": "1.3.3", "resolved": "https://registry.npmjs.org/async-retry/-/async-retry-1.3.3.tgz", @@ -7194,9 +7225,9 @@ } }, "node_modules/aws-sdk": { - "version": "2.1675.0", - "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.1675.0.tgz", - "integrity": "sha512-gkqNAP0m3gDpnZCKL2OLdwAG+SjYT9MURGfTkixAWHIPDYD4OQf3sCcZNBTTTeOvOXus/tJIpgafKHD9DCIOCQ==", + "version": "2.1692.0", + "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.1692.0.tgz", + "integrity": "sha512-x511uiJ/57FIsbgUe5csJ13k3uzu25uWQE+XqfBis/sB0SFoiElJWXRkgEAUh0U6n40eT3ay5Ue4oPkRMu1LYw==", "hasInstallScript": true, "license": "Apache-2.0", "dependencies": { @@ -7242,12 +7273,13 @@ } }, "node_modules/axios": { - "version": "1.9.0", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.9.0.tgz", - "integrity": "sha512-re4CqKTJaURpzbLHtIi6XpDv20/CnpXOtjRY5/CU32L8gU8ek9UIivcfvSWvmKEngmVbrUtPpdDwWDWL7DNHvg==", + "version": "1.12.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.12.2.tgz", + "integrity": "sha512-vMJzPewAlRyOgxV2dU0Cuz2O8zzzx9VYtbJOaBgXFeLc4IV/Eg50n4LowmehOOR61S8ZMpc2K5Sa7g6A4jfkUw==", + "license": "MIT", "dependencies": { "follow-redirects": "^1.15.6", - "form-data": "^4.0.0", + "form-data": "^4.0.4", "proxy-from-env": "^1.1.0" } }, @@ -8008,6 +8040,19 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/call-bind-apply-helpers": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/call-bind-apply-helpers/-/call-bind-apply-helpers-1.0.2.tgz", + "integrity": "sha512-Sp1ablJ0ivDkSzjcaJdxEunN5/XvksFJ2sMBFfq6x0ryhQV/2b/KwFe21cMpmHtPOSij8K99/wSfoEuTObmuMQ==", + "license": "MIT", + "dependencies": { + "es-errors": "^1.3.0", + "function-bind": "^1.1.2" + }, + "engines": { + "node": ">= 0.4" + } + }, "node_modules/callsites": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz", @@ -9444,6 +9489,20 @@ "node": ">=0.10" } }, + "node_modules/dunder-proto": { + "version": "1.0.1", + "resolved": 
"https://registry.npmjs.org/dunder-proto/-/dunder-proto-1.0.1.tgz", + "integrity": "sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A==", + "license": "MIT", + "dependencies": { + "call-bind-apply-helpers": "^1.0.1", + "es-errors": "^1.3.0", + "gopd": "^1.2.0" + }, + "engines": { + "node": ">= 0.4" + } + }, "node_modules/duplex-to": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/duplex-to/-/duplex-to-2.0.0.tgz", @@ -9723,13 +9782,10 @@ "license": "MIT" }, "node_modules/es-define-property": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/es-define-property/-/es-define-property-1.0.0.tgz", - "integrity": "sha512-jxayLKShrEqqzJ0eumQbVhTYQM27CfT1T35+gCgDFoL82JLsXqTJ76zv6A0YLOgEnLUMvLzsDsGIrl8NFpT2gQ==", + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/es-define-property/-/es-define-property-1.0.1.tgz", + "integrity": "sha512-e3nRfgfUZ4rNGL232gUgX06QNyyez04KdjFrF+LTRoOXmrOgFKDg4BCdsjW8EnT69eqdYGmRpJwiPVYNrCaW3g==", "license": "MIT", - "dependencies": { - "get-intrinsic": "^1.2.4" - }, "engines": { "node": ">= 0.4" } @@ -9790,9 +9846,9 @@ } }, "node_modules/es-object-atoms": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/es-object-atoms/-/es-object-atoms-1.0.0.tgz", - "integrity": "sha512-MZ4iQ6JwHOBQjahnjwaC1ZtIBH+2ohjamzAO3oaHcXYup7qxjF2fixyH+Q71voWHeOkI2q/TnJao/KfXYIZWbw==", + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/es-object-atoms/-/es-object-atoms-1.1.1.tgz", + "integrity": "sha512-FGgH2h8zKNim9ljj7dankFPcICIK9Cp5bm+c2gQSYePhpaG5+esrLODihIorn+Pe6FGJzWhXQotPv73jTaldXA==", "license": "MIT", "dependencies": { "es-errors": "^1.3.0" @@ -9802,14 +9858,15 @@ } }, "node_modules/es-set-tostringtag": { - "version": "2.0.3", - "resolved": "https://registry.npmjs.org/es-set-tostringtag/-/es-set-tostringtag-2.0.3.tgz", - "integrity": "sha512-3T8uNMC3OQTHkFUsFq8r/BwAXLHvU/9O9mE0fBc/MY5iq/8H7ncvO947LmYA6ldWw9Uh8Yhf25zu6n7nML5QWQ==", + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/es-set-tostringtag/-/es-set-tostringtag-2.1.0.tgz", + "integrity": "sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA==", "license": "MIT", "dependencies": { - "get-intrinsic": "^1.2.4", + "es-errors": "^1.3.0", + "get-intrinsic": "^1.2.6", "has-tostringtag": "^1.0.2", - "hasown": "^2.0.1" + "hasown": "^2.0.2" }, "engines": { "node": ">= 0.4" @@ -11561,13 +11618,15 @@ } }, "node_modules/form-data": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", - "integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==", + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.4.tgz", + "integrity": "sha512-KrGhL9Q4zjj0kiUt5OO4Mr/A/jlI2jDYs5eHBpYHPcBEVSiipAvn2Ko2HnPe20rmcuuvMHNdZFp+4IlGTMF0Ow==", "license": "MIT", "dependencies": { "asynckit": "^0.4.0", "combined-stream": "^1.0.8", + "es-set-tostringtag": "^2.1.0", + "hasown": "^2.0.2", "mime-types": "^2.1.12" }, "engines": { @@ -11778,6 +11837,15 @@ "node": ">=8" } }, + "node_modules/generator-function": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/generator-function/-/generator-function-2.0.0.tgz", + "integrity": "sha512-xPypGGincdfyl/AiSGa7GjXLkvld9V7GjZlowup9SHIJnQnHLFiLODCd/DqKOp0PBagbHJ68r1KJI9Mut7m4sA==", + "license": "MIT", + "engines": { + "node": ">= 0.4" + } + }, "node_modules/gensync": { "version": "1.0.0-beta.2", "resolved": 
"https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz", @@ -11821,16 +11889,24 @@ } }, "node_modules/get-intrinsic": { - "version": "1.2.4", - "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.4.tgz", - "integrity": "sha512-5uYhsJH8VJBTv7oslg4BznJYhDoRI6waYCxMmCdnTrcCrHA/fCFKoTFz2JKKE0HdDFUF7/oQuhzumXJK7paBRQ==", + "version": "1.3.1", + "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.3.1.tgz", + "integrity": "sha512-fk1ZVEeOX9hVZ6QzoBNEC55+Ucqg4sTVwrVuigZhuRPESVFpMyXnd3sbXvPOwp7Y9riVyANiqhEuRF0G1aVSeQ==", "license": "MIT", "dependencies": { + "async-function": "^1.0.0", + "async-generator-function": "^1.0.0", + "call-bind-apply-helpers": "^1.0.2", + "es-define-property": "^1.0.1", "es-errors": "^1.3.0", + "es-object-atoms": "^1.1.1", "function-bind": "^1.1.2", - "has-proto": "^1.0.1", - "has-symbols": "^1.0.3", - "hasown": "^2.0.0" + "generator-function": "^2.0.0", + "get-proto": "^1.0.1", + "gopd": "^1.2.0", + "has-symbols": "^1.1.0", + "hasown": "^2.0.2", + "math-intrinsics": "^1.1.0" }, "engines": { "node": ">= 0.4" @@ -11855,6 +11931,19 @@ "node": ">=8.0.0" } }, + "node_modules/get-proto": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/get-proto/-/get-proto-1.0.1.tgz", + "integrity": "sha512-sTSfBjoXBp89JvIKIefqw7U2CCebsc74kiY6awiGogKtoSGbgjYE/G/+l9sF3MWFPNc9IcoOC4ODfKHfxFmp0g==", + "license": "MIT", + "dependencies": { + "dunder-proto": "^1.0.1", + "es-object-atoms": "^1.0.0" + }, + "engines": { + "node": ">= 0.4" + } + }, "node_modules/get-stream": { "version": "9.0.1", "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-9.0.1.tgz", @@ -12069,12 +12158,12 @@ } }, "node_modules/gopd": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/gopd/-/gopd-1.0.1.tgz", - "integrity": "sha512-d65bNlIadxvpb/A2abVdlqKqV563juRnZ1Wtk6s1sIR8uNsXR70xqIzVqxVf1eTqDunwT2MkczEeaezCKTZhwA==", + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/gopd/-/gopd-1.2.0.tgz", + "integrity": "sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg==", "license": "MIT", - "dependencies": { - "get-intrinsic": "^1.1.3" + "engines": { + "node": ">= 0.4" }, "funding": { "url": "https://github.com/sponsors/ljharb" @@ -12157,9 +12246,9 @@ } }, "node_modules/has-symbols": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/has-symbols/-/has-symbols-1.0.3.tgz", - "integrity": "sha512-l3LCuF6MgDNwTDKkdYGEihYjt5pRPbEg46rtlmnSPlUbgmB8LOIrKJbYYFBSbnPaJexMKtiPO8hmeRjRz2Td+A==", + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/has-symbols/-/has-symbols-1.1.0.tgz", + "integrity": "sha512-1cDNdwJ2Jaohmb3sg4OmKaMBwuC48sYni5HUw2DvsC8LjGTLK9h+eb1X6RyuOHe4hT0ULCW68iomhjUoKUqlPQ==", "license": "MIT", "engines": { "node": ">= 0.4" @@ -12389,6 +12478,25 @@ "node": ">= 6" } }, + "node_modules/humanhash": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/humanhash/-/humanhash-1.0.4.tgz", + "integrity": "sha512-fxOhEl/Ezv7PobYOTomDmQKWaSC0hk0mzl5et5McPtr+6LRBP7LYoeFLPjKW6xOSGmMNLj50BufrrgX+M5EvEA==", + "license": "Unlicense", + "dependencies": { + "uuid": "^3.3.2" + } + }, + "node_modules/humanhash/node_modules/uuid": { + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz", + "integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A==", + "deprecated": "Please upgrade to version 7 or higher. 
Older versions may use Math.random() in certain circumstances, which is known to be problematic. See https://v8.dev/blog/math-random for details.", + "license": "MIT", + "bin": { + "uuid": "bin/uuid" + } + }, "node_modules/humanize-ms": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/humanize-ms/-/humanize-ms-1.2.1.tgz", @@ -14743,6 +14851,15 @@ "node": ">= 10" } }, + "node_modules/math-intrinsics": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/math-intrinsics/-/math-intrinsics-1.1.0.tgz", + "integrity": "sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==", + "license": "MIT", + "engines": { + "node": ">= 0.4" + } + }, "node_modules/md5.js": { "version": "1.3.5", "resolved": "https://registry.npmjs.org/md5.js/-/md5.js-1.3.5.tgz", @@ -22034,6 +22151,15 @@ "funding": { "url": "https://github.com/sponsors/sindresorhus" } + }, + "node_modules/zod": { + "version": "3.25.76", + "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", + "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", + "license": "MIT", + "funding": { + "url": "https://github.com/sponsors/colinhacks" + } } } } diff --git a/package.json b/package.json index d9979a5ea..941106390 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "build:controlpanel": "cd controlpanel && npm install --maxsockets 1 && NODE_ENV=production npx next build", "quickstart": "bash scripts/ocean-node-quickstart.sh", "setupEnv": "bash -c './src/helpers/scripts/setupNodeEnv.sh && source .env'", - "build-tests:tsc": "tsc --sourceMap --sourceRoot ./src/test && cp ./src/test/.env.test ./dist/test", + "build-tests:tsc": "tsc --sourceMap --sourceRoot ./src/test && cp ./src/test/.env.test ./dist/test && cp ./src/test/.env.test2 ./dist/test && cp ./src/test/config.json $HOME/config.json", "client": "mkdir -p ./dist/helpers/scripts/output && node dist/helpers/scripts/clientExample.js", "clean": "if [ -d ./dist ]; then find ./dist -mindepth 1 -not -path './dist/controlpanel*' -delete; fi", "clean:all": "rm -rf ./dist/ ./doc/ ./.nyc_output", @@ -77,7 +77,7 @@ "@oceanprotocol/ddo-js": "^0.1.4", "@types/lodash.clonedeep": "^4.5.7", "aws-sdk": "^2.1591.0", - "axios": "^1.8.4", + "axios": "^1.12.0", "base58-js": "^2.0.0", "cors": "^2.8.5", "delay": "^5.0.0", @@ -88,6 +88,7 @@ "eth-crypto": "^2.6.0", "ethers": "^6.14.1", "express": "^4.21.1", + "humanhash": "^1.0.4", "hyperdiff": "^2.0.16", "ip": "^2.0.1", "it-pipe": "^3.0.1", @@ -107,7 +108,8 @@ "uuid": "^11.1.0", "winston": "^3.11.0", "winston-daily-rotate-file": "^4.7.1", - "winston-transport": "^4.6.0" + "winston-transport": "^4.6.0", + "zod": "^3.25.76" }, "devDependencies": { "@types/chai": "^4.3.10", @@ -122,6 +124,7 @@ "@types/node-cron": "^3.0.11", "@types/rdfjs__formats-common": "^3.1.5", "@types/sinon": "^17.0.4", + "@types/tar-stream": "^3.1.4", "@typescript-eslint/eslint-plugin": "^6.8.0", "@typescript-eslint/parser": "^6.8.0", "auto-changelog": "^2.4.0", diff --git a/scripts/ocean-node-quickstart.sh b/scripts/ocean-node-quickstart.sh index 4993d993f..13dc4b3cf 100755 --- a/scripts/ocean-node-quickstart.sh +++ b/scripts/ocean-node-quickstart.sh @@ -142,7 +142,7 @@ fi # Set default compute environments if not already defined if [ -z "$DOCKER_COMPUTE_ENVIRONMENTS" ]; then echo "Setting default DOCKER_COMPUTE_ENVIRONMENTS configuration" - export 
DOCKER_COMPUTE_ENVIRONMENTS="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"disk\",\"total\":1000000000}],\"storageExpiry\":604800,\"maxJobDuration\":36000,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1}]}]},\"free\":{\"maxJobDuration\":360000,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1000000000},{\"id\":\"disk\",\"max\":1000000000}]}}]" + export DOCKER_COMPUTE_ENVIRONMENTS="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"disk\",\"total\":10}],\"storageExpiry\":604800,\"maxJobDuration\":36000,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1}]}]},\"free\":{\"maxJobDuration\":360000,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1},{\"id\":\"disk\",\"max\":1}]}}]" fi cat < docker-compose.yml diff --git a/scripts/ocean-node-update.sh b/scripts/ocean-node-update.sh index b72804d9a..fa44b5337 100755 --- a/scripts/ocean-node-update.sh +++ b/scripts/ocean-node-update.sh @@ -1,6 +1,6 @@ #!/bin/bash -DEFAULT_DOCKER_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":36000,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":360000,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' +DEFAULT_DOCKER_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":36000,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":360000,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' check_prerequisites() { if [ ! 
-f "docker-compose.yml" ]; then diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 28f8f5ace..f6cdb9ed6 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -1,4 +1,4 @@ -import { MetadataAlgorithm } from '@oceanprotocol/ddo-js' +import { MetadataAlgorithm, ConsumerParameter } from '@oceanprotocol/ddo-js' import type { BaseFileObject } from '../fileObject.js' export enum C2DClusterType { // eslint-disable-next-line no-unused-vars @@ -145,6 +145,11 @@ export interface ComputeResult { export type DBComputeJobMetadata = { [key: string]: string | number | boolean } + +export interface ComputeJobTerminationDetails { + OOMKilled: boolean + exitCode: number +} export interface ComputeJob { owner: string did?: string @@ -160,6 +165,7 @@ export interface ComputeJob { agreementId?: string environment?: string metadata?: DBComputeJobMetadata + terminationDetails?: ComputeJobTerminationDetails } export interface ComputeOutput { @@ -181,15 +187,27 @@ export interface ComputeAsset { transferTxId?: string userdata?: { [key: string]: any } } - +export interface ExtendedMetadataAlgorithm extends MetadataAlgorithm { + container: { + // retain existing properties + entrypoint: string + image: string + tag: string + checksum: string + dockerfile?: string // optional + additionalDockerFiles?: { [key: string]: any } + consumerParameters?: ConsumerParameter[] + } +} export interface ComputeAlgorithm { documentId?: string serviceId?: string fileObject?: BaseFileObject - meta?: MetadataAlgorithm + meta?: ExtendedMetadataAlgorithm transferTxId?: string algocustomdata?: { [key: string]: any } userdata?: { [key: string]: any } + envs?: { [key: string]: any } } export interface AlgoChecksums { @@ -236,6 +254,10 @@ export enum C2DStatusNumber { // eslint-disable-next-line no-unused-vars PullImageFailed = 11, // eslint-disable-next-line no-unused-vars + BuildImage = 12, + // eslint-disable-next-line no-unused-vars + BuildImageFailed = 13, + // eslint-disable-next-line no-unused-vars ConfiguringVolumes = 20, // eslint-disable-next-line no-unused-vars VolumeCreationFailed = 21, @@ -254,6 +276,8 @@ export enum C2DStatusNumber { // eslint-disable-next-line no-unused-vars AlgorithmFailed = 41, // eslint-disable-next-line no-unused-vars + DiskQuotaExceeded = 42, + // eslint-disable-next-line no-unused-vars FilteringResults = 50, // eslint-disable-next-line no-unused-vars PublishingResults = 60, @@ -272,6 +296,10 @@ export enum C2DStatusText { // eslint-disable-next-line no-unused-vars PullImageFailed = 'Pulling algorithm image failed', // eslint-disable-next-line no-unused-vars + BuildImage = 'Building algorithm image', + // eslint-disable-next-line no-unused-vars + BuildImageFailed = 'Building algorithm image failed', + // eslint-disable-next-line no-unused-vars ConfiguringVolumes = 'Configuring volumes', // eslint-disable-next-line no-unused-vars VolumeCreationFailed = 'Volume creation failed', @@ -290,6 +318,8 @@ export enum C2DStatusText { // eslint-disable-next-line no-unused-vars AlgorithmFailed = 'Failed to run algorithm', // eslint-disable-next-line no-unused-vars + DiskQuotaExceeded = 'Error: disk quota exceeded', + // eslint-disable-next-line no-unused-vars FilteringResults = 'Filtering results', // eslint-disable-next-line no-unused-vars PublishingResults = 'Publishing results', diff --git a/src/@types/OceanNode.ts b/src/@types/OceanNode.ts index 55b4db5ff..6eb5fba01 100644 --- a/src/@types/OceanNode.ts +++ b/src/@types/OceanNode.ts @@ -146,6 +146,7 @@ export interface StorageTypes { 
export interface OceanNodeStatus { id: string publicKey: string + friendlyName: string address: string version: string http: boolean diff --git a/src/@types/humanhash.d.ts b/src/@types/humanhash.d.ts new file mode 100644 index 000000000..3d305eb40 --- /dev/null +++ b/src/@types/humanhash.d.ts @@ -0,0 +1,13 @@ +declare module 'humanhash' { + class HumanHasher { + constructor(wordlist?: string[]) + humanize(hexdigest: string, words?: number, separator?: string): string + uuid( + words?: number, + separator?: string, + version?: number + ): { humanhash: string; uuid: string } + } + + export = HumanHasher +} diff --git a/src/components/Indexer/processors/ExchangeActivatedEventProcessor.ts b/src/components/Indexer/processors/ExchangeActivatedEventProcessor.ts index 8db5def61..28fa7a429 100644 --- a/src/components/Indexer/processors/ExchangeActivatedEventProcessor.ts +++ b/src/components/Indexer/processors/ExchangeActivatedEventProcessor.ts @@ -21,24 +21,30 @@ export class ExchangeActivatedEventProcessor extends BaseEventProcessor { signer: Signer, provider: JsonRpcApiProvider ): Promise { - const decodedEventData = await this.getEventData( - provider, - event.transactionHash, - FixedRateExchange.abi, - EVENTS.EXCHANGE_ACTIVATED - ) - INDEXER_LOGGER.logMessage(`event: ${JSON.stringify(event)}`) - INDEXER_LOGGER.logMessage( - `decodedEventData in exchange activated: ${JSON.stringify(decodedEventData)}` - ) - const exchangeId = decodedEventData.args[0].toString() - const freContract = new ethers.Contract(event.address, FixedRateExchange.abi, signer) - const exchange = await freContract.getExchange(exchangeId) - const datatokenAddress = exchange[1] - const datatokenContract = getDtContract(signer, datatokenAddress) - const nftAddress = await datatokenContract.getERC721Address() - const did = getDid(nftAddress, chainId) try { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + FixedRateExchange.abi, + EVENTS.EXCHANGE_ACTIVATED + ) + INDEXER_LOGGER.logMessage(`event: ${JSON.stringify(event)}`) + INDEXER_LOGGER.logMessage( + `decodedEventData in exchange activated: ${JSON.stringify(decodedEventData)}` + ) + const exchangeId = decodedEventData.args[0].toString() + const freContract = new ethers.Contract( + event.address, + FixedRateExchange.abi, + signer + ) + const exchange = await freContract.getExchange(exchangeId) + + const datatokenAddress = exchange[1] + const datatokenContract = getDtContract(signer, datatokenAddress) + const nftAddress = await datatokenContract.getERC721Address() + const did = getDid(nftAddress, chainId) + const { ddo: ddoDatabase } = await getDatabase() const ddo = await ddoDatabase.retrieve(did) if (!ddo) { diff --git a/src/components/Indexer/processors/ExchangeCreatedEventProcessor.ts b/src/components/Indexer/processors/ExchangeCreatedEventProcessor.ts index 661e140e3..b9a02c53f 100644 --- a/src/components/Indexer/processors/ExchangeCreatedEventProcessor.ts +++ b/src/components/Indexer/processors/ExchangeCreatedEventProcessor.ts @@ -21,20 +21,26 @@ export class ExchangeCreatedEventProcessor extends BaseEventProcessor { signer: Signer, provider: JsonRpcApiProvider ): Promise { - const decodedEventData = await this.getEventData( - provider, - event.transactionHash, - FixedRateExchange.abi, - EVENTS.EXCHANGE_CREATED - ) - const exchangeId = decodedEventData.args[0].toString() - const freContract = new ethers.Contract(event.address, FixedRateExchange.abi, signer) - const exchange = await freContract.getExchange(exchangeId) - const 
datatokenAddress = exchange[1] - const datatokenContract = getDtContract(signer, datatokenAddress) - const nftAddress = await datatokenContract.getERC721Address() - const did = getDid(nftAddress, chainId) try { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + FixedRateExchange.abi, + EVENTS.EXCHANGE_CREATED + ) + const exchangeId = decodedEventData.args[0].toString() + const freContract = new ethers.Contract( + event.address, + FixedRateExchange.abi, + signer + ) + const exchange = await freContract.getExchange(exchangeId) + + const datatokenAddress = exchange[1] + const datatokenContract = getDtContract(signer, datatokenAddress) + const nftAddress = await datatokenContract.getERC721Address() + const did = getDid(nftAddress, chainId) + const { ddo: ddoDatabase } = await getDatabase() const ddo = await ddoDatabase.retrieve(did) if (!ddo) { diff --git a/src/components/Indexer/processors/ExchangeRateChangedEventProcessor.ts b/src/components/Indexer/processors/ExchangeRateChangedEventProcessor.ts index d7e0a9c84..8c82d5ef8 100644 --- a/src/components/Indexer/processors/ExchangeRateChangedEventProcessor.ts +++ b/src/components/Indexer/processors/ExchangeRateChangedEventProcessor.ts @@ -21,21 +21,26 @@ export class ExchangeRateChangedEventProcessor extends BaseEventProcessor { signer: Signer, provider: JsonRpcApiProvider ): Promise { - const decodedEventData = await this.getEventData( - provider, - event.transactionHash, - FixedRateExchange.abi, - EVENTS.EXCHANGE_RATE_CHANGED - ) - const exchangeId = ethers.toUtf8Bytes(decodedEventData.args[0].toString()) - const newRate = decodedEventData.args[2].toString() - const freContract = new ethers.Contract(event.address, FixedRateExchange.abi, signer) - const exchange = await freContract.getExchange(exchangeId) - const datatokenAddress = exchange[1] - const datatokenContract = getDtContract(signer, datatokenAddress) - const nftAddress = await datatokenContract.getERC721Address() - const did = getDid(nftAddress, chainId) try { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + FixedRateExchange.abi, + EVENTS.EXCHANGE_RATE_CHANGED + ) + const exchangeId = ethers.toUtf8Bytes(decodedEventData.args[0].toString()) + const newRate = decodedEventData.args[2].toString() + const freContract = new ethers.Contract( + event.address, + FixedRateExchange.abi, + signer + ) + const exchange = await freContract.getExchange(exchangeId) + const datatokenAddress = exchange[1] + const datatokenContract = getDtContract(signer, datatokenAddress) + const nftAddress = await datatokenContract.getERC721Address() + const did = getDid(nftAddress, chainId) + const { ddo: ddoDatabase } = await getDatabase() const ddo = await ddoDatabase.retrieve(did) if (!ddo) { @@ -102,7 +107,11 @@ export class ExchangeRateChangedEventProcessor extends BaseEventProcessor { ) return savedDDO } catch (err) { - INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error processing ExchangeRateChangedEvent: ${err}`, + true + ) } } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index af04cf02d..05319df71 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -123,7 +123,6 @@ export class OceanP2P extends EventEmitter { this._connections = {} this._protocol = '/ocean/nodes/1.0.0' - this._interval = setInterval(this._flushAdvertiseQueue.bind(this), 60 * 1000) // every 60 seconds this._interval 
= setInterval(this._flushAdvertiseQueue.bind(this), 60 * 1000) // every 60 seconds // only enable handling of commands if not bootstrap node diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index dc5620f1a..693e3c73c 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -30,12 +30,14 @@ import { Storage } from '../storage/index.js' import Dockerode from 'dockerode' import type { ContainerCreateOptions, HostConfig, VolumeCreateOptions } from 'dockerode' import * as tar from 'tar' +import * as tarStream from 'tar-stream' import { createWriteStream, existsSync, mkdirSync, rmSync, writeFileSync, + appendFileSync, statSync, createReadStream } from 'fs' @@ -56,6 +58,7 @@ export class C2DEngineDocker extends C2DEngine { public docker: Dockerode private cronTimer: any private cronTime: number = 2000 + private jobImageSizes: Map = new Map() public constructor(clusterConfig: C2DClusterInfo, db: C2DDatabase, escrow: Escrow) { super(clusterConfig, db, escrow) @@ -110,10 +113,11 @@ export class C2DEngineDocker extends C2DEngine { } // console.log(sysinfo) let fees: ComputeEnvFeesStructure = null - const supportedChains: number[] = [] - for (const chain of Object.keys(config.supportedNetworks)) { - supportedChains.push(parseInt(chain)) + if (config.supportedNetworks) { + for (const chain of Object.keys(config.supportedNetworks)) { + supportedChains.push(parseInt(chain)) + } } for (const feeChain of Object.keys(envConfig.fees)) { // for (const feeConfig of envConfig.fees) { @@ -163,7 +167,7 @@ export class C2DEngineDocker extends C2DEngine { consumerAddress: config.keys.ethAddress, platform: { architecture: sysinfo.Architecture, - os: sysinfo.OperatingSystem + os: sysinfo.OSType }, fees }) @@ -184,9 +188,9 @@ export class C2DEngineDocker extends C2DEngine { this.envs[0].resources.push({ id: 'ram', type: 'ram', - total: sysinfo.MemTotal, - max: sysinfo.MemTotal, - min: 1e9 + total: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), + max: Math.floor(sysinfo.MemTotal / 1024 / 1024 / 1024), + min: 1 }) if (envConfig.resources) { @@ -308,48 +312,33 @@ export class C2DEngineDocker extends C2DEngine { ): Promise { try { const info = drc.default.parseRepoAndRef(image) - /** - * info: { - index: { name: 'docker.io', official: true }, - official: true, - remoteName: 'library/node', - localName: 'node', - canonicalName: 'docker.io/node', - digest: 'sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36' - } - */ const client = drc.createClientV2({ name: info.localName }) - const tagOrDigest = info.tag || info.digest - - // try get manifest from registry - return await new Promise((resolve, reject) => { - client.getManifest( - { ref: tagOrDigest, maxSchemaVersion: 2 }, - function (err: any, manifest: any) { - client.close() - if (manifest) { - return resolve({ - valid: checkManifestPlatform(manifest.platform, platform) - }) - } + const ref = info.tag || info.digest - if (err) { - CORE_LOGGER.error( - `Unable to get Manifest for image ${image}: ${err.message}` - ) - reject(err) - } - } - ) + const manifest = await new Promise((resolve, reject) => { + client.getManifest({ ref, maxSchemaVersion: 2 }, (err: any, result: any) => { + client.close() + err ? reject(err) : resolve(result) + }) }) - } catch (err) { - // show all aggregated errors, if present - const aggregated = err.errors && err.errors.length > 0 - aggregated ? 
CORE_LOGGER.error(JSON.stringify(err.errors)) : CORE_LOGGER.error(err) + + const platforms = Array.isArray(manifest.manifests) + ? manifest.manifests.map((entry: any) => entry.platform) + : [manifest.platform] + + const isValidPlatform = platforms.some((entry: any) => + checkManifestPlatform(entry, platform) + ) + + return { valid: isValidPlatform } + } catch (err: any) { + CORE_LOGGER.error(`Unable to get Manifest for image ${image}: ${err.message}`) + if (err.errors?.length) CORE_LOGGER.error(JSON.stringify(err.errors)) + return { valid: false, status: 404, - reason: aggregated ? JSON.stringify(err.errors) : err.message + reason: err.errors?.length ? JSON.stringify(err.errors) : err.message } } } @@ -372,18 +361,6 @@ export class C2DEngineDocker extends C2DEngine { // TO DO - iterate over resources and get default runtime const isFree: boolean = !(payment && payment.lockTx) - // C2D - Check image, check arhitecture, etc - const image = getAlgorithmImage(algorithm) - // ex: node@sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36 - if (!image) { - // send a 500 with the error message - throw new Error( - `Unable to extract docker image ${image} from algoritm: ${JSON.stringify( - algorithm - )}` - ) - } - if (metadata && Object.keys(metadata).length > 0) { const metadataSize = JSON.stringify(metadata).length if (metadataSize > 1024) { @@ -400,9 +377,29 @@ export class C2DEngineDocker extends C2DEngine { if (!env) { throw new Error(`Invalid environment ${environment}`) } - const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) - if (!validation.valid) - throw new Error(`Unable to validate docker image ${image}: ${validation.reason}`) + // C2D - Check image, check arhitecture, etc + const image = getAlgorithmImage(algorithm, jobId) + // ex: node@sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36 + if (!image) { + // send a 500 with the error message + throw new Error( + `Unable to extract docker image ${image} from algoritm: ${JSON.stringify( + algorithm + )}` + ) + } + let additionalDockerFiles: { [key: string]: any } = null + if ( + algorithm.meta && + algorithm.meta.container && + algorithm.meta.container.additionalDockerFiles + ) { + additionalDockerFiles = JSON.parse( + JSON.stringify(algorithm.meta.container.additionalDockerFiles) + ) + // make sure that we don't keep them in the db structure + algorithm.meta.container.additionalDockerFiles = null + } const job: DBComputeJob = { clusterHash: this.getC2DConfig().hash, containerImage: image, @@ -430,15 +427,37 @@ export class C2DEngineDocker extends C2DEngine { algoStopTimestamp: '0', payment, metadata, - additionalViewers + additionalViewers, + terminationDetails: { exitCode: null, OOMKilled: null } + } + + if (algorithm.meta.container && algorithm.meta.container.dockerfile) { + // we need to build the image + job.status = C2DStatusNumber.BuildImage + job.statusText = C2DStatusText.BuildImage + } else { + // already built, we need to validate it + const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) + console.log('Validation: ', validation) + if (!validation.valid) + throw new Error( + `Cannot find image ${image} for ${env.platform.architecture}. 
Maybe it does not exist or it's build for other arhitectures.` + ) + job.status = C2DStatusNumber.PullImage + job.statusText = C2DStatusText.PullImage } + await this.makeJobFolders(job) // make sure we actually were able to insert on DB const addedId = await this.db.newJob(job) if (!addedId) { return [] } - + if (algorithm.meta.container && algorithm.meta.container.dockerfile) { + this.buildImage(job, additionalDockerFiles) + } else { + this.pullImage(job) + } // only now set the timer if (!this.cronTimer) { this.setNewTimer() @@ -708,7 +727,6 @@ export class C2DEngineDocker extends C2DEngine { ): Promise | null { try { const container = await this.docker.createContainer(containerInfo) - console.log('container: ', container) return container } catch (e) { CORE_LOGGER.error(`Unable to create docker container: ${e.message}`) @@ -727,6 +745,16 @@ export class C2DEngineDocker extends C2DEngine { } } + private async inspectContainer(container: Dockerode.Container): Promise { + try { + const data = await container.inspect() + return data.State + } catch (e) { + CORE_LOGGER.error(`Unable to inspect docker container: ${e.message}`) + return null + } + } + private async createDockerVolume( volume: VolumeCreateOptions, retry: boolean = false @@ -763,7 +791,7 @@ export class C2DEngineDocker extends C2DEngine { // - monitor running containers and stop them if over limits // - monitor disc space and clean up /* steps: - - instruct docker to pull image + - wait until image is ready - create volume - after image is ready, create the container - download assets & algo into temp folder @@ -776,65 +804,6 @@ export class C2DEngineDocker extends C2DEngine { - delete the container - delete the volume */ - if (job.status === C2DStatusNumber.JobStarted) { - // pull docker image - try { - const pullStream = await this.docker.pull(job.containerImage) - await new Promise((resolve, reject) => { - let wroteStatusBanner = false - this.docker.modem.followProgress( - pullStream, - (err: any, res: any) => { - // onFinished - if (err) return reject(err) - CORE_LOGGER.info('############# Pull docker image complete ##############') - resolve(res) - }, - (progress: any) => { - // onProgress - if (!wroteStatusBanner) { - wroteStatusBanner = true - CORE_LOGGER.info('############# Pull docker image status: ##############') - } - // only write the status banner once, its cleaner - CORE_LOGGER.info(progress.status) - } - ) - }) - } catch (err) { - CORE_LOGGER.error( - `Unable to pull docker image: ${job.containerImage}: ${err.message}` - ) - job.status = C2DStatusNumber.PullImageFailed - job.statusText = C2DStatusText.PullImageFailed - job.isRunning = false - job.dateFinished = String(Date.now() / 1000) - await this.db.updateJob(job) - await this.cleanupJob(job) - return - } - - job.status = C2DStatusNumber.PullImage - job.statusText = C2DStatusText.PullImage - await this.db.updateJob(job) - return // now we wait until image is ready - } - if (job.status === C2DStatusNumber.PullImage) { - try { - const imageInfo = await this.docker.getImage(job.containerImage) - console.log('imageInfo', imageInfo) - const details = await imageInfo.inspect() - console.log('details:', details) - job.status = C2DStatusNumber.ConfiguringVolumes - job.statusText = C2DStatusText.ConfiguringVolumes - await this.db.updateJob(job) - // now we can move forward - } catch (e) { - // not ready yet - CORE_LOGGER.error(`Unable to inspect docker image: ${e.message}`) - } - return - } if (job.status === C2DStatusNumber.ConfiguringVolumes) { // create the volume & 
create container // TO DO C2D: Choose driver & size @@ -884,7 +853,7 @@ export class C2DEngineDocker extends C2DEngine { // ram const ramSize = this.getResourceRequest(job.resources, 'ram') if (ramSize && ramSize > 0) { - hostConfig.Memory = ramSize + hostConfig.Memory = ramSize * 1024 * 1024 * 1024 // config is in GB, docker wants bytes // set swap to same memory value means no swap (otherwise it use like 2X mem) hostConfig.MemorySwap = hostConfig.Memory } @@ -934,6 +903,13 @@ export class C2DEngineDocker extends C2DEngine { ) containerInfo.Entrypoint = newEntrypoint.split(' ') } + if (job.algorithm.envs) { + const envVars: string[] = [] + for (const key of Object.keys(job.algorithm.envs)) { + envVars.push(`${key}=${job.algorithm.envs[key]}`) + } + containerInfo.Env = envVars + } const container = await this.createDockerContainer(containerInfo, true) if (container) { console.log('Container created: ', container) @@ -999,6 +975,9 @@ export class C2DEngineDocker extends C2DEngine { job.isStarted = true job.algoStartTimestamp = String(Date.now() / 1000) await this.db.updateJob(job) + CORE_LOGGER.info(`Container started successfully for job ${job.jobId}`) + + await this.measureContainerBaseSize(job, container) return } catch (e) { // container failed to start @@ -1028,7 +1007,12 @@ export class C2DEngineDocker extends C2DEngine { } } } else { - // is running, we need to stop it.. + const canContinue = await this.monitorDiskUsage(job) + if (!canContinue) { + // Job was terminated due to disk quota exceeded + return + } + console.log('running, need to stop it?') const timeNow = Date.now() / 1000 const expiry = parseFloat(job.algoStartTimestamp) + job.maxJobDuration @@ -1091,6 +1075,15 @@ export class C2DEngineDocker extends C2DEngine { await this.cleanupJob(job) return } + const state = await this.inspectContainer(container) + if (state) { + job.terminationDetails.OOMKilled = state.OOMKilled + job.terminationDetails.exitCode = state.ExitCode + } else { + job.terminationDetails.OOMKilled = null + job.terminationDetails.exitCode = null + } + const outputsArchivePath = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/outputs/outputs.tar' try { @@ -1120,6 +1113,8 @@ export class C2DEngineDocker extends C2DEngine { // - delete volume // - delete container + this.jobImageSizes.delete(job.jobId) + // payments if (!job.isFree && job.payment) { let txId = null @@ -1182,7 +1177,7 @@ export class C2DEngineDocker extends C2DEngine { await container.remove() } } catch (e) { - console.error('Container not found! ' + e.message) + // console.error('Container not found! ' + e.message) } try { const volume = await this.docker.getVolume(job.jobId + '-volume') @@ -1194,7 +1189,17 @@ export class C2DEngineDocker extends C2DEngine { } } } catch (e) { - console.error('Container volume not found! ' + e.message) + // console.error('Container volume not found! ' + e.message) + } + if (job.algorithm.meta.container && job.algorithm.meta.container.dockerfile) { + const image = getAlgorithmImage(job.algorithm, job.jobId) + if (image) { + try { + await this.docker.getImage(image).remove({ force: true }) + } catch (e) { + console.log('Could not delete image: ' + image + ' : ' + e.message) + } + } } try { // remove folders @@ -1230,6 +1235,252 @@ export class C2DEngineDocker extends C2DEngine { }) } + private getDiskQuota(job: DBComputeJob): number { + if (!job.resources) return 0 + + const diskResource = job.resources.find((resource) => resource.id === 'disk') + return diskResource ? 
diskResource.amount : 0 + } + + // Inspect the real runtime size of the container + private async measureContainerBaseSize( + job: DBComputeJob, + container: Dockerode.Container + ): Promise { + try { + if (this.jobImageSizes.has(job.jobId)) { + CORE_LOGGER.debug(`Using cached base size for job ${job.jobId.slice(-8)}`) + return + } + + // Wait for container filesystem to stabilize + await new Promise((resolve) => setTimeout(resolve, 3000)) + + const actualBaseSize = await this.getContainerDiskUsage(container.id, '/') + this.jobImageSizes.set(job.jobId, actualBaseSize) + + CORE_LOGGER.info( + `Base container ${job.containerImage} runtime size: ${( + actualBaseSize / + 1024 / + 1024 / + 1024 + ).toFixed(2)}GB` + ) + } catch (error) { + CORE_LOGGER.error(`Failed to measure base container size: ${error.message}`) + this.jobImageSizes.set(job.jobId, 0) + } + } + + private async getContainerDiskUsage( + containerName: string, + path: string = '/data' + ): Promise { + try { + const container = this.docker.getContainer(containerName) + const containerInfo = await container.inspect() + if (!containerInfo.State.Running) { + CORE_LOGGER.debug( + `Container ${containerName} is not running, cannot check disk usage` + ) + return 0 + } + + const exec = await container.exec({ + Cmd: ['du', '-sb', path], + AttachStdout: true, + AttachStderr: true + }) + + const stream = await exec.start({ Detach: false, Tty: false }) + + const chunks: Buffer[] = [] + for await (const chunk of stream) { + chunks.push(chunk as Buffer) + } + + const output = Buffer.concat(chunks).toString() + + const match = output.match(/(\d+)\s/) + return match ? parseInt(match[1], 10) : 0 + } catch (error) { + CORE_LOGGER.error( + `Failed to get container disk usage for ${containerName}: ${error.message}` + ) + return 0 + } + } + + private async monitorDiskUsage(job: DBComputeJob): Promise { + const diskQuota = this.getDiskQuota(job) + if (diskQuota <= 0) return true + + const containerName = job.jobId + '-algoritm' + const totalUsage = await this.getContainerDiskUsage(containerName, '/') + const baseImageSize = this.jobImageSizes.get(job.jobId) || 0 + const algorithmUsage = Math.max(0, totalUsage - baseImageSize) + + const usageGB = (algorithmUsage / 1024 / 1024 / 1024).toFixed(2) + const quotaGB = diskQuota.toFixed(1) + const usagePercent = ( + (algorithmUsage / 1024 / 1024 / 1024 / diskQuota) * + 100 + ).toFixed(1) + + CORE_LOGGER.info( + `Job ${job.jobId.slice(-8)} disk: ${usageGB}GB / ${quotaGB}GB (${usagePercent}%)` + ) + + if (algorithmUsage / 1024 / 1024 / 1024 > diskQuota) { + CORE_LOGGER.warn( + `DISK QUOTA EXCEEDED - Stopping job ${job.jobId}: ${usageGB}GB used, ${quotaGB}GB allowed` + ) + + try { + const container = this.docker.getContainer(containerName) + await container.stop() + CORE_LOGGER.info(`Container stopped for job ${job.jobId}`) + } catch (e) { + CORE_LOGGER.warn(`Could not stop container: ${e.message}`) + } + + job.status = C2DStatusNumber.DiskQuotaExceeded + job.statusText = C2DStatusText.DiskQuotaExceeded + job.isRunning = false + job.isStarted = false + job.algoStopTimestamp = String(Date.now() / 1000) + job.dateFinished = String(Date.now() / 1000) + + await this.db.updateJob(job) + CORE_LOGGER.info(`Job ${job.jobId} terminated - DISK QUOTA EXCEEDED`) + + return false + } + + return true + } + + private async pullImage(originaljob: DBComputeJob) { + const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob + const imageLogFile = + this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log' 
+ try { + const pullStream = await this.docker.pull(job.containerImage) + await new Promise((resolve, reject) => { + let wroteStatusBanner = false + this.docker.modem.followProgress( + pullStream, + (err: any, res: any) => { + // onFinished + if (err) { + appendFileSync(imageLogFile, String(err.message)) + return reject(err) + } + const logText = `Successfully pulled image: ${job.containerImage}` + CORE_LOGGER.debug(logText) + appendFileSync(imageLogFile, logText + '\n') + resolve(res) + }, + (progress: any) => { + // onProgress + if (!wroteStatusBanner) { + wroteStatusBanner = true + CORE_LOGGER.debug('############# Pull docker image status: ##############') + } + // only write the status banner once, its cleaner + let logText = '' + if (progress.id) logText += progress.id + ' : ' + progress.status + else logText = progress.status + CORE_LOGGER.debug("Pulling image for jobId '" + job.jobId + "': " + logText) + console.log(progress) + appendFileSync(imageLogFile, logText + '\n') + } + ) + }) + job.status = C2DStatusNumber.ConfiguringVolumes + job.statusText = C2DStatusText.ConfiguringVolumes + this.db.updateJob(job) + } catch (err) { + const logText = `Unable to pull docker image: ${job.containerImage}: ${err.message}` + CORE_LOGGER.error(logText) + appendFileSync(imageLogFile, logText) + job.status = C2DStatusNumber.PullImageFailed + job.statusText = C2DStatusText.PullImageFailed + job.isRunning = false + job.dateFinished = String(Date.now() / 1000) + await this.db.updateJob(job) + await this.cleanupJob(job) + } + } + + private async buildImage( + originaljob: DBComputeJob, + additionalDockerFiles: { [key: string]: any } + ) { + const job = JSON.parse(JSON.stringify(originaljob)) as DBComputeJob + const imageLogFile = + this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log' + try { + const pack = tarStream.pack() + + // Append the Dockerfile to the tar archive + pack.entry({ name: 'Dockerfile' }, job.algorithm.meta.container.dockerfile) + // Append any additional files to the tar archive + if (additionalDockerFiles) { + for (const filePath of Object.keys(additionalDockerFiles)) { + pack.entry({ name: filePath }, additionalDockerFiles[filePath]) + } + } + pack.finalize() + + // Build the image using the tar stream as context + const buildStream = await this.docker.buildImage(pack, { + t: job.containerImage + }) + + // Optional: listen to build output + buildStream.on('data', (data) => { + try { + const text = JSON.parse(data.toString('utf8')) + CORE_LOGGER.debug( + "Building image for jobId '" + job.jobId + "': " + text.stream.trim() + ) + appendFileSync(imageLogFile, String(text.stream)) + } catch (e) { + // console.log('non json build data: ', data.toString('utf8')) + } + }) + + await new Promise((resolve, reject) => { + buildStream.on('end', () => { + CORE_LOGGER.debug(`Image '${job.containerImage}' built successfully.`) + + resolve() + }) + buildStream.on('error', (err) => { + CORE_LOGGER.debug(`Error building image '${job.containerImage}':` + err.message) + appendFileSync(imageLogFile, String(err.message)) + reject(err) + }) + }) + job.status = C2DStatusNumber.ConfiguringVolumes + job.statusText = C2DStatusText.ConfiguringVolumes + this.db.updateJob(job) + } catch (err) { + CORE_LOGGER.error( + `Unable to build docker image: ${job.containerImage}: ${err.message}` + ) + appendFileSync(imageLogFile, String(err.message)) + job.status = C2DStatusNumber.BuildImageFailed + job.statusText = C2DStatusText.BuildImageFailed + job.isRunning = false + job.dateFinished = 
String(Date.now() / 1000) + await this.db.updateJob(job) + await this.cleanupJob(job) + } + } + private async uploadData( job: DBComputeJob ): Promise<{ status: C2DStatusNumber; statusText: C2DStatusText }> { @@ -1243,7 +1494,7 @@ export class C2DEngineDocker extends C2DEngine { const configLogPath = jobFolderPath + '/data/logs/configuration.log' try { - writeFileSync( + appendFileSync( configLogPath, "Writing algocustom data to '/data/inputs/algoCustomData.json'\n" ) @@ -1258,7 +1509,7 @@ export class C2DEngineDocker extends C2DEngine { if (job.algorithm.meta.rawcode && job.algorithm.meta.rawcode.length > 0) { // we have the code, just write it - writeFileSync(configLogPath, `Writing raw algo code to ${fullAlgoPath}\n`) + appendFileSync(configLogPath, `Writing raw algo code to ${fullAlgoPath}\n`) writeFileSync(fullAlgoPath, job.algorithm.meta.rawcode) } else { // do we have a files object? @@ -1270,7 +1521,7 @@ export class C2DEngineDocker extends C2DEngine { storage = Storage.getStorageClass(job.algorithm.fileObject, config) } catch (e) { CORE_LOGGER.error(`Unable to get storage class for algorithm: ${e.message}`) - writeFileSync( + appendFileSync( configLogPath, `Unable to get storage class for algorithm: ${e.message}\n` ) @@ -1292,7 +1543,7 @@ export class C2DEngineDocker extends C2DEngine { storage = Storage.getStorageClass(decryptedFileObject, config) } catch (e) { CORE_LOGGER.error(`Unable to decrypt algorithm files object: ${e.message}`) - writeFileSync( + appendFileSync( configLogPath, `Unable to decrypt algorithm files object: ${e.message}\n` ) @@ -1308,7 +1559,7 @@ export class C2DEngineDocker extends C2DEngine { 'algorithm file object seems to be missing, checking "serviceId" and "documentId"...' ) const { serviceId, documentId } = job.algorithm - writeFileSync( + appendFileSync( configLogPath, `Using ${documentId} and serviceId ${serviceId} to get algorithm files.\n` ) @@ -1323,7 +1574,7 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.error( `Could not find service with ID ${serviceId} in DDO ${documentId}` ) - writeFileSync( + appendFileSync( configLogPath, `Could not find service with ID ${serviceId} in DDO ${documentId}\n` ) @@ -1338,7 +1589,7 @@ export class C2DEngineDocker extends C2DEngine { storage = Storage.getStorageClass(decryptedFileObject, config) } catch (e) { CORE_LOGGER.error(`Unable to decrypt algorithm files object: ${e.message}`) - writeFileSync( + appendFileSync( configLogPath, `Unable to decrypt algorithm files object: ${e.message}\n` ) @@ -1359,7 +1610,7 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.info( 'Could not extract any files object from the compute algorithm, skipping...' 
) - writeFileSync( + appendFileSync( configLogPath, 'Could not extract any files object from the compute algorithm, skipping...\n' ) @@ -1369,7 +1620,7 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.error( 'Unable to write algorithm to path: ' + fullAlgoPath + ': ' + e.message ) - writeFileSync( + appendFileSync( configLogPath, 'Unable to write algorithm to path: ' + fullAlgoPath + ': ' + e.message + '\n' ) @@ -1385,7 +1636,7 @@ export class C2DEngineDocker extends C2DEngine { let storage = null let fileInfo = null console.log('checking now asset: ', i) - writeFileSync(configLogPath, `Downloading asset ${i} to /data/inputs/\n`) + appendFileSync(configLogPath, `Downloading asset ${i} to /data/inputs/\n`) // without this check it would break if no fileObject is present if (asset.fileObject) { try { @@ -1404,7 +1655,7 @@ export class C2DEngineDocker extends C2DEngine { }) } catch (e) { CORE_LOGGER.error(`Unable to get storage class for asset: ${e.message}`) - writeFileSync( + appendFileSync( configLogPath, `Unable to get storage class for asset: ${e.message}\n` ) @@ -1416,7 +1667,7 @@ export class C2DEngineDocker extends C2DEngine { } else { // we need to go the hard way const { serviceId, documentId } = asset - writeFileSync( + appendFileSync( configLogPath, `Using ${documentId} and serviceId ${serviceId} for this asset.\n` ) @@ -1436,7 +1687,7 @@ export class C2DEngineDocker extends C2DEngine { }) } catch (e) { CORE_LOGGER.error(`Unable to get storage class for asset: ${e.message}`) - writeFileSync( + appendFileSync( configLogPath, `Unable to get storage class for asset: ${e.message}\n` ) @@ -1450,7 +1701,7 @@ export class C2DEngineDocker extends C2DEngine { if (storage && fileInfo) { const fullPath = jobFolderPath + '/data/inputs/' + fileInfo[0].name - writeFileSync(configLogPath, `Downloading asset to ${fullPath}\n`) + appendFileSync(configLogPath, `Downloading asset to ${fullPath}\n`) try { await pipeline( (await storage.getReadableStream()).stream, @@ -1460,7 +1711,7 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.error( 'Unable to write input data to path: ' + fullPath + ': ' + e.message ) - writeFileSync( + appendFileSync( configLogPath, 'Unable to write input data to path: ' + fullPath + ': ' + e.message + '\n' ) @@ -1473,14 +1724,14 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.info( 'Could not extract any files object from the compute asset, skipping...' 
) - writeFileSync( + appendFileSync( configLogPath, 'Could not extract any files object from the compute asset, skipping...\n' ) } } CORE_LOGGER.info('All good with data provisioning, will start uploading it...') - writeFileSync( + appendFileSync( configLogPath, 'All good with data provisioning, will start uploading it...\n' ) @@ -1513,7 +1764,7 @@ export class C2DEngineDocker extends C2DEngine { console.log('Done uploading') } catch (e) { - writeFileSync( + appendFileSync( configLogPath, 'Data upload to container failed: ' + e.message + '\n' ) @@ -1524,11 +1775,11 @@ export class C2DEngineDocker extends C2DEngine { } } else { CORE_LOGGER.debug('No data to upload, empty tar.gz') - writeFileSync(configLogPath, `No data to upload, empty tar.gz\n`) + appendFileSync(configLogPath, `No data to upload, empty tar.gz\n`) } } catch (e) { CORE_LOGGER.debug(e.message) - writeFileSync(configLogPath, `Error creating data archive: ${e.message}\n`) + appendFileSync(configLogPath, `Error creating data archive: ${e.message}\n`) return { status: C2DStatusNumber.DataProvisioningFailed, statusText: C2DStatusText.DataProvisioningFailed @@ -1600,10 +1851,13 @@ export class C2DEngineDocker extends C2DEngine { // this uses the docker engine, but exposes only one env, the free one -export function getAlgorithmImage(algorithm: ComputeAlgorithm): string { +export function getAlgorithmImage(algorithm: ComputeAlgorithm, jobId: string): string { if (!algorithm.meta || !algorithm.meta.container) { return null } + if (algorithm.meta.container.dockerfile) { + return jobId.toLowerCase() + '-image:latest' + } let { image } = algorithm.meta.container if (algorithm.meta.container.checksum) image = image + '@' + algorithm.meta.container.checksum @@ -1619,6 +1873,9 @@ export function checkManifestPlatform( envPlatform?: RunningPlatform ): boolean { if (!manifestPlatform || !envPlatform) return true // skips if not present + if (envPlatform.architecture === 'amd64') envPlatform.architecture = 'x86_64' // x86_64 is compatible with amd64 + if (manifestPlatform.architecture === 'amd64') manifestPlatform.architecture = 'x86_64' // x86_64 is compatible with amd64 + if ( envPlatform.architecture !== manifestPlatform.architecture || envPlatform.os !== manifestPlatform.os diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index b1bd8eb6d..e6274cb30 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -33,7 +33,7 @@ import { C2DEngineDocker, getAlgorithmImage } from '../../c2d/compute_engine_doc import { Credentials, DDOManager } from '@oceanprotocol/ddo-js' import { areKnownCredentialTypes, checkCredentials } from '../../../utils/credentials.js' import { PolicyServer } from '../../policyServer/index.js' -import { getAlgoChecksums, validateAlgoForDataset } from './utils.js' +import { generateUniqueID, getAlgoChecksums, validateAlgoForDataset } from './utils.js' export class ComputeInitializeHandler extends CommandHandler { validate(command: ComputeInitializeCommand): ValidateParams { @@ -366,7 +366,7 @@ export class ComputeInitializeHandler extends CommandHandler { } } if (hasDockerImages) { - const algoImage = getAlgorithmImage(task.algorithm) + const algoImage = getAlgorithmImage(task.algorithm, generateUniqueID(task)) if (algoImage) { const validation: ValidateParams = await C2DEngineDocker.checkDockerImage( algoImage, diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index 
ea8b55b07..53911c24f 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -6,7 +6,7 @@ import { PaidComputeStartCommand } from '../../../@types/commands.js' import { CommandHandler } from '../handler/handler.js' -import { getAlgoChecksums, validateAlgoForDataset } from './utils.js' +import { generateUniqueID, getAlgoChecksums, validateAlgoForDataset } from './utils.js' import { ValidateParams, buildInvalidRequestMessage, @@ -35,7 +35,6 @@ import { Credentials, DDOManager } from '@oceanprotocol/ddo-js' import { getNonceAsNumber } from '../utils/nonceHandler.js' import { PolicyServer } from '../../policyServer/index.js' import { areKnownCredentialTypes, checkCredentials } from '../../../utils/credentials.js' -import { generateUniqueID } from '../../database/sqliteCompute.js' export class PaidComputeStartHandler extends CommandHandler { validate(command: PaidComputeStartCommand): ValidateParams { diff --git a/src/components/core/compute/utils.ts b/src/components/core/compute/utils.ts index 55f664d53..6966a3e3c 100644 --- a/src/components/core/compute/utils.ts +++ b/src/components/core/compute/utils.ts @@ -14,6 +14,16 @@ import { createHash } from 'crypto' import { FindDdoHandler } from '../../core/handler/ddoHandler.js' import { DDOManager, VersionedDDO } from '@oceanprotocol/ddo-js' +export function generateUniqueID(jobStructure: any): string { + const timestamp = + BigInt(Date.now()) * 1_000_000n + (process.hrtime.bigint() % 1_000_000n) + const random = Math.random() + const jobId = createHash('sha256') + .update(JSON.stringify(jobStructure) + timestamp.toString() + random.toString()) + .digest('hex') + return jobId +} + export async function getAlgoChecksums( algoDID: string, algoServiceId: string, diff --git a/src/components/core/handler/policyServer.ts b/src/components/core/handler/policyServer.ts index 0d11a611a..dd90e83aa 100644 --- a/src/components/core/handler/policyServer.ts +++ b/src/components/core/handler/policyServer.ts @@ -10,6 +10,7 @@ import { buildInvalidRequestMessage, validateCommandParameters } from '../../httpRoutes/validateCommands.js' +import { CORE_LOGGER } from '../../../utils/logging/common.js' import { PolicyServer } from '../../policyServer/index.js' @@ -28,7 +29,18 @@ export class PolicyServerPassthroughHandler extends CommandHandler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } - + task.policyServerPassthrough.ddo = null + // resolve DDO first + try { + task.policyServerPassthrough.ddo = await this.getOceanNode() + .getDatabase() + .ddo.retrieve(task.policyServerPassthrough.documentId) + } catch (error) { + // just log it + CORE_LOGGER.warn( + `PolicyServerPassthroughHandler: DDO not found for documentId ${task.policyServerPassthrough.documentId}: ${error.message}` + ) + } // policyServer check const policyServer = new PolicyServer() const policyStatus = await policyServer.passThrough(task.policyServerPassthrough) diff --git a/src/components/core/utils/statusHandler.ts b/src/components/core/utils/statusHandler.ts index 848f25266..ce9016840 100644 --- a/src/components/core/utils/statusHandler.ts +++ b/src/components/core/utils/statusHandler.ts @@ -14,6 +14,7 @@ import { OceanNode } from '../../../OceanNode.js' import { typesenseSchemas } from '../../database/TypesenseSchemas.js' import { SupportedNetwork } from '../../../@types/blockchain.js' import { getAdminAddresses } from '../../../utils/auth.js' +import HumanHasher from 'humanhash' const supportedStorageTypes: 
StorageTypes = { url: true, @@ -106,9 +107,11 @@ export async function status( // no previous status? if (!nodeStatus) { + const publicKeyHex = Buffer.from(config.keys.publicKey).toString('hex') nodeStatus = { id: nodeId && nodeId !== undefined ? nodeId : config.keys.peerId.toString(), // get current node ID - publicKey: Buffer.from(config.keys.publicKey).toString('hex'), + publicKey: publicKeyHex, + friendlyName: new HumanHasher().humanize(publicKeyHex), address: config.keys.ethAddress, version: process.env.npm_package_version, http: config.hasHttp, diff --git a/src/components/database/C2DDatabase.ts b/src/components/database/C2DDatabase.ts index 842c9a29f..b7b57c897 100644 --- a/src/components/database/C2DDatabase.ts +++ b/src/components/database/C2DDatabase.ts @@ -9,7 +9,7 @@ import { AbstractDatabase } from './BaseDatabase.js' import { OceanNode } from '../../OceanNode.js' import { getDatabase } from '../../utils/database.js' import { getConfiguration } from '../../utils/index.js' - +import { generateUniqueID } from '../core/compute/utils.js' export class C2DDatabase extends AbstractDatabase { private provider: SQLiteCompute @@ -32,6 +32,7 @@ export class C2DDatabase extends AbstractDatabase { } async newJob(job: DBComputeJob): Promise { + if (!job.jobId) job.jobId = generateUniqueID(job) const jobId = await this.provider.newJob(job) return jobId } diff --git a/src/components/database/ElasticSearchDatabase.ts b/src/components/database/ElasticSearchDatabase.ts index b92155863..9fb75a060 100644 --- a/src/components/database/ElasticSearchDatabase.ts +++ b/src/components/database/ElasticSearchDatabase.ts @@ -503,37 +503,6 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase { return schema } - async validateDDO(ddo: Record): Promise { - const ddoInstance = DDOManager.getDDOClass(ddo) - const ddoData = ddoInstance.getDDOData() - if ('indexedMetadata' in ddoData && ddoData?.indexedMetadata?.nft?.state !== 0) { - // Skipping validation for short DDOs as it currently doesn't work - // TODO: DDO validation needs to be updated to consider the fields required by the schema - // See github issue: https://github.com/oceanprotocol/ocean-node/issues/256 - return true - } - - const validation = await ddoInstance.validate() - if (validation[0] === true) { - DATABASE_LOGGER.logMessageWithEmoji( - `Validation of DDO with did: ${ddo.id} has passed`, - true, - GENERIC_EMOJIS.EMOJI_OCEAN_WAVE, - LOG_LEVELS_STR.LEVEL_INFO - ) - return true - } else { - DATABASE_LOGGER.logMessageWithEmoji( - `Validation of DDO with schema version ${ddo.version} failed with errors: ` + - JSON.stringify(validation[1]), - true, - GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_ERROR - ) - return false - } - } - async search(query: Record): Promise { const results = [] const maxPerPage = query.size || 100 diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts index f968e3c9c..ad65e4b65 100644 --- a/src/components/database/sqliteCompute.ts +++ b/src/components/database/sqliteCompute.ts @@ -7,7 +7,6 @@ import { } from '../../@types/C2D/C2D.js' import sqlite3, { RunResult } from 'sqlite3' import { DATABASE_LOGGER } from '../../utils/logging/common.js' -import { createHash } from 'crypto' interface ComputeDatabaseProvider { newJob(job: DBComputeJob): Promise @@ -18,16 +17,6 @@ interface ComputeDatabaseProvider { getFinishedJobs(): Promise } -export function generateUniqueID(jobStructure: any): string { - const timestamp = - BigInt(Date.now()) * 1_000_000n + 
(process.hrtime.bigint() % 1_000_000n) - const random = Math.random() - const jobId = createHash('sha256') - .update(JSON.stringify(jobStructure) + timestamp.toString() + random.toString()) - .digest('hex') - return jobId -} - function getInternalStructure(job: DBComputeJob): any { const internalBlob = { clusterHash: job.clusterHash, @@ -46,7 +35,8 @@ function getInternalStructure(job: DBComputeJob): any { algoStartTimestamp: job.algoStartTimestamp, algoStopTimestamp: job.algoStopTimestamp, metadata: job.metadata, - additionalViewers: job.additionalViewers + additionalViewers: job.additionalViewers, + terminationDetails: job.terminationDetails } return internalBlob } @@ -146,25 +136,6 @@ export class SQLiteCompute implements ComputeDatabaseProvider { ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); ` - let jobId: string - if (!job.jobId) { - const jobStructure = { - assets: job.assets, - algorithm: job.algorithm, - output: {}, - environment: job.environment, - owner: job.owner, - maxJobDuration: job.maxJobDuration, - chainId: job.payment?.chainId || null, - agreementId: job.agreementId, - resources: job.resources, - metadata: job.metadata - } - jobId = generateUniqueID(jobStructure) - job.jobId = jobId - } else { - jobId = job.jobId - } return new Promise((resolve, reject) => { this.db.run( @@ -172,7 +143,7 @@ export class SQLiteCompute implements ComputeDatabaseProvider { [ job.owner, job.did, - jobId, + job.jobId, job.dateCreated || String(Date.now() / 1000), // seconds from epoch, job.status || C2DStatusNumber.JobStarted, job.statusText || C2DStatusText.JobStarted, @@ -188,8 +159,8 @@ export class SQLiteCompute implements ComputeDatabaseProvider { DATABASE_LOGGER.error('Could not insert C2D job on DB: ' + err.message) reject(err) } else { - DATABASE_LOGGER.info('Successfully inserted job with id:' + jobId) - resolve(jobId) + DATABASE_LOGGER.info('Successfully inserted job with id:' + job.jobId) + resolve(job.jobId) } } ) @@ -329,7 +300,7 @@ export class SQLiteCompute implements ComputeDatabaseProvider { }) resolve(filtered) } else { - // DATABASE_LOGGER.info('Could not find any running C2D jobs!') + DATABASE_LOGGER.info('Could not find any running C2D jobs!') resolve([]) } } diff --git a/src/helpers/scripts/setupNodeEnv.sh b/src/helpers/scripts/setupNodeEnv.sh index 0e2bc572f..4ecf02edd 100755 --- a/src/helpers/scripts/setupNodeEnv.sh +++ b/src/helpers/scripts/setupNodeEnv.sh @@ -255,7 +255,7 @@ if [ "$enable_compute" == 'y' ]; then echo " You can customize this in your .env file for production use." 
echo "" - DOCKER_COMPUTE_ENV="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"disk\",\"total\":1000000000}],\"storageExpiry\":604800,\"maxJobDuration\":36000,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1}]}]},\"free\":{\"maxJobDuration\":360000,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1000000000},{\"id\":\"disk\",\"max\":1000000000}]}}]" + DOCKER_COMPUTE_ENV="[{\"socketPath\":\"/var/run/docker.sock\",\"resources\":[{\"id\":\"disk\",\"total\":10}],\"storageExpiry\":604800,\"maxJobDuration\":36000,\"fees\":{\"1\":[{\"feeToken\":\"0x123\",\"prices\":[{\"id\":\"cpu\",\"price\":1}]}]},\"free\":{\"maxJobDuration\":360000,\"maxJobs\":3,\"resources\":[{\"id\":\"cpu\",\"max\":1},{\"id\":\"ram\",\"max\":1},{\"id\":\"disk\",\"max\":1}]}}]" REPLACE_STR="DOCKER_COMPUTE_ENVIRONMENTS='$DOCKER_COMPUTE_ENV'" if [ "$(uname)" == "Darwin" ]; then diff --git a/src/test/.env.test2 b/src/test/.env.test2 new file mode 100644 index 000000000..8f16fafb8 --- /dev/null +++ b/src/test/.env.test2 @@ -0,0 +1,2 @@ +PRIVATE_KEY='0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58' +CONFIG_PATH="$HOME/config.json" diff --git a/src/test/config.json b/src/test/config.json new file mode 100644 index 000000000..95d0a5a77 --- /dev/null +++ b/src/test/config.json @@ -0,0 +1,96 @@ +{ + "authorizedDecrypters": [], + "authorizedDecryptersList": [], + "allowedValidators": [], + "allowedValidatorsList": [], + "authorizedPublishers": [], + "authorizedPublishersList": [], + "keys": {}, + "hasIndexer": true, + "hasHttp": true, + "hasP2P": true, + "p2pConfig": { + "bootstrapNodes": [], + "bootstrapTimeout": 20000, + "bootstrapTagName": "bootstrap", + "bootstrapTagValue": 50, + "bootstrapTTL": 0, + "enableIPV4": true, + "enableIPV6": true, + "ipV4BindAddress": "0.0.0.0", + "ipV4BindTcpPort": 8000, + "ipV4BindWsPort": 0, + "ipV6BindAddress": "::1", + "ipV6BindTcpPort": 0, + "ipV6BindWsPort": 0, + "announceAddresses": [], + "pubsubPeerDiscoveryInterval": 10000, + "dhtMaxInboundStreams": 500, + "dhtMaxOutboundStreams": 500, + "dhtFilter": null, + "mDNSInterval": 20000, + "connectionsMaxParallelDials": 15, + "connectionsDialTimeout": 30000, + "upnp": true, + "autoNat": true, + "enableCircuitRelayServer": false, + "enableCircuitRelayClient": false, + "circuitRelays": 0, + "announcePrivateIp": false, + "filterAnnouncedAddresses": [ + "127.0.0.0/8", + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "100.64.0.0/10", + "169.254.0.0/16", + "192.0.0.0/24", + "192.0.2.0/24", + "198.51.100.0/24", + "203.0.113.0/24", + "224.0.0.0/4", + "240.0.0.0/4" + ], + "minConnections": 1, + "maxConnections": 300, + "autoDialPeerRetryThreshold": 7200000, + "autoDialConcurrency": 5, + "maxPeerAddrsToDial": 5, + "autoDialInterval": 5000, + "enableNetworkStats": false + }, + "hasControlPanel": true, + "httpPort": 8001, + "dbConfig": { + "url": "http://localhost:9200", + "username": "", + "password": "", + "dbType": "elasticsearch" + }, + "supportedNetworks": { + "8996": { + "rpc": "http://127.0.0.1:8545", + "chainId": 8996, + "network": "development", + "chunkSize": 100 + } + }, + "indexingNetworks": [ + 8996 + ], + "feeStrategy": {}, + "c2dClusters": [], + "c2dNodeUri": "", + "accountPurgatoryUrl": "", + "assetPurgatoryUrl": "", + "allowedAdmins": [], + "allowedAdminsList": [], + "rateLimit": {}, + "maxConnections": 300, + "denyList": [], + "unsafeURLs": [], + "isBootstrap": false, + "claimDurationTimeout": 600, + "validateUnsignedDDO": true, + 
"jwtSecret": "ocean-node-secret" +} \ No newline at end of file diff --git a/src/test/data/commands.ts b/src/test/data/commands.ts index b9a0156da..af0ab7a38 100644 --- a/src/test/data/commands.ts +++ b/src/test/data/commands.ts @@ -1,6 +1,6 @@ export const freeComputeStartPayload = { command: 'freeStartCompute', - consumerAddress: '0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687', + consumerAddress: '0xeB5ae11175008E8f178d57d0152678a863FbB887', environment: '', nonce: '1', signature: '0x123', diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 8682d7957..45ed7e0bd 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -99,11 +99,11 @@ describe('Trusted algorithms Flow', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' ] ) ) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index eab2a6fa4..ebdb0d2d5 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -145,11 +145,11 @@ describe('Compute', () => { '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' ] ) ) @@ -1081,14 +1081,7 @@ describe('Compute', () => { const response = await handler.handle(command) assert(response.status.httpStatus === 500, 'Failed to get 500 response') assert(response.stream === null, 'Should not get stream') - assert( - response.status.error.includes( - freeComputeStartPayload.algorithm.meta.container.image - ), - 'Should have image error' - ) }) - // algo and checksums related describe('C2D algo and checksums related', () => { it('should publish AlgoDDO', async () => { @@ -1270,6 +1263,7 @@ describe('Compute', () => { expect(algoChecksums.container).to.equal( 'ba8885fcc7d366f058d6c3bb0b7bfe191c5f85cb6a4ee3858895342436c23504' ) + 
expect(algoChecksums.serviceId).to.equal(algoDDOTest.services[0].id) } else expect(expectedTimeoutFailure(this.test.title)).to.be.equal(wasTimeout) }) diff --git a/src/test/integration/credentials.test.ts b/src/test/integration/credentials.test.ts index b1c18ab49..1ee0728f8 100644 --- a/src/test/integration/credentials.test.ts +++ b/src/test/integration/credentials.test.ts @@ -125,11 +125,11 @@ describe('[Credentials Flow] - Should run a complete node flow.', () => { await publisherAccount.getAddress() // signer 0 ]), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + DEVELOPMENT_CHAIN_ID + '":[{"feeToken":"' + paymentToken + - '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' ] ) ) diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index 8ef3ffae3..bc53cf9cf 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -8,7 +8,7 @@ import { ComputeAlgorithm, ComputeAsset, // ComputeEnvironment, - ComputeJob, + // ComputeJob, DBComputeJob, RunningPlatform } from '../../@types/C2D/C2D.js' @@ -29,8 +29,8 @@ import { } from '../utils/utils.js' import { OceanNodeConfig } from '../../@types/OceanNode.js' import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js' -import { completeDBComputeJob, dockerImageManifest } from '../data/assets.js' -import { omitDBComputeFieldsFromComputeJob } from '../../components/c2d/index.js' +import { dockerImageManifest } from '../data/assets.js' +// import { omitDBComputeFieldsFromComputeJob } from '../../components/c2d/index.js' import { checkManifestPlatform } from '../../components/c2d/compute_engine_docker.js' describe('Compute Jobs Database', () => { @@ -52,7 +52,7 @@ describe('Compute Jobs Database', () => { envOverrides = buildEnvOverrideConfig( [ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS], [ - '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' ] ) envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) @@ -197,25 +197,27 @@ describe('Compute Jobs Database', () => { expect(convertStringToArray(str)).to.deep.equal(expectedArray) }) - it('should convert DBComputeJob to ComputeJob and omit internal DB data', () => { - const source: any = completeDBComputeJob - const output: ComputeJob = omitDBComputeFieldsFromComputeJob(source as DBComputeJob) - - expect(Object.prototype.hasOwnProperty.call(output, 
'clusterHash')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'configlogURL')).to.be.equal( - false - ) - expect(Object.prototype.hasOwnProperty.call(output, 'publishlogURL')).to.be.equal( - false - ) - expect(Object.prototype.hasOwnProperty.call(output, 'algologURL')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'outputsURL')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'isRunning')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'isStarted')).to.be.equal(false) - expect(Object.prototype.hasOwnProperty.call(output, 'containerImage')).to.be.equal( - false - ) - }) + // it('should convert DBComputeJob to ComputeJob and omit internal DB data', () => { + // const source: any = completeDBComputeJob + // const output: ComputeJob = omitDBComputeFieldsFromComputeJob(source as DBComputeJob) + // console.log('output: ', JSON.stringify(output, null, 2)) + // expect(Object.prototype.hasOwnProperty.call(output, 'clusterHash')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'configlogURL')).to.be.equal( + // false + // ) + // expect(Object.prototype.hasOwnProperty.call(output, 'publishlogURL')).to.be.equal( + // false + // ) + // expect(Object.prototype.hasOwnProperty.call(output, 'algologURL')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'outputsURL')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'algorithm')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'assets')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'isRunning')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'isStarted')).to.be.equal(false) + // expect(Object.prototype.hasOwnProperty.call(output, 'containerImage')).to.be.equal( + // false + // ) + // }) it('should check manifest platform against local platform env', () => { const arch = os.machine() // ex: arm diff --git a/src/test/unit/config.test.ts b/src/test/unit/config.test.ts new file mode 100644 index 000000000..fb83efbb6 --- /dev/null +++ b/src/test/unit/config.test.ts @@ -0,0 +1,62 @@ +import { expect } from 'chai' +import { OceanNodeConfig } from '../../@types/OceanNode.js' +import { getConfiguration, loadConfigFromEnv } from '../../utils/config.js' +import { + OverrideEnvConfig, + TEST_ENV_CONFIG_PATH, + buildEnvOverrideConfig, + setupEnvironment +} from '../utils/utils.js' +import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js' + +let config: OceanNodeConfig +describe('Should validate configuration from JSON', () => { + let envOverrides: OverrideEnvConfig[] + before(async () => { + envOverrides = buildEnvOverrideConfig( + [ENVIRONMENT_VARIABLES.DB_TYPE, ENVIRONMENT_VARIABLES.DB_URL], + ['typesense', 'http://localhost:8108/?apiKey=xyz'] + ) + envOverrides = await setupEnvironment(TEST_ENV_CONFIG_PATH, envOverrides) + config = await getConfiguration(true) + }) + + it('should get indexer networks from config', () => { + expect(config.indexingNetworks.length).to.be.equal(1) + expect(config.indexingNetworks[0]).to.be.equal(8996) + expect(config.supportedNetworks['8996'].chainId).to.be.equal(8996) + expect(config.supportedNetworks['8996'].rpc).to.be.equal('http://127.0.0.1:8545') + expect(config.supportedNetworks['8996'].network).to.be.equal('development') + expect(config.supportedNetworks['8996'].chunkSize).to.be.equal(100) + }) + + it('should have indexer', () => { + 
expect(config.hasIndexer).to.be.equal(true) + expect(config.dbConfig).to.not.be.equal(null) + // it is exported in the env vars, so it should overwrite the config.json + expect(config.dbConfig.dbType).to.be.equal('typesense') + const configFile = loadConfigFromEnv() + expect(config.dbConfig.dbType).to.not.be.equal(configFile.dbConfig.dbType) + expect(config.dbConfig.url).to.be.equal('http://localhost:8108/?apiKey=xyz') + }) + + it('should have HTTP', () => { + expect(config.hasHttp).to.be.equal(true) + expect(config.httpPort).to.be.equal(8001) + }) + + it('should have P2P', () => { + expect(config.hasP2P).to.be.equal(true) + expect(config.p2pConfig).to.not.be.equal(null) + expect(config.p2pConfig.bootstrapNodes).to.not.be.equal(null) + expect(config.p2pConfig.bootstrapNodes.length).to.be.equal(0) + }) + it('should have defaults set', () => { + expect(config.isBootstrap).to.be.equal(false) + expect(config.validateUnsignedDDO).to.be.equal(true) + }) + after(() => { + delete process.env.CONFIG_PATH + delete process.env.PRIVATE_KEY + }) +}) diff --git a/src/test/unit/crypt.test.ts b/src/test/unit/crypt.test.ts index d6f1f10ad..5624ee001 100644 --- a/src/test/unit/crypt.test.ts +++ b/src/test/unit/crypt.test.ts @@ -1,8 +1,28 @@ import { expect } from 'chai' import { decrypt, encrypt } from '../../utils/crypt.js' import { EncryptMethod } from '../../@types/fileObject.js' +import { + buildEnvOverrideConfig, + OverrideEnvConfig, + setupEnvironment, + tearDownEnvironment, + TEST_ENV_CONFIG_FILE +} from '../utils/utils.js' +import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js' +import { homedir } from 'os' describe('crypt', () => { + let envOverrides: OverrideEnvConfig[] + before(async () => { + envOverrides = buildEnvOverrideConfig( + [ENVIRONMENT_VARIABLES.RPCS, ENVIRONMENT_VARIABLES.ADDRESS_FILE], + [ + '{ "8996":{ "rpc":"http://172.0.0.1:8545", "chainId": 8996, "network": "development", "chunkSize": 100 }}', + `${homedir}/.ocean/ocean-contracts/artifacts/address.json` + ] + ) + envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) + }) it('should encrypt/decrypt AES', async () => { const data = Uint8Array.from(Buffer.from('ocean')) const encryptedData = await encrypt(data, EncryptMethod.AES) @@ -15,4 +35,7 @@ describe('crypt', () => { const decryptedData = await decrypt(encryptedData, EncryptMethod.ECIES) expect(Uint8Array.from(decryptedData)).to.deep.equal(data) }) + after(async () => { + await tearDownEnvironment(envOverrides) + }) }) diff --git a/src/test/unit/indexer/indexer.test.ts b/src/test/unit/indexer/indexer.test.ts index d255d4416..2b2ae6178 100644 --- a/src/test/unit/indexer/indexer.test.ts +++ b/src/test/unit/indexer/indexer.test.ts @@ -91,4 +91,8 @@ describe('OceanIndexer', () => { await tearDownEnvironment(envOverrides) sandbox.restore() }) + after(async () => { + await tearDownEnvironment(envOverrides) + sandbox.restore() + }) }) diff --git a/src/test/utils/utils.ts b/src/test/utils/utils.ts index dfcacfee3..8376d6ae1 100644 --- a/src/test/utils/utils.ts +++ b/src/test/utils/utils.ts @@ -13,6 +13,7 @@ const __dirname = path.dirname(__filename) // relative to test/utils (default value, but can use other paths) export const TEST_ENV_CONFIG_FILE = '../.env.test' +export const TEST_ENV_CONFIG_PATH = '../.env.test2' // use this if we need to override the default configuration while testing export interface OverrideEnvConfig { name: string // name of the var diff --git a/src/utils/config.ts b/src/utils/config.ts index 99c1dbfff..7c10b95ce 100644 --- 
a/src/utils/config.ts +++ b/src/utils/config.ts @@ -35,7 +35,129 @@ import { create256Hash } from './crypt.js' import { isDefined } from './util.js' import { fileURLToPath } from 'url' import path from 'path' +import fs from 'fs' +import os from 'os' +import { z } from 'zod' +const AccessListContractSchema = z.any() +const OceanNodeKeysSchema = z.any() + +const OceanNodeDBConfigSchema = z.any() +const FeeStrategySchema = z.any() +const RPCSSchema = z.any() +const C2DClusterInfoSchema = z.any() +const DenyListSchema = z.any() + +const OceanNodeP2PConfigSchema = z.object({ + bootstrapNodes: z.array(z.string()).optional().default(defaultBootstrapAddresses), + bootstrapTimeout: z.number().int().optional().default(2000), + bootstrapTagName: z.string().optional().default('bootstrap'), + bootstrapTagValue: z.number().int().optional().default(50), + enableIPV4: z.boolean().optional().default(true), + enableIPV6: z.boolean().optional().default(true), + ipV4BindAddress: z.string().optional().default('0.0.0.0'), + ipV4BindTcpPort: z.number().int().optional().default(0), + ipV4BindWsPort: z.number().int().optional().default(0), + ipV6BindAddress: z.string().optional().default('::1'), + ipV6BindTcpPort: z.number().int().optional().default(0), + ipV6BindWsPort: z.number().int().optional().default(0), + pubsubPeerDiscoveryInterval: z.number().int().optional().default(1000), + dhtMaxInboundStreams: z.number().int().optional().default(500), + dhtMaxOutboundStreams: z.number().int().optional().default(500), + mDNSInterval: z.number().int().optional().default(20e3), + connectionsMaxParallelDials: z.number().int().optional().default(15), + connectionsDialTimeout: z.number().int().optional().default(30e3), + upnp: z.boolean().optional().default(true), + autoNat: z.boolean().optional().default(true), + enableCircuitRelayServer: z.boolean().optional().default(false), + enableCircuitRelayClient: z.boolean().optional().default(false), + circuitRelays: z.number().int().optional().default(0), + announcePrivateIp: z.boolean().optional().default(false), + filterAnnouncedAddresses: z + .array(z.string()) + .optional() + .default([ + '127.0.0.0/8', + '10.0.0.0/8', + '172.16.0.0/12', + '192.168.0.0/16', + '100.64.0.0/10', + '169.254.0.0/16', + '192.0.0.0/24', + '192.0.2.0/24', + '198.51.100.0/24', + '203.0.113.0/24', + '224.0.0.0/4', + '240.0.0.0/4' + ]), + minConnections: z.number().int().optional().default(1), + maxConnections: z.number().int().optional().default(300), + autoDialPeerRetryThreshold: z.number().int().optional().default(120000), + autoDialConcurrency: z.number().int().optional().default(5), + maxPeerAddrsToDial: z.number().int().optional().default(5), + autoDialInterval: z.number().int().optional().default(5000), + enableNetworkStats: z.boolean().optional().default(false) +}) + +export const OceanNodeConfigSchema = z.object({ + authorizedDecrypters: z.array(z.string()), + authorizedDecryptersList: AccessListContractSchema.nullable(), + allowedValidators: z.array(z.string()), + allowedValidatorsList: AccessListContractSchema.nullable(), + authorizedPublishers: z.array(z.string()), + authorizedPublishersList: AccessListContractSchema.nullable(), + + keys: OceanNodeKeysSchema, + + hasP2P: z.boolean(), + p2pConfig: OceanNodeP2PConfigSchema.nullable(), + hasIndexer: z.boolean(), + hasHttp: z.boolean(), + hasControlPanel: z.boolean(), + + dbConfig: OceanNodeDBConfigSchema.optional(), + + httpPort: z.number().int(), + rateLimit: z.union([z.number(), z.object({})]).optional(), + feeStrategy: FeeStrategySchema, + 
+ supportedNetworks: RPCSSchema.optional(), + + claimDurationTimeout: z.number().int().default(600), + indexingNetworks: RPCSSchema.optional(), + + c2dClusters: z.array(C2DClusterInfoSchema), + c2dNodeUri: z.string(), + accountPurgatoryUrl: z.string(), + assetPurgatoryUrl: z.string(), + + allowedAdmins: z.array(z.string()).optional(), + allowedAdminsList: AccessListContractSchema.nullable().optional(), + + codeHash: z.string().optional(), + maxConnections: z.number().optional(), + denyList: DenyListSchema.optional(), + unsafeURLs: z.array(z.string()).optional().default([ + // AWS and GCP + '^.*(169.254.169.254).*', + // GCP + '^.*(metadata.google.internal).*', + '^.*(http://metadata).*', + // Azure + '^.*(http://169.254.169.254).*', + // Oracle Cloud + '^.*(http://192.0.0.192).*', + // Alibaba Cloud + '^.*(http://100.100.100.200).*', + // k8s ETCD + '^.*(127.0.0.1).*' + ]), + isBootstrap: z.boolean().optional().default(false), + validateUnsignedDDO: z.boolean().optional().default(true), + jwtSecret: z.string().optional() +}) + +export type OceanNodeConfigParsed = z.infer // usefull for lazy loading and avoid boilerplate on other places let previousConfiguration: OceanNodeConfig = null @@ -663,7 +785,11 @@ export async function getConfiguration( isStartup: boolean = false ): Promise { if (!previousConfiguration || forceReload) { - previousConfiguration = await getEnvConfig(isStartup) + if (!existsEnvironmentVariable(ENVIRONMENT_VARIABLES.CONFIG_PATH)) { + previousConfiguration = await getEnvConfig(isStartup) + } else { + previousConfiguration = buildMergedConfig() + } } if (!previousConfiguration.codeHash) { const __filename = fileURLToPath(import.meta.url) @@ -674,6 +800,296 @@ export async function getConfiguration( return previousConfiguration } +export function loadConfigFromEnv(envVar: string = 'CONFIG_PATH'): OceanNodeConfig { + let configPath = process.env[envVar] + if (!configPath) { + if (!fs.existsSync(path.join(process.cwd(), 'config.json'))) { + throw new Error( + `Config file not found. Neither environment variable "${envVar}" is set nor does ${configPath} exist.` + ) + } + configPath = path.join(process.cwd(), 'config.json') + } + // Expand $HOME if present + if (configPath.startsWith('$HOME')) { + const home = process.env.HOME || os.homedir() + if (!home) { + throw new Error( + `"${envVar}" contains $HOME but HOME is not set in the environment.` + ) + } + configPath = path.join(home, configPath.slice('$HOME'.length)) + } + + if (!path.isAbsolute(configPath)) { + throw new Error( + `Environment variable "${envVar}" must be an absolute path. Got: ${configPath}` + ) + } + + if (!fs.existsSync(configPath)) { + throw new Error(`Config file not found at path: ${configPath}`) + } + + const privateKey = process.env.PRIVATE_KEY + if (!privateKey || privateKey.length !== 66) { + // invalid private key + CONFIG_LOGGER.logMessageWithEmoji( + 'Invalid PRIVATE_KEY env variable..', + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + + const rawData = fs.readFileSync(configPath, 'utf-8') + let config: OceanNodeConfig + + try { + config = JSON.parse(rawData) + } catch (err) { + throw new Error(`Invalid JSON in config file: ${configPath}. Error: ${err.message}`) + } + if (!previousConfiguration) { + previousConfiguration = config + } else if (configChanged(previousConfiguration, config)) { + CONFIG_LOGGER.warn( + 'Detected Ocean Node Configuration change... 
This might have unintended effects' + ) + } + return config +} + +const parseJsonEnv = (env: string | undefined, fallback: T): T => { + try { + return env ? JSON.parse(env) : fallback + } catch { + return fallback + } +} + +export function buildMergedConfig(): OceanNodeConfig { + const baseConfig = loadConfigFromEnv() + + let dhtFilterOption + switch (parseInt(process.env.P2P_DHT_FILTER, 0)) { + case 1: + dhtFilterOption = dhtFilterMethod.filterPrivate + break + case 2: + dhtFilterOption = dhtFilterMethod.filterPublic + break + default: + dhtFilterOption = dhtFilterMethod.filterNone + } + + const privateKey = process.env.PRIVATE_KEY + if (!privateKey || privateKey.length !== 66) { + // invalid private key + CONFIG_LOGGER.logMessageWithEmoji( + 'Invalid PRIVATE_KEY env variable..', + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + + const overrides: Partial = { + ...(process.env.JWT_SECRET && { jwtSecret: process.env.JWT_SECRET }), + ...(process.env.DB_URL && { + dbConfig: { + url: process.env.DB_URL, + username: process.env.DB_USERNAME ?? baseConfig.dbConfig?.username ?? '', + password: process.env.DB_PASSWORD ?? baseConfig.dbConfig?.password ?? '', + dbType: process.env.DB_TYPE ?? baseConfig.dbConfig?.dbType ?? 'elasticsearch' + } + }), + authorizedDecrypters: process.env.AUTHORIZED_DECRYPTERS + ? getAuthorizedDecrypters(true) + : baseConfig.authorizedDecrypters, + + authorizedDecryptersList: process.env.AUTHORIZED_DECRYPTERS_LIST + ? getAuthorizedDecryptersList(true) + : baseConfig.authorizedDecryptersList, + + allowedValidators: process.env.ALLOWED_VALIDATORS + ? getAllowedValidators(true) + : baseConfig.allowedValidators, + + allowedValidatorsList: process.env.ALLOWED_VALIDATORS_LIST + ? getAllowedValidatorsList(true) + : baseConfig.allowedValidatorsList, + + authorizedPublishers: process.env.ALLOWED_ADMINS + ? getAuthorizedPublishers(true) + : baseConfig.authorizedPublishers, + + authorizedPublishersList: process.env.ALLOWED_ADMINS_LIST + ? getAuthorizedPublishersList(true) + : baseConfig.authorizedPublishersList, + + ...(process.env.HTTP_API_PORT && { httpPort: Number(process.env.HTTP_API_PORT) }), + + p2pConfig: { + ...baseConfig.p2pConfig, + + bootstrapNodes: parseJsonEnv( + process.env.P2P_BOOTSTRAP_NODES, + baseConfig.p2pConfig?.bootstrapNodes ?? [] + ), + bootstrapTimeout: process.env.P2P_BOOTSTRAP_TIMEOUT + ? parseInt(process.env.P2P_BOOTSTRAP_TIMEOUT, 10) + : baseConfig.p2pConfig?.bootstrapTimeout, + bootstrapTagName: + process.env.P2P_BOOTSTRAP_TAGNAME ?? baseConfig.p2pConfig?.bootstrapTagName, + bootstrapTagValue: process.env.P2P_BOOTSTRAP_TAGVALUE + ? parseInt(process.env.P2P_BOOTSTRAP_TAGVALUE, 10) + : baseConfig.p2pConfig?.bootstrapTagValue, + bootstrapTTL: process.env.P2P_BOOTSTRAP_TTL + ? parseInt(process.env.P2P_BOOTSTRAP_TTL, 10) + : baseConfig.p2pConfig?.bootstrapTTL, + + enableIPV4: process.env.P2P_ENABLE_IPV4 + ? process.env.P2P_ENABLE_IPV4 === 'true' + : baseConfig.p2pConfig?.enableIPV4, + enableIPV6: process.env.P2P_ENABLE_IPV6 + ? process.env.P2P_ENABLE_IPV6 === 'true' + : baseConfig.p2pConfig?.enableIPV6, + + ipV4BindAddress: + process.env.P2P_IP_V4_BIND_ADDRESS ?? baseConfig.p2pConfig?.ipV4BindAddress, + ipV4BindTcpPort: process.env.P2P_IP_V4_BIND_TCP_PORT + ? parseInt(process.env.P2P_IP_V4_BIND_TCP_PORT, 10) + : baseConfig.p2pConfig?.ipV4BindTcpPort, + ipV4BindWsPort: process.env.P2P_IP_V4_BIND_WS_PORT + ? 
parseInt(process.env.P2P_IP_V4_BIND_WS_PORT, 10) + : baseConfig.p2pConfig?.ipV4BindWsPort, + + ipV6BindAddress: + process.env.P2P_IP_V6_BIND_ADDRESS ?? baseConfig.p2pConfig?.ipV6BindAddress, + ipV6BindTcpPort: process.env.P2P_IP_V6_BIND_TCP_PORT + ? parseInt(process.env.P2P_IP_V6_BIND_TCP_PORT, 10) + : baseConfig.p2pConfig?.ipV6BindTcpPort, + ipV6BindWsPort: process.env.P2P_IP_V6_BIND_WS_PORT + ? parseInt(process.env.P2P_IP_V6_BIND_WS_PORT, 10) + : baseConfig.p2pConfig?.ipV6BindWsPort, + + announceAddresses: parseJsonEnv( + process.env.P2P_ANNOUNCE_ADDRESSES, + baseConfig.p2pConfig?.announceAddresses ?? [] + ), + pubsubPeerDiscoveryInterval: process.env.P2P_PUBSUB_PEER_DISCOVERY_INTERVAL + ? parseInt(process.env.P2P_PUBSUB_PEER_DISCOVERY_INTERVAL, 10) + : baseConfig.p2pConfig?.pubsubPeerDiscoveryInterval, + + dhtMaxInboundStreams: process.env.P2P_DHT_MAX_INBOUND_STREAMS + ? parseInt(process.env.P2P_DHT_MAX_INBOUND_STREAMS, 10) + : baseConfig.p2pConfig?.dhtMaxInboundStreams, + dhtMaxOutboundStreams: process.env.P2P_DHT_MAX_OUTBOUND_STREAMS + ? parseInt(process.env.P2P_DHT_MAX_OUTBOUND_STREAMS, 10) + : baseConfig.p2pConfig?.dhtMaxOutboundStreams, + dhtFilter: dhtFilterOption ?? baseConfig.p2pConfig?.dhtFilter, + + mDNSInterval: process.env.P2P_MDNS_INTERVAL + ? parseInt(process.env.P2P_MDNS_INTERVAL, 10) + : baseConfig.p2pConfig?.mDNSInterval, + + connectionsMaxParallelDials: process.env.P2P_CONNECTIONS_MAX_PARALLEL_DIALS + ? parseInt(process.env.P2P_CONNECTIONS_MAX_PARALLEL_DIALS, 10) + : baseConfig.p2pConfig?.connectionsMaxParallelDials, + connectionsDialTimeout: process.env.P2P_CONNECTIONS_DIAL_TIMEOUT + ? parseInt(process.env.P2P_CONNECTIONS_DIAL_TIMEOUT, 10) + : baseConfig.p2pConfig?.connectionsDialTimeout, + + upnp: process.env.P2P_ENABLE_UPNP + ? process.env.P2P_ENABLE_UPNP === 'true' + : baseConfig.p2pConfig?.upnp, + autoNat: process.env.P2P_ENABLE_AUTONAT + ? process.env.P2P_ENABLE_AUTONAT === 'true' + : baseConfig.p2pConfig?.autoNat, + + enableCircuitRelayServer: process.env.P2P_ENABLE_CIRCUIT_RELAY_SERVER + ? process.env.P2P_ENABLE_CIRCUIT_RELAY_SERVER === 'true' + : baseConfig.p2pConfig?.enableCircuitRelayServer, + enableCircuitRelayClient: process.env.P2P_ENABLE_CIRCUIT_RELAY_CLIENT + ? process.env.P2P_ENABLE_CIRCUIT_RELAY_CLIENT === 'true' + : baseConfig.p2pConfig?.enableCircuitRelayClient, + + circuitRelays: process.env.P2P_CIRCUIT_RELAYS + ? parseInt(process.env.P2P_CIRCUIT_RELAYS, 10) + : baseConfig.p2pConfig?.circuitRelays, + announcePrivateIp: process.env.P2P_ANNOUNCE_PRIVATE + ? process.env.P2P_ANNOUNCE_PRIVATE === 'true' + : baseConfig.p2pConfig?.announcePrivateIp, + + filterAnnouncedAddresses: parseJsonEnv( + process.env.P2P_FILTER_ANNOUNCED_ADDRESSES, + baseConfig.p2pConfig?.filterAnnouncedAddresses ?? [] + ), + + minConnections: process.env.P2P_MIN_CONNECTIONS + ? parseInt(process.env.P2P_MIN_CONNECTIONS, 10) + : baseConfig.p2pConfig?.minConnections, + maxConnections: process.env.P2P_MAX_CONNECTIONS + ? parseInt(process.env.P2P_MAX_CONNECTIONS, 10) + : baseConfig.p2pConfig?.maxConnections, + + autoDialPeerRetryThreshold: process.env.P2P_AUTODIAL_PEER_RETRY_THRESHOLD + ? parseInt(process.env.P2P_AUTODIAL_PEER_RETRY_THRESHOLD, 10) + : baseConfig.p2pConfig?.autoDialPeerRetryThreshold, + autoDialConcurrency: process.env.P2P_AUTODIAL_CONCURRENCY + ? parseInt(process.env.P2P_AUTODIAL_CONCURRENCY, 10) + : baseConfig.p2pConfig?.autoDialConcurrency, + maxPeerAddrsToDial: process.env.P2P_MAX_PEER_ADDRS_TO_DIAL + ? 
parseInt(process.env.P2P_MAX_PEER_ADDRS_TO_DIAL, 10) + : baseConfig.p2pConfig?.maxPeerAddrsToDial, + autoDialInterval: process.env.P2P_AUTODIAL_INTERVAL + ? parseInt(process.env.P2P_AUTODIAL_INTERVAL, 10) + : baseConfig.p2pConfig?.autoDialInterval, + + enableNetworkStats: process.env.P2P_ENABLE_NETWORK_STATS + ? process.env.P2P_ENABLE_NETWORK_STATS === 'true' + : baseConfig.p2pConfig?.enableNetworkStats + }, + + ...(process.env.CONTROL_PANEL && { + hasControlPanel: process.env.CONTROL_PANEL !== 'false' + }), + ...(process.env.RPCS && { + supportedNetworks: parseJsonEnv( + process.env.RPCS, + baseConfig.supportedNetworks ?? {} + ) + }), + ...(process.env.C2D_NODE_URI && { c2dNodeUri: process.env.C2D_NODE_URI }), + ...(process.env.ACCOUNT_PURGATORY_URL && { + accountPurgatoryUrl: process.env.ACCOUNT_PURGATORY_URL + }), + ...(process.env.ASSET_PURGATORY_URL && { + assetPurgatoryUrl: process.env.ASSET_PURGATORY_URL + }), + ...(process.env.UNSAFE_URLS && { + unsafeURLs: parseJsonEnv(process.env.UNSAFE_URLS, baseConfig.unsafeURLs ?? []) + }), + ...(process.env.IS_BOOTSTRAP && { isBootstrap: process.env.IS_BOOTSTRAP === 'true' }), + ...(process.env.ESCROW_CLAIM_TIMEOUT && { + claimDurationTimeout: parseInt(process.env.ESCROW_CLAIM_TIMEOUT, 10) + }), + ...(process.env.VALIDATE_UNSIGNED_DDO && { + validateUnsignedDDO: process.env.VALIDATE_UNSIGNED_DDO === 'true' + }) + } + + const merged = { + ...baseConfig, + ...overrides + } + + return OceanNodeConfigSchema.parse(merged) as OceanNodeConfig +} + // we can just use the lazy version above "getConfiguration()" and specify if we want to reload from .env variables async function getEnvConfig(isStartup?: boolean): Promise { const privateKey = process.env.PRIVATE_KEY diff --git a/src/utils/constants.ts b/src/utils/constants.ts index edd438f28..0c4eee128 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -190,6 +190,11 @@ export const ENVIRONMENT_VARIABLES: Record = { value: process.env.HTTP_API_PORT, required: false }, + CONFIG_PATH: { + name: 'CONFIG_PATH', + value: process.env.CONFIG_PATH, + required: false + }, PRIVATE_KEY: { name: 'PRIVATE_KEY', value: process.env.PRIVATE_KEY, required: true }, // used on test environments (ci) NODE1_PRIVATE_KEY: {