diff --git a/src/components/core/compute/getStatus.ts b/src/components/core/compute/getStatus.ts index 946931286..666289e05 100644 --- a/src/components/core/compute/getStatus.ts +++ b/src/components/core/compute/getStatus.ts @@ -53,9 +53,13 @@ export class ComputeGetStatusHandler extends CommandHandler { } } else { engines = await this.getOceanNode().getC2DEngines().getAllEngines() + CORE_LOGGER.logMessage( + 'ComputeGetStatusCommand: No jobId provided, querying all C2D clusters' + ) } for (const engine of engines) { + CORE_LOGGER.logMessage(`ComputeGetStatusCommand: Querying engine`) const jobs = await engine.getComputeJobStatus( task.consumerAddress, task.agreementId, diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index 0f4bbf2f0..d4c431f67 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -33,6 +33,7 @@ import { C2DEngineDocker, getAlgorithmImage } from '../../c2d/compute_engine_doc import { Credentials, DDOManager } from '@oceanprotocol/ddo-js' import { areKnownCredentialTypes, checkCredentials } from '../../../utils/credentials.js' import { PolicyServer } from '../../policyServer/index.js' +import { getAlgoChecksums, validateAlgoForDataset } from './utils.js' export class ComputeInitializeHandler extends CommandHandler { validate(command: ComputeInitializeCommand): ValidateParams { @@ -88,6 +89,28 @@ export class ComputeInitializeHandler extends CommandHandler { } } } + + const algoChecksums = await getAlgoChecksums( + task.algorithm.documentId, + task.algorithm.serviceId, + node + ) + + const isRawCodeAlgorithm = task.algorithm.meta?.rawcode + const hasValidChecksums = algoChecksums.container && algoChecksums.files + + if (!isRawCodeAlgorithm && !hasValidChecksums) { + const errorMessage = + 'Failed to retrieve algorithm checksums. Both container and files checksums are required.' + CORE_LOGGER.error(errorMessage) + return { + stream: null, + status: { + httpStatus: 500, + error: errorMessage + } + } + } if (engine === null) { return { stream: null, @@ -201,7 +224,8 @@ export class ComputeInitializeHandler extends CommandHandler { const { chainId: ddoChainId, nftAddress, - credentials + credentials, + metadata } = ddoInstance.getDDOFields() const isOrdable = isOrderingAllowedForAsset(ddo) if (!isOrdable.isOrdable) { @@ -214,6 +238,30 @@ export class ComputeInitializeHandler extends CommandHandler { } } } + if (metadata.type !== 'algorithm') { + const index = task.datasets.findIndex( + (d) => d.documentId === ddoInstance.getDid() + ) + const safeIndex = index === -1 ? 0 : index + const validAlgoForDataset = await validateAlgoForDataset( + task.algorithm.documentId, + algoChecksums, + ddoInstance, + task.datasets[safeIndex].serviceId, + node + ) + if (!validAlgoForDataset) { + return { + stream: null, + status: { + httpStatus: 400, + error: `Algorithm ${ + task.algorithm.documentId + } not allowed to run on the dataset: ${ddoInstance.getDid()}` + } + } + } + } // check credentials (DDO level) let accessGrantedDDOLevel: boolean if (credentials) { diff --git a/src/test/data/assets.ts b/src/test/data/assets.ts index 2dae5c965..3758f079f 100644 --- a/src/test/data/assets.ts +++ b/src/test/data/assets.ts @@ -190,8 +190,14 @@ export const computeAssetWithCredentials = { compute: { allowRawAlgorithm: false, allowNetworkAccess: true, - publisherTrustedAlgorithmPublishers: [] as any, - publisherTrustedAlgorithms: [] as any + publisherTrustedAlgorithmPublishers: ['*'] as any, + publisherTrustedAlgorithms: [ + { + did: '*', + filesChecksum: '*', + containerSectionChecksum: '*' + } + ] as any } } ], @@ -287,6 +293,78 @@ export const algoAssetWithCredentials = { } export const computeAsset = { + '@context': ['https://w3id.org/did/v1'], + id: '', + nftAddress: '', + version: '4.1.0', + chainId: 8996, + metadata: { + created: '2021-12-20T14:35:20Z', + updated: '2021-12-20T14:35:20Z', + type: 'dataset', + name: 'cli fixed asset', + description: 'asset published using ocean.js cli tool', + tags: ['test'], + author: 'oceanprotocol', + license: 'https://market.oceanprotocol.com/terms', + additionalInformation: { + termsAndConditions: true + } + }, + services: [ + { + id: '1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36', + type: 'compute', + files: { + files: [ + { + type: 'url', + url: 'https://raw.githubusercontent.com/oceanprotocol/testdatasets/main/shs_dataset_test.txt', + method: 'GET' + } + ] + }, + datatokenAddress: '', + serviceEndpoint: 'https://v4.provider.oceanprotocol.com', + timeout: 86400, + compute: { + allowRawAlgorithm: false, + allowNetworkAccess: true, + publisherTrustedAlgorithmPublishers: ['*'] as any, + publisherTrustedAlgorithms: [ + { + did: '*', + filesChecksum: '*', + containerSectionChecksum: '*' + } + ] as any + } + } + ], + event: {}, + nft: { + address: '', + name: 'Ocean Data NFT', + symbol: 'OCEAN-NFT', + state: 5, + tokenURI: '', + owner: '', + created: '' + }, + purgatory: { + state: false + }, + datatokens: [] as any, + stats: { + allocated: 0, + orders: 0, + price: { + value: '0' + } + } +} + +export const computeAssetWithNoAccess = { '@context': ['https://w3id.org/did/v1'], id: '', nftAddress: '', diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index f731187ea..1116e1790 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -28,7 +28,7 @@ import { expectedTimeoutFailure, waitToIndex } from './testUtils.js' import { streamToObject } from '../../utils/util.js' import { ethers, hexlify, JsonRpcProvider, Signer } from 'ethers' import { publishAsset, orderAsset } from '../utils/assets.js' -import { computeAsset, algoAsset } from '../data/assets.js' +import { algoAsset, computeAssetWithNoAccess } from '../data/assets.js' import { RPCS } from '../../@types/blockchain.js' import { DEFAULT_TEST_TIMEOUT, @@ -137,7 +137,10 @@ describe('Trusted algorithms Flow', () => { // let's publish assets & algos it('should publish compute datasets & algos', async function () { this.timeout(DEFAULT_TEST_TIMEOUT * 2) - publishedComputeDataset = await publishAsset(computeAsset, publisherAccount) + publishedComputeDataset = await publishAsset( + computeAssetWithNoAccess, + publisherAccount + ) publishedAlgoDataset = await publishAsset(algoAsset, publisherAccount) const computeDatasetResult = await waitToIndex( publishedComputeDataset.ddo.id, @@ -196,7 +199,7 @@ describe('Trusted algorithms Flow', () => { firstEnv = computeEnvironments[0] }) - it('Initialize compute without orders transaction IDs', async () => { + it('should not initialize compute without orders transaction IDs because algorithm is not trusted by dataset', async () => { const dataset: ComputeAsset = { documentId: publishedComputeDataset.ddo.id, serviceId: publishedComputeDataset.ddo.services[0].id @@ -231,110 +234,15 @@ describe('Trusted algorithms Flow', () => { ) console.log(resp) assert(resp, 'Failed to get response') - assert(resp.status.httpStatus === 200, 'Failed to get 200 response') - assert(resp.stream, 'Failed to get stream') - expect(resp.stream).to.be.instanceOf(Readable) - initializeResponse = (await streamToObject( - resp.stream as Readable - )) as ProviderComputeInitializeResults - }) - - it('should start an order on dataset', async function () { - const orderTxReceipt = await orderAsset( - publishedComputeDataset.ddo, - 0, - consumerAccount, - firstEnv.consumerAddress, // for compute, consumer is always address of compute env - publisherAccount, - oceanNode, - initializeResponse.datasets[0].providerFee - ) - assert(orderTxReceipt, 'order transaction failed') - datasetOrderTxId = orderTxReceipt.hash - assert(datasetOrderTxId, 'transaction id not found') - }) - it('should start an order on algorithm', async function () { - const orderTxReceipt = await orderAsset( - publishedAlgoDataset.ddo, - 0, - consumerAccount, - firstEnv.consumerAddress, // for compute, consumer is always address of compute env - publisherAccount, - oceanNode, - initializeResponse.algorithm.providerFee - ) - assert(orderTxReceipt, 'order transaction failed') - algoOrderTxId = orderTxReceipt.hash - assert(algoOrderTxId, 'transaction id not found') - }) - it('should not start a compute job because algorithm is not trusted by dataset', async () => { - let balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) - const nonce = Date.now().toString() - const message = String( - (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce - ) - // sign message/nonce - const consumerMessage = ethers.solidityPackedKeccak256( - ['bytes'], - [ethers.hexlify(ethers.toUtf8Bytes(message))] - ) - const messageHashBytes = ethers.toBeArray(consumerMessage) - const signature = await wallet.signMessage(messageHashBytes) - const startComputeTask: PaidComputeStartCommand = { - command: PROTOCOL_COMMANDS.COMPUTE_START, - consumerAddress: await consumerAccount.getAddress(), - signature, - nonce, - environment: firstEnv.id, - datasets: [ - { - documentId: publishedComputeDataset.ddo.id, - serviceId: publishedComputeDataset.ddo.services[0].id, - transferTxId: datasetOrderTxId - } - ], - algorithm: { - documentId: publishedAlgoDataset.ddo.id, - serviceId: publishedAlgoDataset.ddo.services[0].id, - transferTxId: algoOrderTxId, - meta: publishedAlgoDataset.ddo.metadata.algorithm - }, - output: {}, - payment: { - chainId: DEVELOPMENT_CHAIN_ID, - token: paymentToken - }, - maxJobDuration: computeJobDuration - } - // let's put funds in escrow & create an auth - balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) - await paymentTokenContract - .connect(consumerAccount) - .approve(initializeResponse.payment.escrowAddress, balance) - await escrowContract - .connect(consumerAccount) - .deposit(initializeResponse.payment.token, balance) - await escrowContract - .connect(consumerAccount) - .authorize( - initializeResponse.payment.token, - firstEnv.consumerAddress, - balance, - computeJobDuration, - 10 - ) - - const response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask) - console.log('response: ', response) - assert(response, 'Failed to get response') - assert(response.status.httpStatus === 400, 'Failed to get 400 response') + assert(resp.status.httpStatus === 400, 'Failed to get 400 response') assert( - response.status.error === + resp.status.error === `Algorithm ${publishedAlgoDataset.ddo.id} not allowed to run on the dataset: ${publishedComputeDataset.ddo.id}`, 'Inconsistent error message' ) - assert(response.stream === null, 'Failed to get stream') + assert(resp.stream === null, 'Failed to get stream') }) + it('should add the algorithm to the dataset trusted algorithm list', async function () { this.timeout(DEFAULT_TEST_TIMEOUT * 5) const algoChecksums = await getAlgoChecksums( @@ -389,7 +297,99 @@ describe('Trusted algorithms Flow', () => { 'Algorithm DID mismatch in trusted algorithms' ) }) + + it('Initialize compute without orders transaction IDs', async () => { + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id + } + const getEnvironmentsTask = { + command: PROTOCOL_COMMANDS.COMPUTE_GET_ENVIRONMENTS + } + const response = await new ComputeGetEnvironmentsHandler(oceanNode).handle( + getEnvironmentsTask + ) + computeEnvironments = await streamToObject(response.stream as Readable) + firstEnv = computeEnvironments[0] + + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE + } + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + console.log(resp) + assert(resp, 'Failed to get response') + assert(resp.status.httpStatus === 200, 'Failed to get 200 response') + assert(resp.stream, 'Failed to get stream') + expect(resp.stream).to.be.instanceOf(Readable) + initializeResponse = (await streamToObject( + resp.stream as Readable + )) as ProviderComputeInitializeResults + }) + + it('should start an order on dataset', async function () { + const orderTxReceipt = await orderAsset( + publishedComputeDataset.ddo, + 0, + consumerAccount, + firstEnv.consumerAddress, // for compute, consumer is always address of compute env + publisherAccount, + oceanNode, + initializeResponse.datasets[0].providerFee + ) + assert(orderTxReceipt, 'order transaction failed') + datasetOrderTxId = orderTxReceipt.hash + assert(datasetOrderTxId, 'transaction id not found') + }) + it('should start an order on algorithm', async function () { + const orderTxReceipt = await orderAsset( + publishedAlgoDataset.ddo, + 0, + consumerAccount, + firstEnv.consumerAddress, // for compute, consumer is always address of compute env + publisherAccount, + oceanNode, + initializeResponse.algorithm.providerFee + ) + assert(orderTxReceipt, 'order transaction failed') + algoOrderTxId = orderTxReceipt.hash + assert(algoOrderTxId, 'transaction id not found') + }) + it('should start a compute job', async () => { + // let's put funds in escrow & create an auth + const balance = await paymentTokenContract.balanceOf( + await consumerAccount.getAddress() + ) + await paymentTokenContract + .connect(consumerAccount) + .approve(initializeResponse.payment.escrowAddress, balance) + await escrowContract + .connect(consumerAccount) + .deposit(initializeResponse.payment.token, balance) + await escrowContract + .connect(consumerAccount) + .authorize( + initializeResponse.payment.token, + firstEnv.consumerAddress, + balance, + computeJobDuration, + 10 + ) const locks = await oceanNode.escrow.getLocks( DEVELOPMENT_CHAIN_ID, paymentToken, diff --git a/src/test/unit/indexer/indexer.test.ts b/src/test/unit/indexer/indexer.test.ts index 885d3f409..865f12dd8 100644 --- a/src/test/unit/indexer/indexer.test.ts +++ b/src/test/unit/indexer/indexer.test.ts @@ -83,4 +83,8 @@ describe('OceanIndexer', () => { await tearDownEnvironment(envOverrides) sandbox.restore() }) + after(async () => { + await tearDownEnvironment(envOverrides) + sandbox.restore() + }) })