Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
import org.joda.time.{DateTime, DateTimeZone}
import org.sunbird.analytics.exhaust.{BaseReportsJob, JobRequest, OnDemandExhaustJob}
import org.sunbird.analytics.util.DecryptUtil

import java.security.MessageDigest
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.sunbird.analytics.exhaust.collection.ResponseExhaustJobV2.Question

import scala.collection.immutable.List
import scala.collection.mutable.ListBuffer

Expand Down Expand Up @@ -543,4 +546,7 @@ object UDFUtils extends Serializable {
}

val getLatestValue = udf[String, String, String](getLatestValueFun)

/**
 * Spark UDF that turns a JSON string column into a list of [[Question]] records
 * via `JSONUtils.deserialize`.
 *
 * A null input is returned as null instead of being handed to the deserializer,
 * so a row whose source cell is empty does not abort the whole computation.
 *
 * @return a UDF taking one string column and producing the parsed question list
 */
def convertStringToList: UserDefinedFunction =
  udf { str: String =>
    // Option(...) maps a null cell to None; orNull hands Spark a null for that row.
    Option(str).map(json => JSONUtils.deserialize[List[Question]](json)).orNull
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.sunbird.analytics.exhaust.collection

import org.apache.spark.sql.functions.{col, explode_outer, round}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.JobLogger
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig}
import org.sunbird.analytics.exhaust.util.ExhaustUtil

object ResponseExhaustJobV2 extends optional.Application with BaseCollectionExhaustJob {

// Constant identifiers this job supplies by overriding inherited members.

/** Fully qualified class name of this job. */
override def getClassName: String = "org.sunbird.analytics.exhaust.collection.ResponseExhaustJobV2"

/** Name of this job. */
override def jobName(): String = "ResponseExhaustJobV2"

/** Identifier of this job. */
override def jobId(): String = "response-exhaust"

/** Path prefix reported for this job's output. */
override def getReportPath(): String = "response-exhaust/"

/** Key reported for this job's output. */
override def getReportKey(): String = "response"

// DataFrames persisted by processBatch; unpersistDFs() iterates over this buffer.
private val persistedDF: scala.collection.mutable.ListBuffer[DataFrame] =
  scala.collection.mutable.ListBuffer.empty[DataFrame]

// Settings passed to loadData when reading the assessment aggregator table.
private val assessmentAggDBSettings = Map(
  "table" -> "assessment_aggregator",
  "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"),
  "cluster" -> "LMSCluster"
)

// Report columns in output order, as (DataFrame column name -> report header label) pairs.
// The three collections below are all derived from this one table.
private val responseReportColumns: List[(String, String)] = List(
  "courseid" -> "Collection Id",
  "collectionName" -> "Collection Name",
  "batchid" -> "Batch Id",
  "batchName" -> "Batch Name",
  "userid" -> "User UUID",
  "content_id" -> "QuestionSet Id",
  "contentname" -> "QuestionSet Title",
  "attempt_id" -> "Attempt Id",
  "last_attempted_on" -> "Attempted On",
  "questionid" -> "Question Id",
  "questiontype" -> "Question Type",
  "questiontitle" -> "Question Title",
  "questiondescription" -> "Question Description",
  "questionduration" -> "Question Duration",
  "questionscore" -> "Question Score",
  "questionmaxscore" -> "Question Max Score",
  "questionoption" -> "Question Options",
  "questionresponse" -> "Question Response"
)

// DataFrame column names selected for the report, in report order.
private val filterColumns: Seq[String] = responseReportColumns.map(_._1)

// Header labels in the order handed to organizeDF.
private val columnsOrder: List[String] = responseReportColumns.map(_._2)

// Lookup from DataFrame column name to header label.
val columnMapping: Map[String, String] = responseReportColumns.toMap

/**
 * One assessment attempt record: the identifying keys, optional totals and
 * timestamps, and the list of questions recorded for the attempt.
 */
case class AssessmentData(
  user_id: String,
  course_id: String,
  batch_id: String,
  content_id: String,
  attempt_id: String,
  created_on: Option[String],
  grand_total: Option[String],
  last_attempted_on: Option[String],
  question: List[Question],
  total_max_score: Option[Double],
  total_score: Option[Double],
  updated_on: Option[String]
)

/**
 * One question inside an assessment attempt. `resvalues` and `params` are
 * lists of string-to-string maps.
 */
case class Question(
  id: String,
  assess_ts: String,
  max_score: Double,
  score: Double,
  `type`: String,
  title: String,
  resvalues: List[Map[String, String]],
  params: List[Map[String, String]],
  description: String,
  duration: Double
)

