Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13,003 changes: 13,001 additions & 2 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions src/components/Auth/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ import jwt from 'jsonwebtoken'
import { checkNonce, NonceResponse } from '../core/utils/nonceHandler.js'
import { OceanNode } from '../../OceanNode.js'
import { getConfiguration } from '../../utils/index.js'

export interface CommonValidation {
valid: boolean
error: string
}
import { CommonValidation } from '../../utils/validators.js'

export interface AuthValidation {
token?: string
Expand Down
48 changes: 26 additions & 22 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,10 @@ import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import StreamConcat from 'stream-concat'
import { BaseHandler } from '../core/handler/handler.js'
import { getConfiguration } from '../../utils/index.js'
import { checkConnectionsRateLimit } from '../httpRoutes/requestValidator.js'
import { CONNECTIONS_RATE_INTERVAL } from '../../utils/constants.js'
import { RequestLimiter } from '../../OceanNode.js'

// hold data about last request made
const connectionsData: RequestLimiter = {
lastRequestTime: Date.now(),
requester: '',
numRequests: 0
}
import {
checkGlobalConnectionsRateLimit,
checkRequestsRateLimit
} from '../../utils/validators.js'

export class ReadableString extends Readable {
private sent = false
Expand Down Expand Up @@ -94,22 +88,32 @@ export async function handleProtocolCommands(otherPeerConnection: any) {
}
}
// check connections rate limit
const requestTime = Date.now()
if (requestTime - connectionsData.lastRequestTime > CONNECTIONS_RATE_INTERVAL) {
// last one was more than 1 minute ago? reset counter
connectionsData.numRequests = 0
const now = Date.now()

const rateLimitCheck = checkRequestsRateLimit(remoteAddr, configuration, now)
if (!rateLimitCheck.valid) {
P2P_LOGGER.warn(
`Incoming request denied to peer: ${remotePeer} (rate limit exceeded)`
)
if (connectionStatus === 'open') {
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(403, 'Rate limit exceeded'))
)
try {
await pipe(statusStream, otherPeerConnection.stream.sink)
} catch (e) {
P2P_LOGGER.error(e)
}
}
await closeStreamConnection(otherPeerConnection.connection, remotePeer)
return
}
// always increment counter
connectionsData.numRequests += 1
// update time and requester information
connectionsData.lastRequestTime = requestTime
connectionsData.requester = remoteAddr

