diff --git a/ansible/roles/druid-ingestion/defaults/main.yml b/ansible/roles/druid-ingestion/defaults/main.yml index 2261c4389d..95f096ae59 100644 --- a/ansible/roles/druid-ingestion/defaults/main.yml +++ b/ansible/roles/druid-ingestion/defaults/main.yml @@ -38,4 +38,5 @@ rollup_ml_project_taskcount: 1 rollup_ml_observation_status_taskcount: 1 rollup_ml_project_status_taskcount: 1 rollup_ml_survey_status_taskcount: 1 +ml_user_program_task_count: 1 diff --git a/ansible/roles/druid-ingestion/templates/rollup_ml_user_program b/ansible/roles/druid-ingestion/templates/rollup_ml_user_program new file mode 100644 index 0000000000..01f71f4f33 --- /dev/null +++ b/ansible/roles/druid-ingestion/templates/rollup_ml_user_program @@ -0,0 +1,187 @@ + { + "type": "kafka", + "dataSchema": { + "dataSource": "ml-user-program", + "parser": { + "type": "string", + "parseSpec": { + "format": "json", + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "jq", + "name": "user_id", + "expr": ".userId" + }, + { + "type": "jq", + "name": "program_id", + "expr": ".programId" + }, + { + "type": "jq", + "name": "program_externalId", + "expr": ".programExternalId" + }, + { + "type": "jq", + "name": "program_name", + "expr": ".programName" + }, + { + "type": "jq", + "name": "state_externalId", + "expr": "if [.userProfile ? |.userLocations [] ? | select(.type | contains(\"state\"))] | length > 0 then .userProfile ? |.userLocations [] ? | select(.type | contains(\"state\")) | .id else null end" + }, + { + "type": "jq", + "name": "state_name", + "expr": "if [.userProfile ? |.userLocations [] ? | select(.type | contains(\"state\"))] | length > 0 then .userProfile ? |.userLocations [] ? | select(.type | contains(\"state\")) | .name else null end" + }, + { + "type": "jq", + "name": "district_externalId", + "expr": "if [.userProfile ? |.userLocations [] ? | select(.type | contains(\"district\"))] | length > 0 then .userProfile ? |.userLocations [] ? | select(.type | contains(\"district\")) | .id else null end" + }, + { + "type": "jq", + "name": "district_name", + "expr": "if [.userProfile ? |.userLocations [] ? | select(.type | contains(\"district\"))] | length > 0 then .userProfile ? |.userLocations [] ? | select(.type | contains(\"district\")) | .name else null end" + }, + { + "type": "jq", + "name": "block_externalId", + "expr": "if [.userProfile ? |.userLocations [] ? | select(.type | contains(\"block\"))] | length > 0 then .userProfile ? |.userLocations [] ? | select(.type | contains(\"block\")) | .id else null end" + }, + { + "type": "jq", + "name": "block_name", + "expr": "if [.userProfile ? |.userLocations [] ? | select(.type | contains(\"block\"))] | length > 0 then .userProfile ? |.userLocations [] ? | select(.type | contains(\"block\")) | .name else null end" + }, + { + "type": "jq", + "name": "cluster_externalId", + "expr": "if [.userProfile ? |.userLocations [] ? | select(.type | contains(\"cluster\"))] | length > 0 then .userProfile ? |.userLocations [] ? | select(.type | contains(\"cluster\")) | .id else null end" + }, + { + "type": "jq", + "name": "cluster_name", + "expr": "if [.userProfile ? |.userLocations [] ? | select(.type | contains(\"cluster\"))] | length > 0 then .userProfile ? |.userLocations [] ? | select(.type | contains(\"cluster\")) | .name else null end" + }, + { + "type": "jq", + "name": "organisation_id", + "expr": "if [.userProfile? | .organisations[]? | select (.isSchool == false)] | length > 0 then .userProfile? | .organisations[]? | select(.isSchool == false) | .organisationId else null end" + }, + { + "type": "jq", + "name": "organisation_name", + "expr": "if [.userProfile? | .organisations[]? | select (.isSchool == false)] | length > 0 then .userProfile? | .organisations[]? | select(.isSchool == false) | .orgName else null end" + } + ] + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "program_id" + }, + { + "type": "string", + "name": "program_externalId" + }, + { + "type": "string", + "name": "program_name" + }, + { + "type": "string", + "name": "state_externalId" + }, + { + "type": "string", + "name": "state_name" + }, + { + "type": "string", + "name": "district_externalId" + }, + { + "type": "string", + "name": "district_name" + }, + { + "type": "string", + "name": "block_externalId" + }, + { + "type": "string", + "name": "block_name" + }, + { + "type": "string", + "name": "cluster_externalId" + }, + { + "type": "string", + "name": "cluster_name" + }, + { + "type": "string", + "name": "organisation_id" + }, + { + "type": "string", + "name": "organisation_name" + } + ], + "dimensionsExclusions": [] + }, + "timestampSpec": { + "column": "createdAt", + "format": "iso" + } + } + }, + "metricsSpec": [ + { + "type": "longSum", + "name": "sum_user", + "fieldName": "user_id" + }, + { + "type": "HLLSketchBuild", + "name": "unique_users", + "fieldName": "user_id" + }, + { + "type": "count", + "name": "count_user" + } + ], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "day", + "queryGranularity": "day", + "rollup": false + } + }, + "ioConfig": { + "topic": "{{Env}}.programuser.info", + "consumerProperties": { + "bootstrap.servers": "{{kafka_brokers}}" + }, + "taskCount": "{{ml_user_program_task_count}}", + "replicas": 1, + "taskDuration": "PT14400S", + "useEarliestOffset": false, + "completionTimeout": "PT1800S" + }, + "tuningConfig": { + "type": "kafka", + "reportParseExceptions": false, + "maxRowsPerSegment": 5000000 + } + } + \ No newline at end of file