Improvements #3

Open · wants to merge 22 commits into main
8 changes: 8 additions & 0 deletions .gitignore
@@ -0,0 +1,8 @@
.idea
*.pyc
.pytest_cache
.python-version
.vscode/
__pycache__
db.sqlite3
.DS_Store
45 changes: 45 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,45 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
hooks:
- id: check-merge-conflict
- id: check-added-large-files
- id: check-ast
- id: check-symlinks
- id: check-yaml
args: ['--unsafe']
- id: trailing-whitespace
- id: check-json
- id: debug-statements
- id: detect-aws-credentials
args:
- --allow-missing-credentials
- id: pretty-format-json
args:
- --autofix
exclude: Pipfile.lock
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/pycqa/flake8
rev: 5.0.4
hooks:
- id: flake8
additional_dependencies: [ flake8-print ]
files: '\.py$'
args:
- --select=F401,F403,F406,F821,T001,T003
- repo: https://github.com/PyCQA/autoflake
rev: v1.7.7
hooks:
- id: autoflake
files: '\.py$'
exclude: '^\..*'
args: ["--in-place", "--remove-all-unused-imports"]
- repo: https://github.com/psf/black
rev: 22.10.0
hooks:
- id: black
args: ["--target-version", "py39"]
18 changes: 18 additions & 0 deletions Pipfile
@@ -0,0 +1,18 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
pytz = "==2023.3"
pytest = "==7.4.0"
sqlalchemy = "==2.0.18"
pre-commit = "==3.2.2"
freezegun = "==1.2.2"
pandas = "==2.0.3"
pymongo = "==4.4.0"

[dev-packages]

[requires]
python_version = "3.11"
415 changes: 415 additions & 0 deletions Pipfile.lock

Large diffs are not rendered by default.

Empty file added __init__.py
Empty file.
Empty file added dao/__init__.py
Empty file.
26 changes: 26 additions & 0 deletions dao/_base_mongo.py
@@ -0,0 +1,26 @@
from pymongo import MongoClient

from sample_code.settings import (
ARC_MONGO_AUTHMECHANISM,
ARC_MONGO_AUTHSOURCE,
ARC_MONGO_READ_PREFERENCE,
)


class BaseMongoDAO:
def __init__(
self,
mongoServers: str,
mongoReplicaset: str,
username: str,
password: str,
database: str,
) -> None:
mongo_uri = f"mongodb://{username}:{password}@{mongoServers}"
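        # Indexing the MongoClient by database name returns a Database handle,
        # so self.client below holds the database, not the raw client.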
self.client = MongoClient(
mongo_uri,
replicaSet=mongoReplicaset,
authSource=ARC_MONGO_AUTHSOURCE,
readPreference=ARC_MONGO_READ_PREFERENCE,
authMechanism=ARC_MONGO_AUTHMECHANISM,
)[database]
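
The base class only opens the connection and keeps a handle to the named database; each concrete DAO supplies its own credentials. A minimal construction sketch, assuming placeholder server addresses and replica set name:

from sample_code.dao.audit import AuditDAO

# Hypothetical values; the real servers and replica set come from deployment configuration.
audit_dao = AuditDAO(
    mongoServers="mongo-a.example.com:27017,mongo-b.example.com:27017",
    mongoReplicaset="rs0",
)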
102 changes: 102 additions & 0 deletions dao/audit.py
@@ -0,0 +1,102 @@
import logging
from datetime import datetime

from pymongo.collection import Collection
from pymongo.errors import PyMongoError

from sample_code.dao._base_mongo import BaseMongoDAO
from sample_code.settings import (
AUDIT_COLLECTION,
AUDIT_DATABASE,
AUDIT_PASSWORD,
AUDIT_USERNAME,
)

logger = logging.getLogger(__name__)


class AuditDAO(BaseMongoDAO):
def __init__(
self,
mongoServers: str,
mongoReplicaset: str,
) -> None:
super().__init__(
username=AUDIT_USERNAME,
password=AUDIT_PASSWORD,
database=AUDIT_DATABASE,
mongoServers=mongoServers,
mongoReplicaset=mongoReplicaset,
)

    def run_aggregation_query(self, collection: Collection, query: list, **kwargs):
try:
return collection.aggregate(query, **kwargs)
except PyMongoError as exc:
logger.error(str(exc))

def get_subscribers(self, auditRangeStart: datetime, auditRangeEnd: datetime):
logger.info(
f"Get subscriber usage between {auditRangeStart.isoformat()} and {auditRangeEnd.isoformat()}"
)
auditCollection = self.client[AUDIT_COLLECTION]
auditQuery = [
{
"$match": {
"$and": [
{
"details": {
"$elemMatch": {
"state": "ADD",
"data.payload.payloads": {
"$elemMatch": {
"requestpayload.subscriptions": {
"$elemMatch": {
"offerName": "MYOFFERNAME"
}
}
}
},
}
}
},
{
"lastModifiedDate": {
"$gte": auditRangeStart,
"$lte": auditRangeEnd,
}
},
]
}
},
{"$unwind": {"path": "$details"}},
{
"$match": {
"details.state": "ADD",
"details.data.payload.payloads": {
"$elemMatch": {
"requestpayload.subscriptions": {
"$elemMatch": {"offerName": "MYOFFERNAME"}
}
}
},
}
},
{"$unwind": {"path": "$details.data.payload.payloads"}},
{
"$unwind": {
"path": "$details.data.payload.payloads.requestpayload.subscriptions"
}
},
{
"$project": {
"_id": 0.0,
"ban": 1.0,
"subscriberId": "$details.data.payload.subscriberId",
"effectiveDate": "$details.data.payload.payloads.requestpayload.subscriptions.effectiveDate",
"expiryDate": "$details.data.payload.payloads.requestpayload.subscriptions.expiryDate",
}
},
]

return self.run_aggregation_query(auditCollection, auditQuery, cursor={})
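
The pipeline matches audit documents whose details contain an ADD payload for offerName MYOFFERNAME inside the given window, unwinds the nested payloads, and projects ban, subscriberId, effectiveDate and expiryDate. A call sketch, reusing the audit_dao constructed above and assuming a one-day window and a successful aggregation:

from datetime import datetime, timedelta

audit_end = datetime.utcnow()
audit_start = audit_end - timedelta(days=1)
for doc in audit_dao.get_subscribers(audit_start, audit_end):
    # Each projected document carries ban, subscriberId, effectiveDate and expiryDate.
    print(doc["subscriberId"], doc["effectiveDate"], doc["expiryDate"])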
62 changes: 62 additions & 0 deletions dao/reporting.py
@@ -0,0 +1,62 @@
import logging

from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

from sample_code.settings import (
REPORTING_AULDATALEAK_TABLENAME,
REPORTING_SQL_DATABASE,
REPORTING_SQL_PASSWORD,
REPORTING_SQL_PORT,
REPORTING_SQL_SERVER,
REPORTING_SQL_USERNAME,
)

logger = logging.getLogger(__name__)


class ReportDAO:
def __init__(self) -> None:
mysql_uri = f"mysql://{REPORTING_SQL_USERNAME}:{REPORTING_SQL_PASSWORD}@{REPORTING_SQL_SERVER}:{REPORTING_SQL_PORT}/{REPORTING_SQL_DATABASE}?charset=utf8"
self.client = create_engine(mysql_uri, pool_recycle=3600)

    def run_query(self, query):
        try:
            # SQLAlchemy 2.x engines no longer expose execute(); run the
            # statement on a connection inside a transaction instead.
            with self.client.begin() as conn:
                conn.execute(text(query))
            return 0
        except SQLAlchemyError as e:
            logger.error(str(getattr(e, "orig", e)))

def create_reporting_table(self) -> None:
logger.info("Initiate reporting table")
reportingTableCreateQuery = f"CREATE TABLE IF NOT EXISTS {REPORTING_AULDATALEAK_TABLENAME} ( \
`SUBSCRIBERID` VARCHAR(100), \
`MDN` VARCHAR(100), \
`BAN` VARCHAR(100), \
`USAGESTART` DATETIME, \
`USAGEEND` DATETIME, \
`TOTALMB` DECIMAL, \
`AUDITDATE` DATETIME \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;"

reportingTableCreateIndex = f"CREATE INDEX idx_AUDITDATE \
ON {REPORTING_AULDATALEAK_TABLENAME} (AUDITDATE);"

self.run_query(reportingTableCreateQuery)
self.run_query(reportingTableCreateIndex)

@staticmethod
def process_data_for_insert(rows: list) -> str:
return ", ".join([f"({', '.join(map(str, r))})" for r in rows])

def insert_reporting_data(self, rows: list) -> None:
logger.info("Insert new data in reporting table")
usageReportingQuery = f"INSERT INTO {REPORTING_AULDATALEAK_TABLENAME} (SUBSCRIBERID, MDN, BAN, USAGESTART, USAGEEND, TOTALMB, AUDITDATE) VALUES "
data = self.process_data_for_insert(rows)
self.run_query(usageReportingQuery + data)

def clean_reporting_data(self) -> None:
logger.info("Clean reporting table")
reportingTableDeleteQuery = f"DELETE FROM {REPORTING_AULDATALEAK_TABLENAME} WHERE AUDITDATE < DATE_SUB(NOW(), INTERVAL 1 MONTH)"
self.run_query(reportingTableDeleteQuery)
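
process_data_for_insert only stringifies each row into a parenthesised tuple and joins them with commas, so callers are expected to pre-quote string and datetime values themselves. A worked example, assuming the rows arrive already quoted:

rows = [
    ("'SUB1'", "'5551234567'", "'BAN1'", "'2023-07-01'", "'2023-07-02'", 12.5, "'2023-07-03'"),
]
print(ReportDAO.process_data_for_insert(rows))
# ('SUB1', '5551234567', 'BAN1', '2023-07-01', '2023-07-02', 12.5, '2023-07-03')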
78 changes: 78 additions & 0 deletions dao/usage.py
@@ -0,0 +1,78 @@
import logging
from datetime import datetime

from pymongo import DESCENDING
from pymongo.collection import Collection
from pymongo.errors import PyMongoError

from sample_code.dao._base_mongo import BaseMongoDAO
from sample_code.settings import COLLECTION, DATABASE, PASSWORD, USERNAME

logger = logging.getLogger(__name__)


class UsageDAO(BaseMongoDAO):
def __init__(
self,
mongoServers: str,
mongoReplicaset: str,
) -> None:
super().__init__(
username=USERNAME,
password=PASSWORD,
database=DATABASE,
mongoServers=mongoServers,
mongoReplicaset=mongoReplicaset,
)

    def run_query(
        self,
        collection: Collection,
        query: dict,
        project: dict = None,
        sort: bool = True,
        sort_field: str = "eventTime",
        limit_results: bool = False,
        limit_count: int = 10,
    ) -> list:
try:
if project is not None:
db_query = collection.find(query, project)
else:
db_query = collection.find(query)

if sort:
db_query.sort(sort_field, DESCENDING)

if limit_results:
db_query.limit(limit_count)

return [doc for doc in db_query]
except PyMongoError as exc:
logger.error(str(exc))

def get_subscriber_usage(
self, subscriberId: str, effectiveDate: datetime, expiryDate: datetime
) -> list:
logger.info(
f"Get subscriber usage between {effectiveDate.isoformat()} and {expiryDate.isoformat()}"
)
collection = self.client[COLLECTION]
usageQuery = {
"$and": [
{"end": {"$gte": effectiveDate, "$lte": expiryDate}},
{"extSubId": eval(subscriberId)},
{"usageType": "OVER"},
{"$or": [{"bytesIn": {"$gt": 0}, "bytesOut": {"$gt": 0}}]},
]
}
usageProject = {
"_id": 0,
"extSubId": 1,
"MDN": 1,
"BAN": 1,
"start": 1,
"end": 1,
"bytesIn": 1,
"bytesOut": 1,
}
return self.run_query(collection, usageQuery, usageProject)
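
get_subscriber_usage filters the usage collection on the subscriber id, the date window taken from the audit record, and non-zero byte counts, then projects only the fields the report needs. A call sketch, assuming a UsageDAO connected with the same placeholder servers as above and a successful query:

from datetime import datetime

usage_dao = UsageDAO(
    mongoServers="mongo-a.example.com:27017,mongo-b.example.com:27017",
    mongoReplicaset="rs0",
)
for doc in usage_dao.get_subscriber_usage(
    subscriberId="123456789",
    effectiveDate=datetime(2023, 7, 1),
    expiryDate=datetime(2023, 7, 31),
):
    print(doc["extSubId"], doc["bytesIn"], doc["bytesOut"])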