Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -1400,19 +1400,20 @@ starts a free compute job and returns jobId if succesfull

#### Parameters

| name | type | required | description |
| ----------------- | ------ | -------- | ---------------------------------------------------------------- |
| command | string | v | command name |
| node | string | | if not present it means current node |
| consumerAddress | string | v | consumer address |
| signature | string | v | signature (msg=String(nonce) ) |
| nonce | string | v | nonce for the request |
| datasets | object | | list of ComputeAsset to be used as inputs |
| algorithm | object | | ComputeAlgorithm definition |
| environment | string | v | compute environment to use |
| resources | object | | optional list of required resources |
| metadata | object | | optional metadata for the job, data provided by the user |
| additionalViewers | object | | optional array of addresses that are allowed to fetch the result |
| name | type | required | description |
| ----------------- | ------ | -------- | ----------------------------------------------------------------------------- |
| command | string | v | command name |
| node | string | | if not present it means current node |
| consumerAddress | string | v | consumer address |
| signature | string | v | signature (msg=String(nonce) ) |
| nonce | string | v | nonce for the request |
| datasets | object | | list of ComputeAsset to be used as inputs |
| algorithm | object | | ComputeAlgorithm definition |
| environment | string | v | compute environment to use |
| resources | object | | optional list of required resources |
| metadata | object | | optional metadata for the job, data provided by the user |
| additionalViewers | object | | optional array of addresses that are allowed to fetch the result |
| queueMaxWaitTime | number | | optional max time in seconds a job can wait in the queue before being started |

#### Request

Expand Down
13 changes: 13 additions & 0 deletions src/@types/C2D/C2D.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ export interface ComputeEnvironment extends ComputeEnvironmentBaseConfig {
runningJobs: number
runningfreeJobs?: number
consumerAddress: string // v1
queuedJobs: number
queuedFreeJobs: number
queMaxWaitTime: number
queMaxWaitTimeFree: number
}