/**
 * Builds the response report for one collection batch.
 *
 * The assessment rows from getAssessmentDF are left-joined with the names
 * returned by searchContent for the distinct content ids, reduced to
 * `filterColumns`, and handed to organizeDF with the column mapping and order.
 *
 * @param userEnrolmentDF enrolment rows passed through to getAssessmentDF
 * @param collectionBatch batch passed through to getAssessmentDF
 * @return the DataFrame produced by organizeDF
 */
override def processBatch(userEnrolmentDF: DataFrame, collectionBatch: CollectionBatch)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame = {
  // Persisted because it is read twice: once for the content ids, once for the join.
  val assessments = getAssessmentDF(userEnrolmentDF, collectionBatch).persist()
  persistedDF.append(assessments)

  val contentIds = assessments.select("content_id").dropDuplicates().collect().map(row => row.get(0))
  val searchRequest = Map("request" -> Map("filters" -> Map("identifier" -> contentIds)))
  val contentNames = searchContent(searchRequest)
    .withColumnRenamed("collectionName", "contentname")
    .select("identifier", "contentname")

  // Left outer join keeps assessment rows whose content id has no search result.
  val onContentId = assessments("content_id") === contentNames("identifier")
  val report = assessments
    .join(contentNames, onContentId, "left_outer")
    .drop("identifier")
    .select(filterColumns.head, filterColumns.tail: _*)

  organizeDF(report, columnMapping, columnsOrder)
}

/**
 * Unpersists every DataFrame recorded by processBatch (blocking until each is
 * removed) and then empties the tracking buffer.
 *
 * The buffer lives in a singleton object, so without clearing it each call
 * would unpersist all earlier DataFrames again and keep them reachable.
 */
override def unpersistDFs(): Unit = {
  persistedDF.foreach(_.unpersist(true))
  persistedDF.clear()
}

/**
 * Assembles the per-question assessment rows for the batch found in `userEnrolmentDF`.
 *
 * Loads the assessment aggregator table, tries to join it with the archived
 * blob data for the batch, and falls back to the aggregator data alone when
 * the blob data cannot be read. The rest of the method joins the result with
 * the enrolment columns selected here.
 *
 * @param userEnrolmentDF enrolment rows; must expose userid, courseid, collectionName, batchName and batchid
 * @param batch           not referenced in this method body
 * @return the DataFrame built by the remainder of the method
 */
def getAssessmentDF(userEnrolmentDF: DataFrame, batch: CollectionBatch)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame = {

  val userEnrolmentDataDF = userEnrolmentDF
    .select(
      col("userid"),
      col("courseid"),
      col("collectionName"),
      col("batchName"),
      col("batchid"))

  // NOTE(review): head throws when userEnrolmentDF has no rows — confirm callers never pass an empty DataFrame.
  val batchid = userEnrolmentDataDF.select("batchid").distinct().collect().head.getString(0)

  val assessAggregateData = loadData(assessmentAggDBSettings, cassandraFormat, new StructType())

  val joinedDF = try {
    val assessBlobData = getAssessmentBlobDF(batchid, config)

    // NOTE(review): only the aggregator columns survive this select, so the blob data
    // contributes no values to the result — confirm whether a union was intended.
    assessAggregateData.join(assessBlobData, Seq("batch_id", "course_id", "user_id"), "left")
      .select(assessAggregateData.col("*"))
  } catch {
    // NonFatal lets fatal errors (OutOfMemoryError, InterruptedException, ...) propagate
    // instead of being treated as "no archived file"; the cause is logged for diagnosis.
    case scala.util.control.NonFatal(e) =>
      JobLogger.log("Blob does not contain any file for batchid: " + batchid + ", reason: " + e.getMessage)
      assessAggregateData
  }
Comment on lines +60 to +71
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we still creating one single CSV file of a few GBs?


// Inner-join the assessment rows with the enrolment rows on user id and course id,
// keep the enrolment columns plus question/content_id/attempt_id/last_attempted_on,
// then explode the `question` array (explode_outer also emits a row when the array
// is null or empty) and copy each question field into its own flat column.
// Duration is rounded; resvalues and params are passed through UDFUtils.toJSON.
// The intermediate question columns are dropped at the end ("question_data" is
// listed in the drop but is not created in this chain).
joinedDF.join(userEnrolmentDataDF, joinedDF.col("user_id") === userEnrolmentDataDF.col("userid") && joinedDF.col("course_id") === userEnrolmentDataDF.col("courseid"), "inner")
.select(userEnrolmentDataDF.col("*"), joinedDF.col("question"), col("content_id"), col("attempt_id"), col("last_attempted_on"))
.withColumn("questiondata",explode_outer(col("question")))
.withColumn("questionid" , col("questiondata.id"))
.withColumn("questiontype", col("questiondata.type"))
.withColumn("questiontitle", col("questiondata.title"))
.withColumn("questiondescription", col("questiondata.description"))
.withColumn("questionduration", round(col("questiondata.duration")))
.withColumn("questionscore", col("questiondata.score"))
.withColumn("questionmaxscore", col("questiondata.max_score"))
.withColumn("questionresponse", UDFUtils.toJSON(col("questiondata.resvalues")))
.withColumn("questionoption", UDFUtils.toJSON(col("questiondata.params")))
.drop("question", "questiondata", "question_data")
}

