diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/AssessmentArchivalJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/AssessmentArchivalJob.scala
new file mode 100644
index 000000000..ba0734a08
--- /dev/null
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/AssessmentArchivalJob.scala
@@ -0,0 +1,114 @@
+package org.sunbird.analytics.job.report
+
+import com.datastax.spark.connector.cql.CassandraConnectorConf
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.cassandra.CassandraSparkSessionFunctions
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.ekstep.analytics.framework.Level.INFO
+import org.ekstep.analytics.framework.conf.AppConf
+import org.ekstep.analytics.framework.util.DatasetUtil.extensions
+import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
+import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig}
+
+import java.util.concurrent.atomic.AtomicInteger
+
+object AssessmentArchivalJob extends optional.Application with IJob with BaseReportsJob {
+  val cassandraUrl = "org.apache.spark.sql.cassandra"
+  private val assessmentAggDBSettings: Map[String, String] = Map("table" -> AppConf.getConfig("sunbird.courses.assessment.table"), "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "LMSCluster")
+  implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
+  private val partitionCols = List("batch_id", "year", "week_of_year")
+
+  case class BatchPartition(batch_id: String, year: Int, week_of_year: Int)
+
+  // $COVERAGE-OFF$ Disabling scoverage for main and execute method
+  override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
+
+    implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
+    val jobName = "AssessmentArchivalJob"
+    JobLogger.init(jobName)
+    JobLogger.start(s"$jobName started executing", Option(Map("config" -> config, "model" -> jobName)))
+    implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config)
+    implicit val spark: SparkSession = openSparkSession(jobConfig)
+
+    implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext()
+    val modelParams = jobConfig.modelParams.get
+    // Default to the Boolean false: a String default ("false") would throw a ClassCastException on the cast below.
+    val truncateData: Boolean = modelParams.getOrElse("truncateData", false).asInstanceOf[Boolean]
+    init()
+    try {
+      val res = CommonUtil.time(archiveData(spark, fetchData, jobConfig))
+      val totalArchivedFiles = res._2.length
+      if (truncateData) deleteRecords(spark, assessmentAggDBSettings.getOrElse("keyspace", "sunbird_courses"), assessmentAggDBSettings.getOrElse("table", "assessment_aggregator")) else JobLogger.log(s"Skipping the truncate process for the ${assessmentAggDBSettings.getOrElse("table", "assessment_aggregator")} table", None, INFO)
+      JobLogger.end(s"$jobName completed execution", "SUCCESS", Option(Map("timeTaken" -> res._1, "total_archived_files" -> totalArchivedFiles)))
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        JobLogger.end(s"$jobName failed execution with the error ${ex.getMessage}", "FAILED", None)
+    } finally {
+      frameworkContext.closeContext()
+      spark.close()
+    }
+  }
+
+  def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
+    spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host")))
+  }
+
+  // $COVERAGE-ON$
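+  /**
+   * Archives the assessment data, partitioned by (batch_id, year, week_of_year), and returns
+   * one metrics map per archived partition. For example, attempts of batch 1010 updated on
+   * 2019-09-06 (ISO week 36) all land in a single CSV named like
+   * "1010-2019-36-<timestamp>.csv" under the configured reportPath.
+   */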
+  def archiveData(sparkSession: SparkSession, fetchData: (SparkSession, Map[String, String], String, StructType) => DataFrame, jobConfig: JobConfig): Array[Map[String, Any]] = {
+    val batches: List[String] = AppConf.getConfig("assessment.batches").split(",").toList
+    val assessmentDF: DataFrame = getAssessmentData(sparkSession, fetchData, batches)
+    val assessmentData = assessmentDF.withColumn("updated_on", to_timestamp(col("updated_on")))
+      .withColumn("year", year(col("updated_on")))
+      .withColumn("week_of_year", weekofyear(col("updated_on")))
+      .withColumn("question", to_json(col("question")))
+    val archivedBatchList = assessmentData.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect()
+    val archivedBatchCount = new AtomicInteger(archivedBatchList.length)
+    JobLogger.log(s"Total batches to archive by year and week: ${archivedBatchCount.get()}", None, INFO)
+    val batchesToArchive: Array[BatchPartition] = archivedBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], f.get(1).asInstanceOf[Int], f.get(2).asInstanceOf[Int]))
+    val archivalMetrics = for (batch <- batchesToArchive) yield {
+      val filteredDF = assessmentData
+        .filter(col("batch_id") === batch.batch_id && col("year") === batch.year && col("week_of_year") === batch.week_of_year)
+      upload(filteredDF.drop("year", "week_of_year"), batch, jobConfig)
+      val metrics = Map("batch_id" -> batch.batch_id, "year" -> batch.year, "week_of_year" -> batch.week_of_year, "pending_batches" -> archivedBatchCount.getAndDecrement(), "total_records" -> filteredDF.count())
+      JobLogger.log("Batch data archived successfully", Some(metrics), INFO)
+      metrics
+    }
+    // Unpersist once after all partitions are written; unpersisting inside the loop would evict the cache on the first iteration.
+    assessmentDF.unpersist()
+    archivalMetrics
+  }
+
+  def getAssessmentData(spark: SparkSession, fetchData: (SparkSession, Map[String, String], String, StructType) => DataFrame, batchIds: List[String]): DataFrame = {
+    import spark.implicits._
+    val assessmentDF = fetchData(spark, assessmentAggDBSettings, cassandraUrl, new StructType())
+    if (batchIds.nonEmpty) {
+      val batchListDF = batchIds.toDF("batch_id")
+      assessmentDF.join(batchListDF, Seq("batch_id"), "inner").persist()
+    } else {
+      assessmentDF
+    }
+  }
+
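+  /**
+   * Clears the archived table. The TRUNCATE below is left disabled for now; note that Spark SQL
+   * cannot truncate a Cassandra table directly, so enabling it would need a CQL statement issued
+   * through the connector instead. A minimal sketch, assuming the DataStax
+   * spark-cassandra-connector already on the classpath (not wired in here):
+   *
+   *   import com.datastax.spark.connector.cql.CassandraConnector
+   *   CassandraConnector(sparkSession.sparkContext.getConf).withSessionDo { session =>
+   *     session.execute(s"TRUNCATE $keyspace.$table")
+   *   }
+   */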
+  def deleteRecords(sparkSession: SparkSession, keyspace: String, table: String): Unit = {
+    // sparkSession.sql(s"TRUNCATE TABLE $keyspace.$table")
+    JobLogger.log(s"Cleared the $keyspace.$table table data successfully. Please execute the compaction", None, INFO)
+  }
+
+  def upload(archivedData: DataFrame,
+             batch: BatchPartition,
+             jobConfig: JobConfig): List[String] = {
+    val modelParams = jobConfig.modelParams.get
+    val reportPath: String = modelParams.getOrElse("reportPath", "archival-data/").asInstanceOf[String]
+    val container = AppConf.getConfig("cloud.container.reports")
+    val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
+    val fileName = s"${batch.batch_id}-${batch.year}-${batch.week_of_year}"
+    val storageConfig = getStorageConfig(container, objectKey, jobConfig)
+    JobLogger.log(s"Uploading the archived data for the batch partition $fileName to blob storage", None, INFO)
+    archivedData.saveToBlobStore(storageConfig, "csv", s"$reportPath$fileName-${System.currentTimeMillis()}", Option(Map("header" -> "true")), None)
+  }
+
+}
diff --git a/data-products/src/test/resources/application.conf b/data-products/src/test/resources/application.conf
index 9ff1086f6..719cf2953 100644
--- a/data-products/src/test/resources/application.conf
+++ b/data-products/src/test/resources/application.conf
@@ -164,6 +164,7 @@ sunbird.report.cluster.host=127.0.0.1
 sunbird.user.report.keyspace="sunbird_courses"
 collection.exhaust.store.prefix="reports"
 postgres.table.job_request="job_request"
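+# Cassandra table holding the per-attempt assessment scores read by AssessmentArchivalJob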
'{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112832394979106816112,A2,user030,do_1125559882615357441175,1010,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,10,5,2019-09-06 09:59:51.000+0000,"4/4","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112832394979106816112,A6,user026,do_1126458775024025601296,1006,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,30,10,2019-09-06 09:59:51.000+0000,"5/5","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112832394979106816112,A6,user021,do_1126458775024025601296,1006,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,30,10,2019-09-06 09:59:51.000+0000,"6/6","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" \ No newline at end of file diff --git a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestAssessmentArchivalJob.scala b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestAssessmentArchivalJob.scala new file mode 100644 index 000000000..fd33988d9 --- /dev/null +++ b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestAssessmentArchivalJob.scala @@ -0,0 +1,83 @@ +package org.sunbird.analytics.job.report + + +import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructType} +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.ekstep.analytics.framework.conf.AppConf +import org.ekstep.analytics.framework.util.{HadoopFileUtil, JSONUtils} +import org.ekstep.analytics.framework.{FrameworkContext, JobConfig} +import org.scalamock.scalatest.MockFactory + +import scala.collection.mutable + + +class TestAssessmentArchivalJob extends BaseReportSpec with MockFactory { + + var spark: SparkSession = _ + + var assessmentAggDF: DataFrame = _ + var reporterMock: BaseReportsJob = mock[BaseReportsJob] + val sunbirdCoursesKeyspace = "sunbird_courses" + + override def beforeAll(): Unit = { + super.beforeAll() + spark = getSparkSession(); + assessmentAggDF = spark + .read + .format("com.databricks.spark.csv") + .option("header", "true") + .load("src/test/resources/assessment-archival/assessment_aggregator.csv") + .cache() + } + + override def afterAll(): Unit = { + super.afterAll() + val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey") + new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, objectKey + "assessment-archival") + } + + val convertMethod = 
+  val convertMethod = udf((value: mutable.WrappedArray[String]) => {
+    if (null != value && value.nonEmpty)
+      value.toList.map(str => JSONUtils.deserialize(str)(manifest[Map[String, String]])).toArray
+    else null
+  }, new ArrayType(MapType(StringType, StringType), true))
+
+  it should "archive the batch data for each year and week of the year" in {
+    initializeDefaultMockData()
+    implicit val mockFc: FrameworkContext = mock[FrameworkContext]
+    val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.AssessmentArchivalJob","modelParams":{"truncateData":false,"store":"local","sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Assessment Archival Job"}"""
+    implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](strConfig)
+    val reportData = AssessmentArchivalJob.archiveData(spark, reporterMock.fetchData, jobConfig)
+
+    // year/week_of_year are Ints and total_records is a Long in the metrics map, so the
+    // expectations below are numeric rather than strings, and each comparison is asserted.
+    val batch_1 = reportData.filter(x => x.getOrElse("batch_id", "").asInstanceOf[String] == "1010")
+    batch_1.foreach { res =>
+      assert(res("year") === 2019)
+      assert(res("total_records") === 2)
+      assert(res("week_of_year") === 36)
+    }
+
+    val batch_2 = reportData.filter(x => x.getOrElse("batch_id", "").asInstanceOf[String] == "1001")
+    batch_2.foreach { res =>
+      assert(res("year") === 2019)
+      assert(res("total_records") === 3)
+      assert(res("week_of_year") === 36)
+    }
+
+    val batch_3 = reportData.filter(x => x.getOrElse("batch_id", "").asInstanceOf[String] == "1005")
+    batch_3.foreach { res =>
+      assert(res("year") === 2019)
+      assert(res("total_records") === 1)
+      assert(res("week_of_year") === 36)
+    }
+
+    val batch_4 = reportData.filter(x => x.getOrElse("batch_id", "").asInstanceOf[String] == "1006")
+    batch_4.foreach { res =>
+      assert(res("year") === 2019)
+      assert(res("total_records") === 2)
+      assert(res("week_of_year") === 36)
+    }
+  }
+
+  def initializeDefaultMockData(): Unit = {
+    (reporterMock.fetchData _)
+      .expects(spark, Map("table" -> "assessment_aggregator", "keyspace" -> sunbirdCoursesKeyspace, "cluster" -> "LMSCluster"), "org.apache.spark.sql.cassandra", new StructType())
+      .returning(assessmentAggDF)
+  }
+}
\ No newline at end of file