export interface C2DDockerConfig {
Expand Down Expand Up @@ -174,6 +178,7 @@ export interface ComputeJob {
environment?: string
metadata?: DBComputeJobMetadata
terminationDetails?: ComputeJobTerminationDetails
queueMaxWaitTime: number // max time in seconds a job can wait in the queue before being started
}

export interface ComputeOutput {
Expand Down Expand Up @@ -260,6 +265,10 @@ export enum C2DStatusNumber {
// eslint-disable-next-line no-unused-vars
JobStarted = 0,
// eslint-disable-next-line no-unused-vars
JobQueued = 1,
// eslint-disable-next-line no-unused-vars
JobQueuedExpired = 2,
// eslint-disable-next-line no-unused-vars
PullImage = 10,
// eslint-disable-next-line no-unused-vars
PullImageFailed = 11,
Expand Down Expand Up @@ -302,6 +311,10 @@ export enum C2DStatusText {
// eslint-disable-next-line no-unused-vars
JobStarted = 'Job started',
// eslint-disable-next-line no-unused-vars
JobQueued = 'Job queued',
// eslint-disable-next-line no-unused-vars
JobQueuedExpired = 'Job expired in queue',
// eslint-disable-next-line no-unused-vars
PullImage = 'Pulling algorithm image',
// eslint-disable-next-line no-unused-vars
PullImageFailed = 'Pulling algorithm image failed',
Expand Down
6 changes: 4 additions & 2 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,18 @@ export interface ComputeGetEnvironmentsCommand extends Command {
export interface ComputePayment {
chainId: number
token: string
resources?: ComputeResourceRequest[]
resources?: ComputeResourceRequest[] // only used in initializeCompute
}
export interface ComputeInitializeCommand extends Command {
datasets: [ComputeAsset]
datasets: ComputeAsset[]
algorithm: ComputeAlgorithm
environment: string
payment: ComputePayment
consumerAddress: string
signature?: string
maxJobDuration: number
policyServer?: any // object to pass to policy server
queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started
}

export interface FreeComputeStartCommand extends Command {
Expand All @@ -211,6 +212,7 @@ export interface FreeComputeStartCommand extends Command {
policyServer?: any // object to pass to policy server
metadata?: DBComputeJobMetadata
additionalViewers?: string[] // addresses of additional addresses that can get results
queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started
}
export interface PaidComputeStartCommand extends FreeComputeStartCommand {
payment: ComputePayment
Expand Down
44 changes: 34 additions & 10 deletions src/components/c2d/compute_engine_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ export abstract class C2DEngine {
payment: DBComputeJobPayment,
jobId: string,
metadata?: DBComputeJobMetadata,
additionalViewers?: string[]
additionalViewers?: string[],
queueMaxWaitTime?: number
): Promise<ComputeJob[]>

public abstract stopComputeJob(
Expand Down Expand Up @@ -233,22 +234,45 @@ export abstract class C2DEngine {
const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash)
let totalJobs = 0
let totalFreeJobs = 0
let queuedJobs = 0
let queuedFreeJobs = 0
let maxWaitTime = 0
let maxWaitTimeFree = 0
for (const job of jobs) {
if (job.environment === env.id) {
totalJobs++
if (job.isFree) totalFreeJobs++

for (const resource of job.resources) {
if (!(resource.id in usedResources)) usedResources[resource.id] = 0
usedResources[resource.id] += resource.amount
if (job.queueMaxWaitTime === 0) {
totalJobs++
if (job.isFree) totalFreeJobs++

for (const resource of job.resources) {
if (!(resource.id in usedResources)) usedResources[resource.id] = 0
usedResources[resource.id] += resource.amount
if (job.isFree) {
if (!(resource.id in usedFreeResources)) usedFreeResources[resource.id] = 0
usedFreeResources[resource.id] += resource.amount
}
}
} else {
// queued job
queuedJobs++
maxWaitTime += job.maxJobDuration
if (job.isFree) {
if (!(resource.id in usedFreeResources)) usedFreeResources[resource.id] = 0
usedFreeResources[resource.id] += resource.amount
queuedFreeJobs++
maxWaitTimeFree += job.maxJobDuration
}
}
}
}
return { totalJobs, totalFreeJobs, usedResources, usedFreeResources }
return {
totalJobs,
totalFreeJobs,
usedResources,
usedFreeResources,
queuedJobs,
queuedFreeJobs,
maxWaitTime,
maxWaitTimeFree
}
}

// overridden by each engine if required
Expand Down
106 changes: 88 additions & 18 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ export class C2DEngineDocker extends C2DEngine {
addresses: [],
accessLists: []
},
fees
fees,
queuedJobs: 0,
queuedFreeJobs: 0,
queMaxWaitTime: 0,
queMaxWaitTimeFree: 0
})
if (`access` in envConfig) this.envs[0].access = envConfig.access

Expand Down Expand Up @@ -287,10 +291,22 @@ export class C2DEngineDocker extends C2DEngine {
// if (systemInfo.Runtimes) computeEnv.runtimes = systemInfo.Runtimes
// if (systemInfo.DefaultRuntime)
// computeEnv.defaultRuntime = systemInfo.DefaultRuntime
const { totalJobs, totalFreeJobs, usedResources, usedFreeResources } =
await this.getUsedResources(computeEnv)
const {
totalJobs,
totalFreeJobs,
usedResources,
usedFreeResources,
queuedJobs,
queuedFreeJobs,
maxWaitTime,
maxWaitTimeFree
} = await this.getUsedResources(computeEnv)
computeEnv.runningJobs = totalJobs
computeEnv.runningfreeJobs = totalFreeJobs
computeEnv.queuedJobs = queuedJobs
computeEnv.queuedFreeJobs = queuedFreeJobs
computeEnv.queMaxWaitTime = maxWaitTime
computeEnv.queMaxWaitTimeFree = maxWaitTimeFree
for (let i = 0; i < computeEnv.resources.length; i++) {
if (computeEnv.resources[i].id in usedResources)
computeEnv.resources[i].inUse = usedResources[computeEnv.resources[i].id]
Expand Down Expand Up @@ -365,7 +381,8 @@ export class C2DEngineDocker extends C2DEngine {
payment: DBComputeJobPayment,
jobId: string,
metadata?: DBComputeJobMetadata,
additionalViewers?: string[]
additionalViewers?: string[],
queueMaxWaitTime?: number
): Promise<ComputeJob[]> {
if (!this.docker) return []
// TO DO - iterate over resources and get default runtime
Expand Down Expand Up @@ -409,6 +426,9 @@ export class C2DEngineDocker extends C2DEngine {
)
// make sure that we don't keep them in the db structure
algorithm.meta.container.additionalDockerFiles = null
if (queueMaxWaitTime && queueMaxWaitTime > 0) {
throw new Error(`additionalDockerFiles cannot be used with queued jobs`)
}
}
const job: DBComputeJob = {
clusterHash: this.getC2DConfig().hash,
Expand All @@ -417,8 +437,14 @@ export class C2DEngineDocker extends C2DEngine {
jobId,
dateCreated: String(Date.now() / 1000),
dateFinished: null,
status: C2DStatusNumber.JobStarted,
statusText: C2DStatusText.JobStarted,
status:
queueMaxWaitTime && queueMaxWaitTime > 0
? C2DStatusNumber.JobQueued
: C2DStatusNumber.JobStarted,
statusText:
queueMaxWaitTime && queueMaxWaitTime > 0
? C2DStatusText.JobQueued
: C2DStatusText.JobStarted,
results: [],
algorithm,
assets,
Expand All @@ -439,13 +465,16 @@ export class C2DEngineDocker extends C2DEngine {
metadata,
additionalViewers,
terminationDetails: { exitCode: null, OOMKilled: null },
algoDuration: 0
algoDuration: 0,
queueMaxWaitTime: queueMaxWaitTime || 0
}

if (algorithm.meta.container && algorithm.meta.container.dockerfile) {
// we need to build the image
job.status = C2DStatusNumber.BuildImage
job.statusText = C2DStatusText.BuildImage
// we need to build the image if job is not queued
if (queueMaxWaitTime === 0) {
job.status = C2DStatusNumber.BuildImage
job.statusText = C2DStatusText.BuildImage
}
} else {
// already built, we need to validate it
const validation = await C2DEngineDocker.checkDockerImage(image, env.platform)
Expand All @@ -454,8 +483,10 @@ export class C2DEngineDocker extends C2DEngine {
throw new Error(
`Cannot find image ${image} for ${env.platform.architecture}. Maybe it does not exist or it's build for other arhitectures.`
)
job.status = C2DStatusNumber.PullImage
job.statusText = C2DStatusText.PullImage
if (queueMaxWaitTime === 0) {
job.status = C2DStatusNumber.PullImage
job.statusText = C2DStatusText.PullImage
}
}

await this.makeJobFolders(job)
Expand All @@ -464,10 +495,12 @@ export class C2DEngineDocker extends C2DEngine {
if (!addedId) {
return []
}
if (algorithm.meta.container && algorithm.meta.container.dockerfile) {
this.buildImage(job, additionalDockerFiles)
} else {
this.pullImage(job)
if (queueMaxWaitTime === 0) {
if (algorithm.meta.container && algorithm.meta.container.dockerfile) {
this.buildImage(job, additionalDockerFiles)
} else {
this.pullImage(job)
}
}
// only now set the timer
if (!this.cronTimer) {
Expand Down Expand Up @@ -611,8 +644,7 @@ export class C2DEngineDocker extends C2DEngine {
}
if (
jobs[0].owner !== consumerAddress &&
jobs[0].additionalViewers &&
!jobs[0].additionalViewers.includes(consumerAddress)
(!jobs[0].additionalViewers || !jobs[0].additionalViewers.includes(consumerAddress))
) {
// consumerAddress is not the owner and not in additionalViewers
throw new Error(
Expand Down Expand Up @@ -815,6 +847,44 @@ export class C2DEngineDocker extends C2DEngine {
- delete the container
- delete the volume
*/
if (job.status === C2DStatusNumber.JobQueued) {
// check if we can start the job now
const now = String(Date.now() / 1000)
if (job.queueMaxWaitTime < parseFloat(now) - parseFloat(job.dateCreated)) {
job.status = C2DStatusNumber.JobQueuedExpired
job.statusText = C2DStatusText.JobQueuedExpired
job.isRunning = false
job.dateFinished = now
await this.db.updateJob(job)
await this.cleanupJob(job)
return
}
// check if resources are available now
try {
const env = await this.getComputeEnvironment(
job.payment && job.payment.chainId ? job.payment.chainId : null,
job.environment,
null
)
await this.checkIfResourcesAreAvailable(job.resources, env, true)
} catch (err) {
// resources are still not available
return
}
// resources are now available, let's start the job
const { algorithm } = job
if (algorithm.meta.container && algorithm.meta.container.dockerfile) {
job.status = C2DStatusNumber.BuildImage
job.statusText = C2DStatusText.BuildImage
this.buildImage(job, null)
} else {
job.status = C2DStatusNumber.PullImage
job.statusText = C2DStatusText.PullImage
this.pullImage(job)
}
await this.db.updateJob(job)
}

if (job.status === C2DStatusNumber.ConfiguringVolumes) {
// create the volume & create container
// TO DO C2D: Choose driver & size
Expand Down
7 changes: 6 additions & 1 deletion src/components/core/compute/initialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ export class ComputeInitializeHandler extends CommandHandler {
if (this.shouldDenyTaskHandling(validationResponse)) {
return validationResponse
}
if (!task.queueMaxWaitTime) {
task.queueMaxWaitTime = 0
}
let engine
let env
let resourcesNeeded
Expand Down Expand Up @@ -193,7 +196,9 @@ export class ComputeInitializeHandler extends CommandHandler {
escrowAddress,
payee: env.consumerAddress,
chainId: task.payment.chainId,
minLockSeconds: engine.escrow.getMinLockTime(task.maxJobDuration),
minLockSeconds: engine.escrow.getMinLockTime(
task.maxJobDuration + task.queueMaxWaitTime
),
token: task.payment.token,
amount: await engine.escrow.getPaymentAmountInWei(
cost,
Expand Down
Loading
Loading