// check global rate limits (not ip related)
const requestRateValidation = checkConnectionsRateLimit(configuration, connectionsData)
if (!requestRateValidation.valid) {
const connectionsRateValidation = checkGlobalConnectionsRateLimit(configuration, now)
if (!connectionsRateValidation.valid) {
P2P_LOGGER.warn(
`Incoming request denied to peer: ${remotePeer} (rate limit exceeded)`
`Exceeded limit of connections per minute ${configuration.maxConnections}: ${connectionsRateValidation.error}`
)
if (connectionStatus === 'open') {
statusStream = new ReadableString(
Expand Down
85 changes: 73 additions & 12 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* eslint-disable security/detect-non-literal-fs-filename */
import { Readable } from 'stream'
import os from 'os'
import {
C2DStatusNumber,
C2DStatusText,
Expand Down Expand Up @@ -175,7 +176,8 @@ export class C2DEngineDocker extends C2DEngine {
type: 'cpu',
total: sysinfo.NCPU,
max: sysinfo.NCPU,
min: 1
min: 1,
description: os.cpus()[0].model
})
this.envs[0].resources.push({
id: 'ram',
Expand Down Expand Up @@ -878,13 +880,31 @@ export class C2DEngineDocker extends C2DEngine {
}
}
if (job.status === C2DStatusNumber.RunningAlgorithm) {
const container = await this.docker.getContainer(job.jobId + '-algoritm')
const details = await container.inspect()
console.log('Container inspect')
console.log(details)
let container
let details
try {
container = await this.docker.getContainer(job.jobId + '-algoritm')
console.log(`Container retrieved: ${JSON.stringify(container)}`)
details = await container.inspect()
console.log('Container inspect')
console.log(details)
} catch (e) {
console.error(
'Could not retrieve container: ' +
e.message +
'\nBack to configuring volumes to create the container...'
)
job.isStarted = false
job.status = C2DStatusNumber.ConfiguringVolumes
job.statusText = C2DStatusText.ConfiguringVolumes
job.isRunning = false
await this.db.updateJob(job)
return
}

if (job.isStarted === false) {
// make sure is not started
if (details.State.Running === false) {
if (details && details.State.Running === false) {
try {
await container.start()
job.isStarted = true
Expand Down Expand Up @@ -962,14 +982,35 @@ export class C2DEngineDocker extends C2DEngine {
// get output
job.status = C2DStatusNumber.JobFinished
job.statusText = C2DStatusText.JobFinished
const container = await this.docker.getContainer(job.jobId + '-algoritm')
let container
try {
container = await this.docker.getContainer(job.jobId + '-algoritm')
console.log(`Container retrieved: ${JSON.stringify(container)}`)
} catch (e) {
console.error('Could not retrieve container: ' + e.message)
job.isRunning = false
job.dateFinished = String(Date.now() / 1000)
try {
const algoLogFile =
this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/algorithm.log'
writeFileSync(algoLogFile, String(e.message))
} catch (e) {
console.log('Failed to write')
console.log(e)
}
await this.db.updateJob(job)
await this.cleanupJob(job)
return
}
const outputsArchivePath =
this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/outputs/outputs.tar'
try {
await pipeline(
await container.getArchive({ path: '/data/outputs' }),
createWriteStream(outputsArchivePath)
)
if (container) {
await pipeline(
await container.getArchive({ path: '/data/outputs' }),
createWriteStream(outputsArchivePath)
)
}
} catch (e) {
console.log(e)
job.status = C2DStatusNumber.ResultsUploadFailed
Expand Down Expand Up @@ -1051,6 +1092,10 @@ export class C2DEngineDocker extends C2DEngine {
}
await container.remove()
}
} catch (e) {
console.error('Container not found! ' + e.message)
}
try {
const volume = await this.docker.getVolume(job.jobId + '-volume')
if (volume) {
try {
Expand All @@ -1059,17 +1104,33 @@ export class C2DEngineDocker extends C2DEngine {
console.log(e)
}
}
} catch (e) {
console.error('Container volume not found! ' + e.message)
}
try {
// remove folders
rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/inputs', {
recursive: true,
force: true
})
} catch (e) {
console.error(
`Could not delete inputs from path ${this.getC2DConfig().tempFolder} for job ID ${
job.jobId
}! ` + e.message
)
}
try {
rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/transformations', {
recursive: true,
force: true
})
} catch (e) {
console.log(e)
console.error(
`Could not delete algorithms from path ${
this.getC2DConfig().tempFolder
} for job ID ${job.jobId}! ` + e.message
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/components/core/admin/adminHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import {
} from '../../httpRoutes/validateCommands.js'
import { validateAdminSignature } from '../../../utils/auth.js'
import { BaseHandler } from '../handler/handler.js'
import { CommonValidation } from '../../httpRoutes/requestValidator.js'
import { P2PCommandResponse } from '../../../@types/OceanNode.js'
import { ReadableString } from '../../P2P/handleProtocolCommands.js'
import { CommonValidation } from '../../../utils/validators.js'

export abstract class AdminCommandHandler
extends BaseHandler
Expand Down
21 changes: 15 additions & 6 deletions src/components/database/C2DDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ export class C2DDatabase extends AbstractDatabase {
return await this.provider.getRunningJobs(engine, environment)
}

async getAllFinishedJobs(): Promise<DBComputeJob[]> {
return await this.provider.getAllFinishedJobs()
}

async deleteJob(jobId: string): Promise<boolean> {
return await this.provider.deleteJob(jobId)
}
Expand Down Expand Up @@ -117,18 +121,23 @@ export class C2DDatabase extends AbstractDatabase {
*/
async cleanOrphanJobs(existingEnvironments: ComputeEnvironment[]) {
const c2dDatabase = await (await getDatabase()).c2d
const finishedOrExpired: DBComputeJob[] = await this.provider.getFinishedJobs()
const envIds: string[] = existingEnvironments.map((env) => {
return env.id
})
let cleaned = 0
for (const job of finishedOrExpired) {
if (job.environment && !envIds.includes(job.environment)) {

const envIds: string[] = existingEnvironments
.filter((env: any) => env && typeof env.id === 'string')
.map((env: any) => env.id)

// Get all finished jobs from DB, not just from known environments
const allJobs: DBComputeJob[] = await c2dDatabase.getAllFinishedJobs()

for (const job of allJobs) {
if (!job.environment || !envIds.includes(job.environment)) {
if (await c2dDatabase.deleteJob(job.jobId)) {
cleaned++
}
}
}

DATABASE_LOGGER.info('Cleaned ' + cleaned + ' orphan C2D jobs')
return cleaned
}
Expand Down
30 changes: 30 additions & 0 deletions src/components/database/sqliteCompute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@
jobId = generateUniqueID(jobStructure)
job.jobId = jobId
} else {
jobId = job.jobId

Check warning on line 165 in src/components/database/sqliteCompute.ts

View workflow job for this annotation

GitHub Actions / lint

Use object destructuring
}

return new Promise<string>((resolve, reject) => {
Expand Down Expand Up @@ -336,6 +336,36 @@
})
}

getAllFinishedJobs(): Promise<DBComputeJob[]> {
const selectSQL = `
SELECT * FROM ${this.schema.name} WHERE dateFinished IS NOT NULL OR results IS NOT NULL
`

return new Promise<DBComputeJob[]>((resolve, reject) => {
this.db.all(selectSQL, (err, rows: any[] | undefined) => {
if (err) {
DATABASE_LOGGER.error(err.message)
reject(err)
} else {
if (rows && rows.length > 0) {
const all: DBComputeJob[] = rows.map((row) => {
const body = generateJSONFromBlob(row.body)
delete row.body
const maxJobDuration = row.expireTimestamp
delete row.expireTimestamp
const job: DBComputeJob = { ...row, ...body, maxJobDuration }
return job
})
resolve(all)
} else {
DATABASE_LOGGER.info('Could not find any running C2D jobs!')
resolve([])
}
}
})
})
}

getFinishedJobs(environment?: ComputeEnvironment): Promise<DBComputeJob[]> {
// get jobs that already finished (have results), for this environment, and clear storage + job if expired
const selectSQL = `
Expand Down
2 changes: 1 addition & 1 deletion src/components/httpRoutes/logs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import express from 'express'
import { validateAdminSignature } from '../../utils/auth.js'
import { HTTP_LOGGER } from '../../utils/logging/common.js'
import { CommonValidation } from './requestValidator.js'
import { CommonValidation } from '../../utils/validators.js'

export const logRoutes = express.Router()

Expand Down
Loading
Loading