2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -55,7 +55,7 @@ jobs:
cd data-products && mvn scoverage:report
- save_cache:
key: dp-dependency-cache-{{ checksum "data-products/pom.xml" }}
-paths: ~/.m2
+paths: ~/.m2
- run:
name: sonar
command: |
@@ -17,6 +17,7 @@ import org.joda.time.{DateTime, DateTimeZone}
import org.sunbird.analytics.exhaust.collection.UDFUtils
import org.sunbird.analytics.util.{CourseUtils, UserData}


import scala.collection.immutable.List

object CollectionSummaryJobV2 extends IJob with BaseReportsJob {
@@ -95,6 +96,11 @@ object CollectionSummaryJobV2 extends IJob with BaseReportsJob {
JobLogger.log(s"Total distinct Course Id's ${courseIds.size}", None, INFO)
val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("batchSize", 50).asInstanceOf[Int], Option(config.modelParams.get.getOrElse("contentStatus", CourseUtils.defaultContentStatus.toList).asInstanceOf[List[String]].toArray), Option(config.modelParams.get.getOrElse("contentFields", CourseUtils.defaultContentFields.toList).asInstanceOf[List[String]].toArray)).toDF(contentFields: _*)
JobLogger.log(s"Total fetched records from content search ${courseInfo.count()}", None, INFO)
println("processBathc")
processBatches.show(100, false)
println("courseInfo")
courseInfo.show(100, false)

processBatches.join(courseInfo, processBatches.col("courseid") === courseInfo.col("identifier"), "inner")
.withColumn("collectionname", col("name"))
.withColumnRenamed("status", "contentstatus")
@@ -112,7 +118,9 @@ object CollectionSummaryJobV2 extends IJob with BaseReportsJob {
.join(userCachedDF, Seq("userid"), "inner").persist()
val searchFilter = config.modelParams.get.get("searchFilter").asInstanceOf[Option[Map[String, AnyRef]]]
val reportDF: DataFrame = if (null == searchFilter || searchFilter.isEmpty) getContentMetaData(processBatches, spark) else processBatches
println("reportDF" + reportDF.show(100, false))
val processedBatches = computeValues(reportDF)
println("processedBatches" + processedBatches.show(100, false))
processedBatches.select(filterColumns.head, filterColumns.tail: _*)
}

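The println/show dumps added in this file run unconditionally and write straight to stdout on every run. A minimal sketch of a gated alternative, assuming a "debug" key in modelParams (the key name is hypothetical, not part of this PR) and reusing the JobLogger/Level imports this job already uses:

import org.apache.spark.sql.DataFrame
import org.ekstep.analytics.framework.JobConfig
import org.ekstep.analytics.framework.Level.INFO
import org.ekstep.analytics.framework.util.JobLogger

// Sketch only: dump a DataFrame sample when modelParams contains "debug": true,
// so production runs skip the extra show() work.
def debugShow(df: DataFrame, label: String)(implicit config: JobConfig): Unit = {
  val enabled = config.modelParams.flatMap(_.get("debug")).exists(_.asInstanceOf[Boolean])
  if (enabled) {
    JobLogger.log(s"$label sample below", None, INFO)
    df.show(100, false) // show() returns Unit; never concatenate it into a log string
  }
}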
@@ -1,20 +1,20 @@
package org.sunbird.analytics.job.report

-import java.io.File

-import org.apache.spark.sql.functions.{udf, _}
-import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructType}
+import kong.unirest.Unirest
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{MapType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.ekstep.analytics.framework.conf.AppConf
-import org.ekstep.analytics.framework.util.{HadoopFileUtil, JSONUtils}
+import org.ekstep.analytics.framework.util.{HadoopFileUtil, JSONUtils, RestUtil}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig}
+import org.mockserver.integration.ClientAndServer
import org.scalamock.scalatest.MockFactory
-import org.sunbird.analytics.util.{CourseUtils, UserData}
+import org.sunbird.analytics.util.{CourseResponse, CourseResult, CourseUtils, UserData}

+import java.io.File
+import scala.collection.mutable


class TestCollectionSummaryJobV2 extends BaseReportSpec with MockFactory {


var spark: SparkSession = _
@@ -31,6 +31,7 @@ class TestCollectionSummaryJobV2 extends BaseReportSpec with MockFactory {
override def beforeAll(): Unit = {
super.beforeAll()
spark = getSparkSession();
+mockAPIData()
courseBatchDF = spark
.read
.format("com.databricks.spark.csv")
@@ -49,9 +50,11 @@ class TestCollectionSummaryJobV2 extends BaseReportSpec with MockFactory {
.cache()

}
+private val mockServer = ClientAndServer.startClientAndServer(3000)

override def afterAll() : Unit = {
super.afterAll()
+//mockServer.stop()
val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, objectKey + "collection-summary-reports-v2/")
}
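startClientAndServer(3000) above runs at field initialization and the stop() call is left commented out, so the mock keeps listening on port 3000 after the suite ends. A small lifecycle sketch, assuming no other suite needs the server to stay up (not part of this PR):

// Sketch: start the mock with the suite and free port 3000 when it finishes.
private var mockServer: ClientAndServer = _

override def beforeAll(): Unit = {
  super.beforeAll()
  mockServer = ClientAndServer.startClientAndServer(3000)
  // ... existing CSV/cache setup ...
}

override def afterAll(): Unit = {
  if (mockServer != null) mockServer.stop()
  super.afterAll()
  // ... existing report cleanup ...
}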
@@ -62,9 +65,29 @@ class TestCollectionSummaryJobV2 extends BaseReportSpec with MockFactory {
else null
})

+def mockAPIData(): Unit = {
+  import org.mockserver.model.HttpRequest.request
+  import org.mockserver.model.HttpResponse.response
+  val query = "{\"request\":{\"filters\":{\"status\":[\"Live\"],\"contentType\":\"Course\"},\"fields\":[\"identifier\",\"name\",\"organisation\",\"channel\",\"status\",\"keywords\",\"createdFor\",\"medium\",\"subject\"],\"limit\":10000}}"
+  val resultJson = "{\"count\": 1, \"content\": [{\"framework\":\"NCRET\",\"identifier\":\"do_1130314965721088001129\",\"name\":\"Maths\",\"channel\":\"in.ekstep\",\"batches\":[{\"startDate\":\"2022-01-01\",\"endDate\":\"2022-01-01\",\"batchId\":\"b-001\"}],\"organisation\":[],\"status\":\"Live\",\"keywords\":[\"maths\"],\"createdFor\":[],\"medium\":[],\"subject\":[]}]}"
+  val courseResult = JSONUtils.deserialize[CourseResult](resultJson)
+  val courseResponse = CourseResponse(courseResult, "200")
+  mockServer.when(request().withMethod("POST").withPath("/v1/search")
+    .withBody(query))
+    .respond(response().withStatusCode(200).withBody(JSONUtils.serialize(courseResponse)))
+}

it should "generate the report for all the batches" in {
initializeDefaultMockData()
implicit val mockFc: FrameworkContext = mock[FrameworkContext]
+mockAPIData()

val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.CollectionSummaryJobV2","modelParams":{"searchFilter":{"request":{"filters":{"status":["Live"],"contentType":"Course"},"fields":["identifier","name","organisation","channel","status","keywords","createdFor","medium", "subject"],"limit":10000}},"store":"azure","sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')","specPath":"src/test/resources/ingestion-spec/summary-ingestion-spec.json"},"parallelization":8,"appName":"Collection Summary Report"}""".stripMargin
implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](strConfig)
val reportData = CollectionSummaryJobV2.prepareReport(spark, reporterMock.fetchData)
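Because the stub matches the POST body exactly, any drift between the query string above and the request CourseUtils.getCourseInfo actually sends makes content search return nothing instead of failing the test. A follow-up check using MockServer's verify API (a sketch, not part of this PR):

import org.mockserver.model.HttpRequest.request
import org.mockserver.verify.VerificationTimes

// Sketch: assert the job really hit the stubbed search endpoint, so an
// exact-body mismatch fails loudly instead of yielding an empty courseInfo
// DataFrame. getCourseInfo may batch course ids, hence atLeast(1) rather
// than once().
mockServer.verify(
  request().withMethod("POST").withPath("/v1/search"),
  VerificationTimes.atLeast(1)
)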