diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 1c890ef47..5ca1f46e4 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -129,6 +129,7 @@ export interface C2DDockerConfig { } export type ComputeResultType = + | 'imageLog' | 'algorithmLog' | 'output' | 'configurationLog' diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 942adc369..dc5620f1a 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -98,6 +98,8 @@ export class C2DEngineDocker extends C2DEngine { // let's build the env. Swarm and k8 will build multiple envs, based on arhitecture const config = await getConfiguration() const envConfig = await this.getC2DConfig().connection + console.log(config) + console.log(envConfig) let sysinfo = null try { sysinfo = await this.docker.info() @@ -473,6 +475,34 @@ export class C2DEngineDocker extends C2DEngine { protected async getResults(jobId: string): Promise { const res: ComputeResult[] = [] let index = 0 + try { + const logStat = statSync( + this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/image.log' + ) + if (logStat) { + res.push({ + filename: 'image.log', + filesize: logStat.size, + type: 'imageLog', + index + }) + index = index + 1 + } + } catch (e) {} + try { + const logStat = statSync( + this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/configuration.log' + ) + if (logStat) { + res.push({ + filename: 'configuration.log', + filesize: logStat.size, + type: 'configurationLog', + index + }) + index = index + 1 + } + } catch (e) {} try { const logStat = statSync( this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/algorithm.log' @@ -501,6 +531,20 @@ export class C2DEngineDocker extends C2DEngine { index = index + 1 } } catch (e) {} + try { + const logStat = statSync( + this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/publish.log' + ) + if (logStat) { + res.push({ + filename: 'publish.log', + filesize: logStat.size, + type: 'publishLog', + index + }) + index = index + 1 + } + } catch (e) {} return res } @@ -558,6 +602,39 @@ export class C2DEngineDocker extends C2DEngine { } } } + if (i.type === 'configurationLog') { + return { + stream: createReadStream( + this.getC2DConfig().tempFolder + + '/' + + jobId + + '/data/logs/configuration.log' + ), + headers: { + 'Content-Type': 'text/plain' + } + } + } + if (i.type === 'publishLog') { + return { + stream: createReadStream( + this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/publish.log' + ), + headers: { + 'Content-Type': 'text/plain' + } + } + } + if (i.type === 'imageLog') { + return { + stream: createReadStream( + this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/image.log' + ), + headers: { + 'Content-Type': 'text/plain' + } + } + } if (i.type === 'output') { return { stream: createReadStream( @@ -1161,25 +1238,15 @@ export class C2DEngineDocker extends C2DEngine { status: C2DStatusNumber.RunningAlgorithm, statusText: C2DStatusText.RunningAlgorithm } - // for testing purposes - // if (!job.algorithm.fileObject) { - // console.log('no file object') - // const file: UrlFileObject = { - // type: 'url', - // url: 'https://raw.githubusercontent.com/oceanprotocol/test-algorithm/master/javascript/algo.js', - // method: 'get' - // } - // job.algorithm.fileObject = file - // } - // download algo - // TODO: we currently DO NOT have a way to set this field unencrypted (once we publish the asset its encrypted) - // So we cannot test this from the CLI for instance... Only Option is to actually send it encrypted - // OR extract the files object from the passed DDO, decrypt it and use it + const jobFolderPath = this.getC2DConfig().tempFolder + '/' + job.jobId + const fullAlgoPath = jobFolderPath + '/data/transformations/algorithm' + const configLogPath = jobFolderPath + '/data/logs/configuration.log' - // console.log(job.algorithm.fileObject) - const fullAlgoPath = - this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/transformations/algorithm' try { + writeFileSync( + configLogPath, + "Writing algocustom data to '/data/inputs/algoCustomData.json'\n" + ) const customdataPath = this.getC2DConfig().tempFolder + '/' + @@ -1191,6 +1258,7 @@ export class C2DEngineDocker extends C2DEngine { if (job.algorithm.meta.rawcode && job.algorithm.meta.rawcode.length > 0) { // we have the code, just write it + writeFileSync(configLogPath, `Writing raw algo code to ${fullAlgoPath}\n`) writeFileSync(fullAlgoPath, job.algorithm.meta.rawcode) } else { // do we have a files object? @@ -1198,17 +1266,41 @@ export class C2DEngineDocker extends C2DEngine { // is it unencrypted? if (job.algorithm.fileObject.type) { // we can get the storage directly - storage = Storage.getStorageClass(job.algorithm.fileObject, config) + try { + storage = Storage.getStorageClass(job.algorithm.fileObject, config) + } catch (e) { + CORE_LOGGER.error(`Unable to get storage class for algorithm: ${e.message}`) + writeFileSync( + configLogPath, + `Unable to get storage class for algorithm: ${e.message}\n` + ) + return { + status: C2DStatusNumber.AlgorithmProvisioningFailed, + statusText: C2DStatusText.AlgorithmProvisioningFailed + } + } } else { // ok, maybe we have this encrypted instead CORE_LOGGER.info( 'algorithm file object seems to be encrypted, checking it...' ) // 1. Decrypt the files object - const decryptedFileObject = await decryptFilesObject(job.algorithm.fileObject) - console.log('decryptedFileObject: ', decryptedFileObject) - // 2. Get default storage settings - storage = Storage.getStorageClass(decryptedFileObject, config) + try { + const decryptedFileObject = await decryptFilesObject( + job.algorithm.fileObject + ) + storage = Storage.getStorageClass(decryptedFileObject, config) + } catch (e) { + CORE_LOGGER.error(`Unable to decrypt algorithm files object: ${e.message}`) + writeFileSync( + configLogPath, + `Unable to decrypt algorithm files object: ${e.message}\n` + ) + return { + status: C2DStatusNumber.AlgorithmProvisioningFailed, + statusText: C2DStatusText.AlgorithmProvisioningFailed + } + } } } else { // no files object, try to get information from documentId and serviceId @@ -1216,25 +1308,49 @@ export class C2DEngineDocker extends C2DEngine { 'algorithm file object seems to be missing, checking "serviceId" and "documentId"...' ) const { serviceId, documentId } = job.algorithm + writeFileSync( + configLogPath, + `Using ${documentId} and serviceId ${serviceId} to get algorithm files.\n` + ) // we can get it from this info if (serviceId && documentId) { const algoDdo = await new FindDdoHandler( OceanNode.getInstance() ).findAndFormatDdo(documentId) - console.log('algo ddo:', algoDdo) // 1. Get the service const service: Service = AssetUtils.getServiceById(algoDdo, serviceId) - - // 2. Decrypt the files object - const decryptedFileObject = await decryptFilesObject(service.files) - console.log('decryptedFileObject: ', decryptedFileObject) - // 4. Get default storage settings - storage = Storage.getStorageClass(decryptedFileObject, config) + if (!service) { + CORE_LOGGER.error( + `Could not find service with ID ${serviceId} in DDO ${documentId}` + ) + writeFileSync( + configLogPath, + `Could not find service with ID ${serviceId} in DDO ${documentId}\n` + ) + return { + status: C2DStatusNumber.AlgorithmProvisioningFailed, + statusText: C2DStatusText.AlgorithmProvisioningFailed + } + } + try { + // 2. Decrypt the files object + const decryptedFileObject = await decryptFilesObject(service.files) + storage = Storage.getStorageClass(decryptedFileObject, config) + } catch (e) { + CORE_LOGGER.error(`Unable to decrypt algorithm files object: ${e.message}`) + writeFileSync( + configLogPath, + `Unable to decrypt algorithm files object: ${e.message}\n` + ) + return { + status: C2DStatusNumber.AlgorithmProvisioningFailed, + statusText: C2DStatusText.AlgorithmProvisioningFailed + } + } } } if (storage) { - console.log('fullAlgoPath', fullAlgoPath) await pipeline( (await storage.getReadableStream()).stream, createWriteStream(fullAlgoPath) @@ -1243,12 +1359,20 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.info( 'Could not extract any files object from the compute algorithm, skipping...' ) + writeFileSync( + configLogPath, + 'Could not extract any files object from the compute algorithm, skipping...\n' + ) } } } catch (e) { CORE_LOGGER.error( 'Unable to write algorithm to path: ' + fullAlgoPath + ': ' + e.message ) + writeFileSync( + configLogPath, + 'Unable to write algorithm to path: ' + fullAlgoPath + ': ' + e.message + '\n' + ) return { status: C2DStatusNumber.AlgorithmProvisioningFailed, statusText: C2DStatusText.AlgorithmProvisioningFailed @@ -1260,53 +1384,73 @@ export class C2DEngineDocker extends C2DEngine { const asset = job.assets[i] let storage = null let fileInfo = null - console.log('checking now asset: ', asset) + console.log('checking now asset: ', i) + writeFileSync(configLogPath, `Downloading asset ${i} to /data/inputs/\n`) // without this check it would break if no fileObject is present if (asset.fileObject) { - if (asset.fileObject.type) { - storage = Storage.getStorageClass(asset.fileObject, config) - } else { - CORE_LOGGER.info('asset file object seems to be encrypted, checking it...') - // get the encrypted bytes - const filesObject: any = await decryptFilesObject(asset.fileObject) - storage = Storage.getStorageClass(filesObject, config) - } + try { + if (asset.fileObject.type) { + storage = Storage.getStorageClass(asset.fileObject, config) + } else { + CORE_LOGGER.info('asset file object seems to be encrypted, checking it...') + // get the encrypted bytes + const filesObject: any = await decryptFilesObject(asset.fileObject) + storage = Storage.getStorageClass(filesObject, config) + } - // we need the file info for the name (but could be something else here) - fileInfo = await storage.getFileInfo({ - type: storage.getStorageType(asset.fileObject) - }) + // we need the file info for the name (but could be something else here) + fileInfo = await storage.getFileInfo({ + type: storage.getStorageType(asset.fileObject) + }) + } catch (e) { + CORE_LOGGER.error(`Unable to get storage class for asset: ${e.message}`) + writeFileSync( + configLogPath, + `Unable to get storage class for asset: ${e.message}\n` + ) + return { + status: C2DStatusNumber.DataProvisioningFailed, + statusText: C2DStatusText.DataProvisioningFailed + } + } } else { // we need to go the hard way const { serviceId, documentId } = asset + writeFileSync( + configLogPath, + `Using ${documentId} and serviceId ${serviceId} for this asset.\n` + ) if (serviceId && documentId) { // need to get the file - const ddo = await new FindDdoHandler(OceanNode.getInstance()).findAndFormatDdo( - documentId - ) - - // 2. Get the service - const service: Service = AssetUtils.getServiceById(ddo, serviceId) - // 3. Decrypt the url - const decryptedFileObject = await decryptFilesObject(service.files) - console.log('decryptedFileObject: ', decryptedFileObject) - storage = Storage.getStorageClass(decryptedFileObject, config) - - fileInfo = await storage.getFileInfo({ - type: storage.getStorageType(decryptedFileObject) - }) + try { + const ddo = await new FindDdoHandler( + OceanNode.getInstance() + ).findAndFormatDdo(documentId) + // 2. Get the service + const service: Service = AssetUtils.getServiceById(ddo, serviceId) + // 3. Decrypt the url + const decryptedFileObject = await decryptFilesObject(service.files) + storage = Storage.getStorageClass(decryptedFileObject, config) + fileInfo = await storage.getFileInfo({ + type: storage.getStorageType(decryptedFileObject) + }) + } catch (e) { + CORE_LOGGER.error(`Unable to get storage class for asset: ${e.message}`) + writeFileSync( + configLogPath, + `Unable to get storage class for asset: ${e.message}\n` + ) + return { + status: C2DStatusNumber.DataProvisioningFailed, + statusText: C2DStatusText.DataProvisioningFailed + } + } } } if (storage && fileInfo) { - const fullPath = - this.getC2DConfig().tempFolder + - '/' + - job.jobId + - '/data/inputs/' + - fileInfo[0].name - - console.log('asset full path: ' + fullPath) + const fullPath = jobFolderPath + '/data/inputs/' + fileInfo[0].name + writeFileSync(configLogPath, `Downloading asset to ${fullPath}\n`) try { await pipeline( (await storage.getReadableStream()).stream, @@ -1316,6 +1460,10 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.error( 'Unable to write input data to path: ' + fullPath + ': ' + e.message ) + writeFileSync( + configLogPath, + 'Unable to write input data to path: ' + fullPath + ': ' + e.message + '\n' + ) return { status: C2DStatusNumber.DataProvisioningFailed, statusText: C2DStatusText.DataProvisioningFailed @@ -1325,13 +1473,20 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.info( 'Could not extract any files object from the compute asset, skipping...' ) + writeFileSync( + configLogPath, + 'Could not extract any files object from the compute asset, skipping...\n' + ) } } CORE_LOGGER.info('All good with data provisioning, will start uploading it...') + writeFileSync( + configLogPath, + 'All good with data provisioning, will start uploading it...\n' + ) // now, we have to create a tar arhive - const folderToTar = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data' - const destination = - this.getC2DConfig().tempFolder + '/' + job.jobId + '/tarData/upload.tar.gz' + const folderToTar = jobFolderPath + '/data' + const destination = jobFolderPath + '/tarData/upload.tar.gz' try { tar.create( { @@ -1343,7 +1498,6 @@ export class C2DEngineDocker extends C2DEngine { ['./'] ) // check if tar.gz actually exists - console.log('Start uploading') if (existsSync(destination)) { // now, upload it to the container @@ -1359,8 +1513,10 @@ export class C2DEngineDocker extends C2DEngine { console.log('Done uploading') } catch (e) { - console.log('Data upload failed') - console.log(e) + writeFileSync( + configLogPath, + 'Data upload to container failed: ' + e.message + '\n' + ) return { status: C2DStatusNumber.DataUploadFailed, statusText: C2DStatusText.DataUploadFailed @@ -1368,20 +1524,26 @@ export class C2DEngineDocker extends C2DEngine { } } else { CORE_LOGGER.debug('No data to upload, empty tar.gz') + writeFileSync(configLogPath, `No data to upload, empty tar.gz\n`) } } catch (e) { CORE_LOGGER.debug(e.message) + writeFileSync(configLogPath, `Error creating data archive: ${e.message}\n`) + return { + status: C2DStatusNumber.DataProvisioningFailed, + statusText: C2DStatusText.DataProvisioningFailed + } } - rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/inputs', { + rmSync(jobFolderPath + '/data/inputs', { recursive: true, force: true }) - rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/transformations', { + rmSync(jobFolderPath + '/data/transformations', { recursive: true, force: true }) - rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/tarData', { + rmSync(jobFolderPath + '/tarData', { recursive: true, force: true }) diff --git a/src/components/storage/index.ts b/src/components/storage/index.ts index d214bbffb..4ffb0a623 100644 --- a/src/components/storage/index.ts +++ b/src/components/storage/index.ts @@ -65,21 +65,27 @@ export abstract class Storage { static getStorageClass( file: any, config: OceanNodeConfig - ): UrlStorage | IpfsStorage | ArweaveStorage | S3Storage { - const { type } = file - switch ( - type?.toLowerCase() // case insensitive - ) { - case FileObjectType.URL: - return new UrlStorage(file, config) - case FileObjectType.IPFS: - return new IpfsStorage(file, config) - case FileObjectType.ARWEAVE: - return new ArweaveStorage(file, config) - case FileObjectType.S3: - return new S3Storage(file, config) - default: - throw new Error(`Invalid storage type: ${type}`) + ): UrlStorage | IpfsStorage | ArweaveStorage { + if (!file) { + throw new Error('Empty file object') + } + try { + const { type } = file + switch ( + type?.toLowerCase() // case insensitive + ) { + case FileObjectType.URL: + return new UrlStorage(file, config) + case FileObjectType.IPFS: + return new IpfsStorage(file, config) + case FileObjectType.ARWEAVE: + return new ArweaveStorage(file, config) + default: + throw new Error(`Invalid storage type: ${type}`) + } + } catch (err) { + console.error('Error in getStorageClass: ', err) + throw err } }