/**
 * Reads the archived assessment data for one batch through
 * ExhaustUtil.getAssessmentBlobData, using the "assessmentFetcherConfig"
 * entry of the job's modelParams for the store, format, path and container.
 *
 * @param batchid batch whose archived files are read
 * @param config  job configuration holding the fetcher settings
 * @return the archived rows, de-duplicated, with `question` converted by UDFUtils.convertStringToList
 */
def getAssessmentBlobDF(batchid: String, config: JobConfig)(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
// NOTE(review): this line fails when modelParams or its "assessmentFetcherConfig"
// entry is absent — confirm every job config supplies it.
val azureFetcherConfig = config.modelParams.get("assessmentFetcherConfig").asInstanceOf[Map[String, AnyRef]]

// "store" is required (a missing key throws); "format" falls back to "csv".
val store = azureFetcherConfig("store").asInstanceOf[String]
val format:String = azureFetcherConfig.getOrElse("format", "csv").asInstanceOf[String]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If multiple archive files have been created for a batch and the data is duplicated across them, how are we handling that here? If it is not handled, could you please cover this scenario?

// Optional settings: the path defaults to "archival-data/" and the container to "reports".
val filePath = azureFetcherConfig.getOrElse("filePath", "archival-data/").asInstanceOf[String]
val container = azureFetcherConfig.getOrElse("container", "reports").asInstanceOf[String]

val assessAggData = ExhaustUtil.getAssessmentBlobData(store, filePath, container, Option(batchid), Option(format))

// distinct() removes rows that are identical in every column; the `question`
// column is then replaced by the result of UDFUtils.convertStringToList.
assessAggData.distinct()
.withColumn("question", UDFUtils.convertStringToList(col("question")))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.sunbird.analytics.exhaust.util


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.FrameworkContext
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.JobLogger

object ExhaustUtil {

/**
 * Builds the location of the archived assessment files for a batch; the
 * remainder of the method logs the fetch and loads that location with Spark.
 *
 * The file glob is `<filePath><batchId>-*.<format>`. For the "local" store it
 * is used as is; for "s3" and "azure" it is wrapped in a wasb:// URL built
 * from `bucket` and the `azure_storage_key` configuration value. Any other
 * store value raises a scala.MatchError.
 *
 * @param store      "local", "s3" or "azure"
 * @param filePath   path prefix placed before the batch id
 * @param bucket     container name used in the wasb:// URL
 * @param batchId    batch id; an empty string is used when absent
 * @param fileFormat file extension; "csv" is used when absent
 * @return the DataFrame loaded by the remainder of the method
 */
def getAssessmentBlobData(store: String, filePath: String, bucket: String, batchId: Option[String], fileFormat: Option[String])(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
  val format = fileFormat.getOrElse("csv")
  val batchid = batchId.getOrElse("")

  val url = store match {
    case "local" =>
      filePath + s"${batchid}-*.${format}"
    // $COVERAGE-OFF$ for azure testing
    case "s3" | "azure" =>
      val key = AppConf.getConfig("azure_storage_key")
      val file = s"${filePath}${batchid}-*.${format}"
      // The URL ends at the file glob, like the "local" branch; a trailing "." here
      // would restrict the glob to names ending in ".<format>.".
      s"wasb://$bucket@$key.blob.core.windows.net/$file"
    // $COVERAGE-ON$
  }
Comment on lines +15 to +24
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to a commonutil method


// An empty string is passed explicitly for JobLogger.log's second parameter list.
JobLogger.log(s"Fetching data from ${store} for batchid: " + batchid)(new String())

// The reader format is fixed to "csv" with a header row; the `format` value
// computed earlier in the method only affects the file name glob in `url`.
spark.read.format("csv")
.option("header","true")
.load(url)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_1130928636168192001667', 'batch-001', 'manju', '2020-10-10', '2020-01-10', '2020-01-10', '2020-01-01', 'Basic Java', '2020-01-20', '2020-01-20', 1 );
INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_1130293726460805121168', 'batch-002', 'manju', '2020-11-11', '2020-02-10', '2020-02-10', '2020-02-10', 'Basic C++', '2020-02-20', '2020-02-20', 1 );
INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_1131975645014835201326', 'batch-003', 'manju', '2020-10-02', '2020-10-01', '2020-10-01', '2020-10-01', 'Basic C++', '2020-10-20', '2020-10-20', 1 );
INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_112835334818643968148', 'batch-004', 'manju', '2020-11-11', '2020-02-10', '2020-02-10', '2020-02-10', 'Basic C++', '2020-02-20', '2020-10-20', 1 );
INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_112835334818643968148', 'batch-005', 'manju', '2020-11-15', '2020-02-12', '2020-02-10', '2020-02-10', 'Basic C++', '2020-02-20', '2020-10-20', 2 );
INSERT INTO sunbird_courses.course_batch (courseid, batchid, createdby, createddate, enddate, end_date, enrollmentenddate, name, startdate, start_date, status) VALUES ('do_1130928636168192001667', 'batch-006', 'manju', '2020-11-15', '2020-02-12', '2020-02-10', '2020-02-10', 'Basic C++', '2020-02-20', '2020-10-20', 1 );

INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate, enrolled_date) VALUES ('user-001', 'do_1130928636168192001667', 'batch-001', True, 10, 0, 1, '2019-11-13 05:41:50:382+0000', '2019-11-16 05:41:50');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolled_date) VALUES ('user-002', 'do_1130928636168192001667', 'batch-001', True, 15, 1, 1, '2019-11-15 05:41:50');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-003', 'do_1130928636168192001667', 'batch-001', True, 20, 1, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-004', 'do_1130928636168192001667', 'batch-001', True, 10, 0, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-005', 'do_1130292569979781121111', 'batch-002', True, 10, 0, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-006', 'do_1130292569979781121111', 'batch-002', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-007', 'do_1130292569979781121111', 'batch-002', True, 10, 0, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-008', 'do_1131975645014835201326', 'batch-003', True, 20, 1, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-009', 'do_1131975645014835201326', 'batch-003', True, 20, 1, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-010', 'do_1131975645014835201326', 'batch-003', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-011', 'do_1130293726460805121168', 'batch-002', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-011', 'do_1130293726460805121168', 'batch-002', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate) VALUES ('user-014', 'do_112835334818643968148', 'batch-004', True, 10, 1, 1, '2019-11-15 05:41:50:382+0000');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate, enrolled_date) VALUES ('user-015', 'do_1130928636168192001667', 'batch-006', True, 10, 0, 1, '2019-11-13 05:41:50:382+0000', '2019-11-16 05:41:50');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate, enrolled_date) VALUES ('user-016', 'do_1130928636168192001667', 'batch-006', True, 10, 0, 1, '2019-11-13 05:41:50:382+0000', '2019-11-16 05:41:50');
INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, active, completionpercentage, progress, status, enrolleddate, enrolled_date) VALUES ('user-017', 'do_1130928636168192001667', 'batch-006', True, 10, 0, 1, '2019-11-13 05:41:50:382+0000', '2019-11-16 05:41:50');

INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, question, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-001', 'do_1128870328040161281204', 'attempt-001', '20', 20, 20, [{id: 'do_213019475454476288155', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'testQuestiontextandformula', resvalues: [{'1': '{"text":"A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"A=\\\\pi r^2\n"}'}, {'2': '{"text":"no\n"}'}, {'answer': '{"correct":["1"]}'}], description: 'testQuestiontextandformula', duration: 1.0}, {id: 'do_213019970118279168165', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'test with formula', resvalues: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}, {'2': '{"text":"2\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 1.0}, {id: 'do_213019972814823424168', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 0.33, type: 'mtf', title: 'Copy of - Match the following:\n\nx=\\frac{-b\\pm\\sqrt{b^2-4ac}}{2a}\nArrange the following equations in correct order.\n', resvalues: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Area of Circle\\n\"}"},{"3":"{\"text\":\"Product Rule\\n\"}"}]'}], params: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Product Rule\\n\"}"},{"3":"{\"text\":\"Area of Circle\\n\"}"}]'}, {'answer': '{"lhs":["1","2","3"],"rhs":["3","1","2"]}'}], description: '', duration: 2.0}, {id: 
'do_2130256513760624641171', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0}], toTimeStamp(toDate(now())));
INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-002', 'do_1128870328040161281204', 'attempt-001', '10', 10, 10, toTimeStamp(toDate(now())));
INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-003', 'do_112876961957437440179', 'attempt-001', '10', 10, 10, toTimeStamp(toDate(now())));
INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_112835334818643968148', 'batch-004', 'user-014', 'do_11307593493010022418', 'attempat-001', '15', 15, 15, toTimeStamp(toDate(now())));

INSERT INTO sunbird.system_settings (id, field, value) VALUES ('custodianOrgId', 'custodianOrgId', '0130107621805015045');
Loading