Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions app/controllers/JobController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ class JobController @Inject()(jobDAO: JobDAO,
def list: Action[AnyContent] = sil.SecuredAction.async { implicit request =>
for {
_ <- Fox.fromBool(wkconf.Features.jobsEnabled) ?~> "job.disabled"
jobs <- jobDAO.findAll
jobsJsonList <- Fox.serialCombined(jobs.sortBy(_.created).reverse)(jobService.publicWrites)
} yield Ok(Json.toJson(jobsJsonList))
jobsCompact <- jobDAO.findAllCompact
} yield Ok(Json.toJson(jobsCompact.map(_.enrich)))
}

def get(id: ObjectId): Action[AnyContent] = sil.SecuredAction.async { implicit request =>
Expand Down
161 changes: 97 additions & 64 deletions app/models/job/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.scalableminds.util.tools.{Fox, JsonHelper}
import com.scalableminds.webknossos.schema.Tables._
import models.job.JobState.JobState
import models.job.JobCommand.JobCommand
import play.api.libs.json.{JsObject, Json}
import play.api.libs.json.{JsObject, Json, OFormat}
import slick.jdbc.PostgresProfile.api._
import slick.jdbc.TransactionIsolation.Serializable
import slick.lifted.Rep
Expand All @@ -22,19 +22,21 @@ case class Job(
_owner: ObjectId,
_dataStore: String,
command: JobCommand,
commandArgs: JsObject = Json.obj(),
args: JsObject = Json.obj(),
state: JobState = JobState.PENDING,
manualState: Option[JobState] = None,
_worker: Option[ObjectId] = None,
_voxelyticsWorkflowHash: Option[String] = None,
latestRunId: Option[String] = None,
returnValue: Option[String] = None,
retriedBySuperUser: Boolean = false,
started: Option[Long] = None,
ended: Option[Long] = None,
started: Option[Instant] = None,
ended: Option[Instant] = None,
created: Instant = Instant.now,
isDeleted: Boolean = false
) {
) extends JobResultLinks {
protected def id: ObjectId = _id

def isEnded: Boolean = {
val relevantState = manualState.getOrElse(state)
relevantState == JobState.SUCCESS || state == JobState.FAILURE
Expand All @@ -44,58 +46,12 @@ case class Job(
for {
e <- ended
s <- started
} yield (e - s).millis
} yield e - s

private def effectiveState: JobState = manualState.getOrElse(state)
def effectiveState: JobState = manualState.getOrElse(state)

def exportFileName: Option[String] = argAsStringOpt("export_file_name")

def datasetName: Option[String] = argAsStringOpt("dataset_name")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these lines were moved to a trait, see below


def datasetId: Option[String] = argAsStringOpt("dataset_id")

private def argAsStringOpt(key: String) = (commandArgs \ key).toOption.flatMap(_.asOpt[String])
private def argAsBooleanOpt(key: String) = (commandArgs \ key).toOption.flatMap(_.asOpt[Boolean])

def resultLink(organizationId: String): Option[String] =
if (effectiveState != JobState.SUCCESS) None
else {
command match {
case JobCommand.convert_to_wkw | JobCommand.compute_mesh_file =>
datasetId.map { datasetId =>
val datasetNameMaybe = datasetName.map(name => s"$name-").getOrElse("")
Some(s"/datasets/$datasetNameMaybe$datasetId/view")
}.getOrElse(datasetName.map(name => s"datasets/$organizationId/$name/view"))
case JobCommand.export_tiff | JobCommand.render_animation =>
Some(s"/api/jobs/${this._id}/export")
case JobCommand.infer_neurons if this.argAsBooleanOpt("do_evaluation").getOrElse(false) =>
returnValue.map { resultAnnotationLink =>
resultAnnotationLink
}
case JobCommand.infer_nuclei | JobCommand.infer_neurons | JobCommand.materialize_volume_annotation |
JobCommand.infer_with_model | JobCommand.infer_mitochondria | JobCommand.align_sections |
JobCommand.infer_instances =>
// There exist jobs with dataset name as return value, some with directoryName, and newest with datasetId
// Construct links that work in either case.
returnValue.map { datasetIdentifier =>
ObjectId
.fromStringSync(datasetIdentifier)
.map { asObjectId =>
s"/datasets/$asObjectId/view"
}
.getOrElse(s"/datasets/$organizationId/$datasetIdentifier/view")
}
case _ => None
}
}

def resultLinkPublic(organizationId: String, webknossosPublicUrl: String): Option[String] =
for {
resultLink <- resultLink(organizationId)
resultLinkPublic = if (resultLink.startsWith("/")) s"$webknossosPublicUrl$resultLink"
else s"$resultLink"
} yield resultLinkPublic

def resultLinkSlackFormatted(organizationId: String, webknossosPublicUrl: String): String =
(for {
resultLink <- resultLinkPublic(organizationId, webknossosPublicUrl)
Expand All @@ -108,6 +64,37 @@ case class Job(
}.getOrElse("")
}

case class JobCompactInfo(
id: ObjectId,
command: JobCommand,
organizationId: String,
ownerFirstName: String,
ownerLastName: String,
ownerEmail: String,
args: JsObject,
state: JobState,
returnValue: Option[String],
resultLink: Option[String],
voxelyticsWorkflowHash: Option[String],
created: Instant,
started: Option[Instant],
ended: Option[Instant],
creditCost: Option[scala.math.BigDecimal]
) extends JobResultLinks {

protected def effectiveState: JobState = state

def enrich: JobCompactInfo = this.copy(
resultLink = constructResultLink(organizationId),
args = args - "webknossos_token" - "user_auth_token"
)

}

object JobCompactInfo {
implicit val jsonFormat: OFormat[JobCompactInfo] = Json.format[JobCompactInfo]
}

class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
extends SQLDAO[Job, JobsRow, Jobs](sqlClient) {
protected val collection = Jobs
Expand Down Expand Up @@ -135,8 +122,8 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
r.latestrunid,
r.returnvalue,
r.retriedbysuperuser,
r.started.map(_.getTime),
r.ended.map(_.getTime),
r.started.map(Instant.fromSql),
r.ended.map(Instant.fromSql),
Instant.fromSql(r.created),
r.isdeleted
)
Expand All @@ -159,16 +146,62 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
)
"""

private def listAccessQ(requestingUserId: ObjectId) =
q"""_owner = $requestingUserId OR
((SELECT u._organization FROM webknossos.users_ u WHERE u._id = _owner) IN (SELECT _organization FROM webknossos.users_ WHERE _id = $requestingUserId AND isAdmin))
"""
private def listAccessQ(requestingUserId: ObjectId, prefix: SqlToken) =
q"""${prefix}_owner = $requestingUserId OR
((SELECT u._organization FROM webknossos.users_ u WHERE u._id = ${prefix}_owner) IN (SELECT _organization FROM webknossos.users_ WHERE _id = $requestingUserId AND isAdmin))
"""

override def findAll(implicit ctx: DBAccessContext): Fox[List[Job]] =
def findAllCompact(implicit ctx: DBAccessContext): Fox[Seq[JobCompactInfo]] =
for {
accessQuery <- accessQueryFromAccessQ(listAccessQ)
r <- run(q"SELECT $columns FROM $existingCollectionName WHERE $accessQuery ORDER BY created".as[JobsRow])
parsed <- parseAll(r)
accessQuery <- accessQueryFromAccessQWithPrefix(listAccessQ, q"j.")
rows <- run(
q"""
SELECT j._id, j.command, u._organization, u.firstName, u.lastName, mu.email, j.commandArgs, COALESCE(j.manualState, j.state),
j.returnValue, j._voxelytics_workflowHash, j.created, j.started, j.ended, ct.credit_delta
FROM webknossos.jobs_ j
JOIN webknossos.users_ u on j._owner = u._id
JOIN webknossos.multiusers_ mu on u._multiUser = mu._id
LEFT JOIN webknossos.credit_transactions_ ct ON j._id = ct._paid_job
WHERE $accessQuery
ORDER BY j.created
""".as[(ObjectId,
String,
String,
String,
String,
String,
String,
String,
Option[String],
Option[String],
Instant,
Option[Instant],
Option[Instant],
Option[scala.math.BigDecimal])])
parsed <- Fox.serialCombined(rows) { row =>
for {
command <- JobCommand.fromString(row._2).toFox
effectiveState <- JobState.fromString(row._8).toFox
commandArgs <- JsonHelper.parseAs[JsObject](row._7).toFox
} yield
JobCompactInfo(
id = row._1,
command = command,
organizationId = row._3,
ownerFirstName = row._4,
ownerLastName = row._5,
ownerEmail = row._6,
args = commandArgs,
state = effectiveState,
returnValue = row._9,
resultLink = None, // To be filled by calling “enrich”
voxelyticsWorkflowHash = row._10,
created = row._11,
started = row._12,
ended = row._13,
creditCost = row._14.map(_ * -1) // delta is negative, so cost should be positive.
)
}
} yield parsed

override def findOne(jobId: ObjectId)(implicit ctx: DBAccessContext): Fox[Job] =
Expand Down Expand Up @@ -250,7 +283,7 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
created, isDeleted
)
VALUES(
${j._id}, ${j._owner}, ${j._dataStore}, ${j.command}, ${j.commandArgs},
${j._id}, ${j._owner}, ${j._dataStore}, ${j.command}, ${j.args},
${j.state}, ${j.manualState}, ${j._worker},
${j.latestRunId}, ${j.returnValue}, ${j.started}, ${j.ended},
${j.created}, ${j.isDeleted})""".asUpdate)
Expand Down
62 changes: 62 additions & 0 deletions app/models/job/JobResultLinks.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package models.job

import com.scalableminds.util.objectid.ObjectId
import models.job.JobCommand.JobCommand
import models.job.JobState.JobState
import play.api.libs.json.JsObject

trait JobResultLinks {
def args: JsObject
def command: JobCommand
def returnValue: Option[String]
protected def id: ObjectId

protected def effectiveState: JobState

def datasetName: Option[String] = argAsStringOpt("dataset_name")

def datasetId: Option[String] = argAsStringOpt("dataset_id")

protected def argAsStringOpt(key: String): Option[String] = (args \ key).toOption.flatMap(_.asOpt[String])

protected def argAsBooleanOpt(key: String): Option[Boolean] = (args \ key).toOption.flatMap(_.asOpt[Boolean])

def constructResultLink(organizationId: String): Option[String] =
if (effectiveState != JobState.SUCCESS) None
else {
command match {
case JobCommand.convert_to_wkw | JobCommand.compute_mesh_file =>
datasetId.map { datasetId =>
val datasetNameMaybe = datasetName.map(name => s"$name-").getOrElse("")
Some(s"/datasets/$datasetNameMaybe$datasetId/view")
}.getOrElse(datasetName.map(name => s"/datasets/$organizationId/$name/view"))
case JobCommand.export_tiff | JobCommand.render_animation =>
Some(s"/api/jobs/${this.id}/export")
case JobCommand.infer_neurons if this.argAsBooleanOpt("do_evaluation").getOrElse(false) =>
returnValue.map { resultAnnotationLink =>
resultAnnotationLink
}
case JobCommand.infer_nuclei | JobCommand.infer_neurons | JobCommand.materialize_volume_annotation |
JobCommand.infer_with_model | JobCommand.infer_mitochondria | JobCommand.align_sections |
JobCommand.infer_instances =>
// There exist jobs with dataset name as return value, some with directoryName, and newest with datasetId
// Construct links that work in either case.
returnValue.map { datasetIdentifier =>
ObjectId
.fromStringSync(datasetIdentifier)
.map { asObjectId =>
s"/datasets/$asObjectId/view"
}
.getOrElse(s"/datasets/$organizationId/$datasetIdentifier/view")
}
case _ => None
}
}

def resultLinkPublic(organizationId: String, webknossosPublicUrl: String): Option[String] =
for {
resultLink <- constructResultLink(organizationId)
resultLinkPublic = if (resultLink.startsWith("/")) s"$webknossosPublicUrl$resultLink"
else s"$resultLink"
} yield resultLinkPublic
}
49 changes: 25 additions & 24 deletions app/models/job/JobService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import models.organization.{CreditTransactionService, OrganizationDAO}
import models.user.{MultiUserDAO, User, UserDAO, UserService}
import com.scalableminds.util.tools.Full
import org.apache.pekko.actor.ActorSystem
import play.api.libs.json.{JsObject, Json}
import play.api.libs.json.{JsObject, JsValue, Json}
import security.WkSilhouetteEnvironment
import telemetry.SlackNotificationService
import utils.WkConf
Expand Down Expand Up @@ -158,7 +158,7 @@ class JobService @Inject()(wkConf: WkConf,
if (job.state == JobState.FAILURE && job.command == JobCommand.convert_to_wkw) {
logger.info(
s"WKW conversion job ${job._id} failed. Deleting dataset from the database, freeing the directoryName...")
val commandArgs = job.commandArgs.value
val commandArgs = job.args.value
for {
datasetDirectoryName <- commandArgs.get("dataset_directory_name").map(_.as[String]).toFox
organizationId <- commandArgs.get("organization_id").map(_.as[String]).toFox
Expand All @@ -168,31 +168,32 @@ class JobService @Inject()(wkConf: WkConf,
} yield ()
} else Fox.successful(())

def publicWrites(job: Job)(implicit ctx: DBAccessContext): Fox[JsObject] =
def publicWrites(job: Job)(implicit ctx: DBAccessContext): Fox[JsValue] =
for {
owner <- userDAO.findOne(job._owner) ?~> "user.notFound"
organization <- organizationDAO.findOne(owner._organization) ?~> "organization.notFound"
resultLink = job.resultLink(organization._id)
ownerJson <- userService.compactWrites(owner)
ownerEmail <- userService.emailFor(owner)
creditTransactionBox <- creditTransactionService.findTransactionOfJob(job._id).shiftBox
} yield {
Json.obj(
"id" -> job._id.id,
"owner" -> ownerJson,
"command" -> job.command,
"commandArgs" -> (job.commandArgs - "webknossos_token" - "user_auth_token"),
"state" -> job.state,
"manualState" -> job.manualState,
"latestRunId" -> job.latestRunId,
"returnValue" -> job.returnValue,
"resultLink" -> resultLink,
"voxelyticsWorkflowHash" -> job._voxelyticsWorkflowHash,
"created" -> job.created,
"started" -> job.started,
"ended" -> job.ended,
"creditCost" -> creditTransactionBox.toOption.map(t => (t.creditDelta * -1).toString)
} yield
Json.toJson(
JobCompactInfo(
id = job._id,
command = job.command,
organizationId = organization._id,
ownerFirstName = owner.firstName,
ownerLastName = owner.lastName,
ownerEmail = ownerEmail,
args = job.args - "webknossos_token" - "user_auth_token",
state = job.effectiveState,
returnValue = job.returnValue,
resultLink = job.constructResultLink(organization._id),
voxelyticsWorkflowHash = job._voxelyticsWorkflowHash,
created = job.created,
started = job.started,
ended = job.ended,
creditCost = creditTransactionBox.toOption.map(t => t.creditDelta * -1) // delta is negative, so cost should be positive.
)
)
}

// Only seen by the workers
def parameterWrites(job: Job)(implicit ctx: DBAccessContext): Fox[JsObject] =
Expand All @@ -204,7 +205,7 @@ class JobService @Inject()(wkConf: WkConf,
Json.obj(
"job_id" -> job._id.id,
"command" -> job.command,
"job_kwargs" -> (job.commandArgs ++ Json.obj("user_auth_token" -> userAuthToken.id))
"job_kwargs" -> (job.args ++ Json.obj("user_auth_token" -> userAuthToken.id))
)
}

Expand All @@ -222,7 +223,7 @@ class JobService @Inject()(wkConf: WkConf,
jobBoundingBox: BoundingBox,
creditTransactionComment: String,
user: User,
datastoreName: String)(implicit ctx: DBAccessContext): Fox[JsObject] =
datastoreName: String)(implicit ctx: DBAccessContext): Fox[JsValue] =
for {
costsInCredits <- if (SHOULD_DEDUCE_CREDITS) calculateJobCostInCredits(jobBoundingBox, command)
else Fox.successful(BigDecimal(0))
Expand Down
Loading