SB-27408 | Assessment archival to update existing requests #499
Open
kumarks1122 wants to merge 25 commits into Sunbird-Ed:release-4.5.0 from kumarks1122:release-4.5.0-archival-changes
Changes shown from 15 of 25 commits.
Commits
d320064 (manjudr) Issue feat SB-27408: Initial commit of Base Archival Job Implementation
9a4cbe2 (utk14) Issue SB-24793 feat: Assessment archived data:: Base Archival job Imp…
d9c6e57 (utk14) Issue SB-24793 feat: Assessment archived data:: Base Archival job Imp…
a0cdd69 (utk14) Issue SB-24793 feat: Assessment archived data implemetation
029125b (utk14) Issue SB-24793 feat: Assessment archived data implemetation
9139b7c (manjudr) Merge pull request #497 from manjudr/assessment-archival-refactor-cha…
78797d1 (kumarks1122) Issue #SB-27408 | Assessment archival to update existing requests
862e306 (kumarks1122) Issue #SB-27408 | Assessment archival to create and update requests
9f1567e (kumarks1122) Issue #SB-27408 | Assessment archival test case and fixes added
0de04e7 (kumarks1122) Issue #SB-27408 | Assessment archival Base and sub class changes
4d5cab2 (kumarks1122) Issue #SB-27408 | Assessment archival changes added
a60875f (kumarks1122) Issue #SB-27408 | Assessment archival changes added
78ee433 (utk14) Issue SB-24793 feat: Review comments resolved
23c1d7b (utk14) merge conflicts resolved
8c39db1 (kumarks1122) Issue #SB-27408 | Test case fixes added
e9f71dd (kumarks1122) Issue #SB-27408 | PR Review changes added
970b857 (utk14) Issue SB-24793 feat: Review comments resolved
ff71892 (utk14) merge conflicts resolved
0401987 (kumarks1122) Issue #SB-27408 | Archival Metrics changes added
6b3c627 (kumarks1122) Issue #SB-27408 | Fixes added
6ae0f3f (kumarks1122) Issue #SB-27408 | Testcase Fixes added
22c88a5 (kumarks1122) Issue #SB-27408 | Testcase Fixes added
c69817e (utk14) Issue SB-24793 feat: Added batchfilters and search query support
4d3fac1 (utk14) Merge conflicts resolved
0af8df3 (utk14) Issue SB-24793 feat: Added batchfilters and search query support
data-products/src/main/scala/org/sunbird/analytics/archival/AssessmentArchivalJob.scala (new file, 103 additions, 0 deletions)
```scala
package org.sunbird.analytics.archival

import org.apache.spark.sql.functions.{col, concat, lit, to_json, to_timestamp, weekofyear, year}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig, Level}
import org.sunbird.analytics.archival.util.ArchivalRequest

import java.util.concurrent.atomic.AtomicInteger

case class Period(year: Int, weekOfYear: Int)
case class BatchPartition(batchId: String, period: Period)

object AssessmentArchivalJob extends optional.Application with BaseArchivalJob {

  private val partitionCols = List("batch_id", "year", "week_of_year")
  private val columnWithOrder = List("course_id", "batch_id", "user_id", "content_id", "attempt_id", "created_on", "grand_total", "last_attempted_on", "total_max_score", "total_score", "updated_on", "question")

  override def getClassName = "org.sunbird.analytics.archival.AssessmentArchivalJob"
  override def jobName = "AssessmentArchivalJob"
  override def jobId: String = "assessment-archival"
  override def getReportPath = "assessment-archival/"
  override def getReportKey = "assessment"
  override def dateColumn = "updated_on"

  override def archiveData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest] = {

    val archivalKeyspace = requestConfig.keyspace.getOrElse(AppConf.getConfig("sunbird.courses.keyspace"))
    val batchId: String = requestConfig.batchId.getOrElse("")
    val collId: String = requestConfig.collectionId.getOrElse("")
    val date: String = requestConfig.date.getOrElse(null)

    var data = loadData(Map("table" -> requestConfig.archivalTable, "keyspace" -> archivalKeyspace, "cluster" -> "LMSCluster"), cassandraUrl, new StructType())

    data = if (batchId.nonEmpty && collId.nonEmpty) {
      data.filter(col("batch_id") === batchId && col("course_id") === collId).persist()
    } else if (batchId.nonEmpty) {
      data.filter(col("batch_id") === batchId).persist()
    } else {
      data
    }

    try {
      val dataDF = generatePeriodInData(data)
      val filteredDF = dataFilter(requests, dataDF)
      val archiveBatchList = filteredDF.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect()
      val batchesToArchive: Map[String, Array[BatchPartition]] = archiveBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], Period(f.get(1).asInstanceOf[Int], f.get(2).asInstanceOf[Int]))).groupBy(_.batchId)

      archiveBatches(batchesToArchive, filteredDF, requestConfig)
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        List()
    }
  }

  def archiveBatches(batchesToArchive: Map[String, Array[BatchPartition]], data: DataFrame, requestConfig: Request)(implicit config: JobConfig): List[ArchivalRequest] = {
    batchesToArchive.flatMap(batches => {
      val processingBatch = new AtomicInteger(batches._2.length)
      JobLogger.log(s"Started Processing to archive the data", Some(Map("batch_id" -> batches._1, "total_part_files_to_archive" -> processingBatch)))

      // Loop through the week_num & year batch partition
      batches._2.map((batch: BatchPartition) => {
        val filteredDF = data.filter(col("batch_id") === batch.batchId && col("year") === batch.period.year && col("week_of_year") === batch.period.weekOfYear).select(columnWithOrder.head, columnWithOrder.tail: _*)
        val collectionId = filteredDF.first().getAs[String]("course_id")
        var archivalRequest = getRequest(collectionId, batch.batchId, batch.period.year, batch.period.weekOfYear)

        if (archivalRequest == null) {
          val request_data = JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(requestConfig)) ++ Map[String, Int](
            "week" -> batch.period.weekOfYear,
            "year" -> batch.period.year
          )
          archivalRequest = ArchivalRequest("", batch.batchId, collectionId, Option(getReportKey), jobId, None, None, null, null, None, Option(0), JSONUtils.serialize(request_data), None)
        }

        try {
          val urls = upload(filteredDF, batch) // Upload the archived files into blob store
          archivalRequest.blob_url = Option(urls)
          JobLogger.log(s"Data is archived and Processing the remaining part files ", None, Level.INFO)
          markRequestAsSuccess(archivalRequest, requestConfig)
        } catch {
          case ex: Exception => {
            markArchivalRequestAsFailed(archivalRequest, ex.getLocalizedMessage)
          }
        }
      })
    }).toList
  }

  def deleteArchivedData(archivalRequest: Request): List[ArchivalRequest] = {
    // TODO: Deletion feature
    List()
  }

  def generatePeriodInData(data: DataFrame): DataFrame = {
    data.withColumn("updated_on", to_timestamp(col(dateColumn)))
      .withColumn("year", year(col("updated_on")))
      .withColumn("week_of_year", weekofyear(col("updated_on")))
      .withColumn("question", to_json(col("question")))
  }
}
```
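
For reference, a minimal standalone sketch (assuming only a local Spark session and a toy two-column dataset standing in for the real assessment table) of how `generatePeriodInData` derives the `year` and `week_of_year` partition columns from `updated_on`; every value here is hypothetical:

```scala
// Sketch only: local Spark, toy data. Mirrors the year/weekofyear derivation used
// by generatePeriodInData to build the weekly archival partitions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_timestamp, weekofyear, year}

object PeriodColumnsSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("period-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical sample rows; only the date column matters for the partition logic.
  val sample = Seq(
    ("batch-001", "2021-11-15 10:30:00"),
    ("batch-001", "2021-12-31 23:59:59")
  ).toDF("batch_id", "updated_on")

  val withPeriod = sample
    .withColumn("updated_on", to_timestamp(col("updated_on")))
    .withColumn("year", year(col("updated_on")))
    .withColumn("week_of_year", weekofyear(col("updated_on")))

  // Each row now carries the (year, week_of_year) pair that archiveBatches groups on.
  withPeriod.show(false)

  spark.stop()
}
```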
data-products/src/main/scala/org/sunbird/analytics/archival/BaseArchivalJob.scala (new file, 124 additions, 0 deletions)
```scala
package org.sunbird.analytics.archival

import java.util.concurrent.atomic.AtomicInteger

import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.types.StructType
import org.ekstep.analytics.framework.Level.ERROR
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig, Level}
import org.sunbird.analytics.exhaust.BaseReportsJob
import org.ekstep.analytics.framework.util.DatasetUtil.extensions
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.sunbird.analytics.archival.AssessmentArchivalJob.{getRequests, jobId}
import org.sunbird.analytics.archival.util.{ArchivalMetaDataStoreJob, ArchivalRequest}

case class Request(archivalTable: String, keyspace: Option[String], query: Option[String] = Option(""), batchId: Option[String] = Option(""), collectionId: Option[String] = Option(""), date: Option[String] = Option(""))

trait BaseArchivalJob extends BaseReportsJob with IJob with ArchivalMetaDataStoreJob with Serializable {

  val cassandraUrl = "org.apache.spark.sql.cassandra"
  def dateColumn: String

  def main(config: String)(implicit sc: Option[SparkContext] = None, fc: Option[FrameworkContext] = None): Unit = {
    implicit val className: String = getClassName;
    JobLogger.init(jobName)
    JobLogger.start(s"$jobName started executing - ver3", Option(Map("config" -> config, "model" -> jobName)))
    implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config)
    implicit val spark: SparkSession = openSparkSession(jobConfig)
    implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext()

    try {
      val res = CommonUtil.time(execute());
      JobLogger.end(s"$jobName completed execution", "SUCCESS", None)
    } catch {
      case ex: Exception => ex.printStackTrace()
        JobLogger.log(ex.getMessage, None, ERROR);
        JobLogger.end(jobName + " execution failed", "FAILED", Option(Map("model" -> jobName, "statusMsg" -> ex.getMessage)));
    }
    finally {
      frameworkContext.closeContext();
      spark.close()
    }
  }

  def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
    spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host")))
  }

  def execute()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
    val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]());
    val requestConfig = JSONUtils.deserialize[Request](JSONUtils.serialize(modelParams.getOrElse("request", Request).asInstanceOf[Map[String, AnyRef]]))
    val mode: String = modelParams.getOrElse("mode", "archive").asInstanceOf[String]

    val requests = getRequests(jobId, requestConfig.batchId)

    val archivalRequests = mode.toLowerCase() match {
      case "archival" =>
        archiveData(requestConfig, requests)
      case "delete" =>
        deleteArchivedData(requestConfig)
    }
    for (archivalRequest <- archivalRequests) {
      upsertRequest(archivalRequest)
    }
  }

  def getWeekAndYearVal(date: String): Period = {
    if (null != date && date.nonEmpty) {
      val dt = new DateTime(date)
      Period(year = dt.getYear, weekOfYear = dt.getWeekOfWeekyear)
    } else {
      Period(0, 0)
    }
  }

  def upload(archivedData: DataFrame, batch: BatchPartition)(implicit jobConfig: JobConfig): List[String] = {
    val blobConfig = jobConfig.modelParams.get("blobConfig").asInstanceOf[Map[String, AnyRef]]
    val reportPath: String = blobConfig.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
    val container = AppConf.getConfig("cloud.container.reports")
    val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
    val fileName = archivalFormat(batch)
    val storageConfig = getStorageConfig(jobConfig, objectKey)
    JobLogger.log(s"Uploading reports to blob storage", None, Level.INFO)
    archivedData.saveToBlobStore(storageConfig, "csv", s"$reportPath$fileName-${System.currentTimeMillis()}", Option(Map("header" -> "true", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")), None, fileExt = Some("csv.gz"))
  }

  def dataFilter(requests: Array[ArchivalRequest], dataDF: DataFrame): DataFrame = {
    var filteredDF = dataDF
    for (request <- requests) {
      if (request.archival_status.equals("SUCCESS")) {
        val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data)
        filteredDF = dataDF.filter(
          col("batch_id").equalTo(request.batch_id) &&
            concat(col("year"), lit("-"), col("week_of_year")) =!= lit(request_data.get("year").get + "-" + request_data.get("week").get)
        )
      }
    }
    filteredDF
  };

  // Overriding methods START:
  def jobId: String;
  def jobName: String;
  def getReportPath: String;
  def getReportKey: String;
  def getClassName: String;

  def archiveData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest];
  def deleteArchivedData(archivalRequest: Request): List[ArchivalRequest];

  def archivalFormat(batch: BatchPartition): String = {
    s"${batch.batchId}/${batch.period.year}-${batch.period.weekOfYear}"
  }
  // Overriding methods END:
}
```
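
For context, a hedged sketch of the `modelParams` section that `execute()` above appears to read: `mode` must be "archival" or "delete" to hit a case in the match (the fallback default "archive" would not match either arm), `request` maps onto the `Request` case class, and `blobConfig.reportPath` is read by `upload()`. Every value below is hypothetical and the surrounding JobConfig envelope is omitted:

```scala
// Sketch only: field names come from the Request case class and the mode/blobConfig
// lookups in this PR; the concrete table, keyspace, batch and collection ids are
// made up for illustration.
object ArchivalModelParamsSketch extends App {
  val modelParams: String =
    """
      |{
      |  "mode": "archival",
      |  "request": {
      |    "archivalTable": "assessment_aggregator",
      |    "keyspace": "sunbird_courses",
      |    "batchId": "batch-001",
      |    "collectionId": "do_11306040245271756813"
      |  },
      |  "blobConfig": {
      |    "reportPath": "assessment-archival/"
      |  }
      |}
      |""".stripMargin

  println(modelParams)
}
```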
...roducts/src/main/scala/org/sunbird/analytics/archival/util/ArchivalMetaDataStoreJob.scala (new file, 157 additions, 0 deletions)
```scala
package org.sunbird.analytics.archival.util

import java.security.MessageDigest
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Timestamp}
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig}
import org.ekstep.analytics.framework.Level.INFO
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.sunbird.analytics.archival.Request

case class ArchivalRequest(request_id: String, batch_id: String, collection_id: String, resource_type: Option[String], job_id: String,
                           var archival_date: Option[Long], var completion_date: Option[Long], var archival_status: String, var deletion_status: String,
                           var blob_url: Option[List[String]], var iteration: Option[Int], request_data: String, var err_message: Option[String])

trait ArchivalMetaDataStoreJob {

  implicit val className: String = getClassName;
  val connProperties: Properties = CommonUtil.getPostgresConnectionProps()
  val db: String = AppConf.getConfig("postgres.db")
  val url: String = AppConf.getConfig("postgres.url") + s"$db"
  val requestsTable: String = AppConf.getConfig("postgres.table.archival_request")
  val dbc: Connection = DriverManager.getConnection(url, connProperties.getProperty("user"), connProperties.getProperty("password"));
  dbc.setAutoCommit(true);

  def getClassName(): String;

  def cleanUp() {
    dbc.close();
  }

  def getRequests(jobId: String, batchId: Option[String])(implicit spark: SparkSession, fc: FrameworkContext): Array[ArchivalRequest] = {
    println("jobid: " + jobId + " batchid: " + batchId)
    val encoder = Encoders.product[ArchivalRequest]
    val archivalConfigsDf = spark.read.jdbc(url, requestsTable, connProperties)
      .where(col("job_id") === jobId && col("iteration") < 3)
    println("archivalConfigDF:")
    archivalConfigsDf.show(false)

    val filteredReportConfigDf = if (batchId.isDefined) {
      val filteredArchivalConfig = archivalConfigsDf.filter(col("batch_id").equalTo(batchId.get))
      if (filteredArchivalConfig.count() > 0) filteredArchivalConfig else archivalConfigsDf
    } else archivalConfigsDf
    println("filteredtReportCOnfig: ")
    filteredReportConfigDf.show(false)
    JobLogger.log("fetched records count" + filteredReportConfigDf.count(), None, INFO)
    val requests = filteredReportConfigDf.as[ArchivalRequest](encoder).collect()
    requests
  }

  def getRequestID(collectionId: String, batchId: String, year: Int, week: Int): String = {
    val requestComb = s"$collectionId:$batchId:$year:$week"
    MessageDigest.getInstance("MD5").digest(requestComb.getBytes).map("%02X".format(_)).mkString
  }

  def getRequest(collectionId: String, batchId: String, year: Int, week: Int): ArchivalRequest = {
    val requestId = getRequestID(collectionId, batchId, year, week)
    val archivalRequest = s"""select * from $requestsTable where request_id = '$requestId' limit 1"""
    val pstmt: PreparedStatement = dbc.prepareStatement(archivalRequest);
    val resultSet = pstmt.executeQuery()

    if (resultSet.next()) getArchivalRequest(resultSet) else null
  }

  private def getArchivalRequest(resultSet: ResultSet): ArchivalRequest = {
    ArchivalRequest(
      resultSet.getString("request_id"),
      resultSet.getString("batch_id"),
      resultSet.getString("collection_id"),
      Option(resultSet.getString("resource_type")),
      resultSet.getString("job_id"),
      Option(resultSet.getTimestamp("archival_date").getTime),
      if (resultSet.getTimestamp("completion_date") != null) Option(resultSet.getTimestamp("completion_date").getTime) else None,
      resultSet.getString("archival_status"),
      resultSet.getString("deletion_status"),
      if (resultSet.getArray("blob_url") != null) Option(resultSet.getArray("blob_url").getArray().asInstanceOf[Array[String]].toList) else None,
      Option(resultSet.getInt("iteration")),
      resultSet.getString("request_data"),
      Option(resultSet.getString("err_message"))
    )
  }

  def markArchivalRequestAsFailed(request: ArchivalRequest, failedMsg: String): ArchivalRequest = {
    request.archival_status = "FAILED";
    request.archival_date = Option(System.currentTimeMillis());
    request.iteration = Option(request.iteration.getOrElse(0) + 1);
    request.err_message = Option(failedMsg);
    request
  }

  def markDeletionRequestAsFailed(request: ArchivalRequest, failedMsg: String): ArchivalRequest = {
    request.deletion_status = "FAILED";
    request.archival_date = Option(System.currentTimeMillis());
    request.iteration = Option(request.iteration.getOrElse(0) + 1);
    request.err_message = Option(failedMsg);
    request
  }

  def createRequest(request: ArchivalRequest) = {
    val insertQry = s"INSERT INTO $requestsTable (request_id, batch_id, collection_id, resource_type, job_id, archival_date, completion_date, archival_status, " +
      s"deletion_status, blob_url, iteration, request_data, err_message) VALUES (?,?,?,?,?,?,?,?,?,?,?,?::json,?)"
    val pstmt: PreparedStatement = dbc.prepareStatement(insertQry);
    val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data)
    val requestId = getRequestID(request.collection_id, request.batch_id, request_data("year").asInstanceOf[Int], request_data("week").asInstanceOf[Int])
    pstmt.setString(1, requestId);
    pstmt.setString(2, request.batch_id);
    pstmt.setString(3, request.collection_id);
    pstmt.setString(4, request.resource_type.getOrElse("assessment"));
    pstmt.setString(5, request.job_id);
    pstmt.setTimestamp(6, if (request.archival_date.isDefined) new Timestamp(request.archival_date.get) else null);
    pstmt.setTimestamp(7, if (request.completion_date.isDefined) new Timestamp(request.completion_date.get) else null);
    pstmt.setString(8, request.archival_status);
    pstmt.setString(9, request.deletion_status);
    val blobURLs = request.blob_url.getOrElse(List()).toArray.asInstanceOf[Array[Object]];
    pstmt.setArray(10, dbc.createArrayOf("text", blobURLs))
    pstmt.setInt(11, request.iteration.getOrElse(0))
    pstmt.setString(12, request.request_data)
    pstmt.setString(13, StringUtils.abbreviate(request.err_message.getOrElse(""), 300));

    pstmt.execute()
  }

  def upsertRequest(request: ArchivalRequest): Unit = {
    if (request.request_id.isEmpty) {
      createRequest(request)
    } else {
      updateRequest(request)
    }
  }

  def updateRequest(request: ArchivalRequest): Unit = {
    val updateQry = s"UPDATE $requestsTable SET blob_url=?, iteration = ?, archival_date=?, completion_date=?, " +
      s"archival_status=?, deletion_status=? WHERE request_id=?";
    val pstmt: PreparedStatement = dbc.prepareStatement(updateQry)

    val blobURLs = request.blob_url.getOrElse(List()).toArray.asInstanceOf[Array[Object]];
    pstmt.setArray(1, dbc.createArrayOf("text", blobURLs))
    pstmt.setInt(2, request.iteration.get);
    pstmt.setTimestamp(3, if (request.archival_date.isDefined) new Timestamp(request.archival_date.get) else null);
    pstmt.setTimestamp(4, if (request.completion_date.isDefined) new Timestamp(request.completion_date.get) else null);
    pstmt.setString(5, request.archival_status);
    pstmt.setString(6, request.deletion_status);
    pstmt.setString(7, request.request_id);

    pstmt.execute()
  }

  def markRequestAsSuccess(request: ArchivalRequest, requestConfig: Request): ArchivalRequest = {
    request.archival_status = "SUCCESS";
    request.archival_date = Option(System.currentTimeMillis())
    request
  }
}
```
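
For reference, a small self-contained sketch of the request-id derivation used by `getRequestID` above: the MD5 hex digest of "collectionId:batchId:year:week". A deterministic id is what lets a re-run of the job find and update an existing request row via `upsertRequest` instead of inserting a duplicate. The sample values are hypothetical:

```scala
// Sketch only: copies the hashing logic from getRequestID so the id scheme can be
// inspected without a Postgres connection.
import java.security.MessageDigest

object RequestIdSketch extends App {
  def requestId(collectionId: String, batchId: String, year: Int, week: Int): String = {
    val requestComb = s"$collectionId:$batchId:$year:$week"
    MessageDigest.getInstance("MD5").digest(requestComb.getBytes).map("%02X".format(_)).mkString
  }

  // The same (collection, batch, year, week) tuple always yields the same request_id.
  println(requestId("do_11306040245271756813", "batch-001", 2021, 46))
}
```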