This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Sonarqube #105

Draft · wants to merge 25 commits into base: main
36 changes: 36 additions & 0 deletions deployment/docker/Dockerfile
@@ -0,0 +1,36 @@
FROM amazonlinux:latest

# Tools
RUN yum install -y shadow-utils curl git openssl11-devel tar gzip gcc make bzip2-devel bzip2-libs ncurses-devel ncurses-lib libffi libffi-devel readline-devel sqlite-devel pyliblzma xz xz-devel xz-libs

# Worker
RUN adduser --user-group worker
ENV PATH="/home/worker/.local/bin:${PATH}"

# Copy files
RUN mkdir /app
RUN chown -R worker:worker /app
USER worker
WORKDIR /app
COPY --chown=worker:worker megalista_dataflow megalista_dataflow

# Python - virtual env
RUN curl https://pyenv.run | bash
RUN /home/worker/.pyenv/bin/pyenv install 3.9.9
RUN /home/worker/.pyenv/bin/pyenv virtualenv 3.9.9 virtual_env
RUN mv /home/worker/.pyenv/versions/virtual_env /app
ENV VIRTUAL_ENV=/app/virtual_env
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN env
RUN python --version
RUN python -m ensurepip --upgrade
RUN pip install --upgrade pip setuptools wheel

# Install requirements
RUN pip install --no-warn-script-location -r megalista_dataflow/requirements.txt

COPY --chown=worker:worker deployment/docker/entrypoint.sh .
COPY --chown=worker:worker deployment/docker/service-account-file.json ./megalista_dataflow/

RUN ["chmod", "+x", "entrypoint.sh"]
ENTRYPOINT [ "/app/entrypoint.sh" ]
21 changes: 21 additions & 0 deletions deployment/docker/build.sh
@@ -0,0 +1,21 @@
#!/bin/bash

if [ $# != 1 ]; then
  echo "Usage: $0 container_tag"
  exit 1
fi

# Terminal formatting codes; these expand to empty strings when tput is unavailable
bold=$(tput bold 2>/dev/null)
reset=$(tput sgr0 2>/dev/null)
text_red=$(tput setaf 1 2>/dev/null)
text_green=$(tput setaf 2 2>/dev/null)

echo
echo "${bold}┌──────────────────────────────────┐${reset}"
echo "${bold}│ Megalista Deployment │${reset}"
echo "${bold}└──────────────────────────────────┘${reset}"
echo
echo "${bold}${text_red}This is not an officially supported Google product.${reset}"
echo "${bold}Megalista docker image will be built with the following tag: ${text_green}$1${bold}${reset}"
echo "Update commit info inside code"
sed -i "s/MEGALISTA_VERSION\s*=.*/MEGALISTA_VERSION = '$(git rev-parse HEAD)'/" ../../megalista_dataflow/config/version.py
echo "Build container"
docker build ../../ -t $1 -f Dockerfile
echo "Cleanup"
sed -i "s/MEGALISTA_VERSION\s*=.*/MEGALISTA_VERSION = '\[megalista_version\]'/" ../../megalista_dataflow/config/version.py
echo "${bold}${text_green}Finished. Image build: $1${reset}"
28 changes: 28 additions & 0 deletions deployment/docker/entrypoint.sh
@@ -0,0 +1,28 @@
#!/bin/bash
echo "Environment variables handling"

ENV_VARS=$(env | grep '^MEGALISTA_')
params=""

# Note: values containing whitespace are not supported by this word-split loop.
for var in $ENV_VARS; do
  IFS='=' read -r key value <<< "$var"
  key=$(echo "${key}" | tr '[:upper:]' '[:lower:]')
  if [[ $key == megalista_* ]]; then
    # Strip the "megalista_" prefix (10 characters) and pass the rest as a flag
    params="${params} --${key:10} ${value}"
  fi
done

export GOOGLE_APPLICATION_CREDENTIALS=/app/megalista_dataflow/service-account-file.json

echo "Activating virual environment (python)"
source virtual_env/bin/activate
echo "Running Megalista"
python megalista_dataflow/main.py \
  --runner DirectRunner \
  --direct_num_workers 0 \
  --direct_running_mode multi_threading \
  ${params}
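
For reference, the flag-mapping convention this entrypoint implements can be sketched in Python. This is an illustration only; the variable shown is a hypothetical example, not Megalista's full parameter list:

def megalista_params(environ) -> list:
    # Mirror the entrypoint loop: each MEGALISTA_* variable becomes a
    # lower-cased --flag with the "megalista_" prefix (10 chars) stripped.
    params = []
    for key, value in environ.items():
        if key.lower().startswith('megalista_'):
            params += ['--' + key[10:].lower(), value]
    return params

# MEGALISTA_SETUP_JSON_URL=gs://bucket/config.json  ->  --setup_json_url gs://bucket/config.json
print(megalista_params({'MEGALISTA_SETUP_JSON_URL': 'gs://bucket/config.json'}))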
13 changes: 13 additions & 0 deletions deployment/docker/service-account-file.json
@@ -0,0 +1,13 @@
{
"type": "service_account",
"project_id": "",
"private_key_id": "",
"private_key": "",
"client_email": "",
"client_id": "",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": ""
}

136 changes: 123 additions & 13 deletions megalista_dataflow/config/logging.py
@@ -13,16 +13,18 @@
# limitations under the License.

import logging
import sys, io, os, traceback
from types import FrameType
from typing import Optional, Tuple, List, Any

from models.execution import Execution

class LoggingConfig:
    @staticmethod
    def config_logging(show_lines: bool = False):
        # If there is a FileHandler, the execution is running on Dataflow.
        # In this scenario, we shouldn't change the formatter.
        file_handler = LoggingConfig.get_file_handler()
        if file_handler is None:
            log_format = "[%(levelname)s] %(name)s: %(message)s"
@@ -36,9 +38,7 @@ def config_logging(show_lines: bool = False):
            stream_handler = logging.StreamHandler(stream=sys.stderr)
            logging.getLogger().addHandler(stream_handler)
            stream_handler.setFormatter(formatter)

        logging.getLogger().setLevel(logging.ERROR)
        logging.getLogger("megalista").setLevel(logging.INFO)

@@ -50,11 +50,6 @@ def get_stream_handler():
    def get_file_handler():
        return LoggingConfig.get_handler(logging.FileHandler)

    @staticmethod
    def get_handler(type: type):
        result_handler = None
@@ -63,4 +58,119 @@ def get_handler(type: type):
                result_handler = handler
                break

        return result_handler

class _LogWrapper:
    def __init__(self, name: Optional[str]):
        self._name = str(name)
        self._logger = logging.getLogger(name)

    def debug(self, msg: str, *args, **kwargs):
        self.log(msg, logging.DEBUG, *args, **kwargs)

    def info(self, msg: str, *args, **kwargs):
        self.log(msg, logging.INFO, *args, **kwargs)

    def warning(self, msg: str, *args, **kwargs):
        self.log(msg, logging.WARNING, *args, **kwargs)

    def error(self, msg: str, *args, **kwargs):
        self.log(msg, logging.ERROR, *args, **kwargs)

    def critical(self, msg: str, *args, **kwargs):
        self.log(msg, logging.CRITICAL, *args, **kwargs)

    def exception(self, msg: str, *args, **kwargs):
        # Note: unlike the stdlib logger, exception() records at CRITICAL level.
        self.log(msg, logging.CRITICAL, *args, **kwargs)

    def log(self, msg: str, level: int, *args, **kwargs):
        stacklevel = self._get_stacklevel(**kwargs)
        msg = self._get_msg_execution(msg, **kwargs)
        msg = self._get_msg_context(msg, **kwargs)
        if level >= logging.ERROR:
            _add_error(self._name, msg, stacklevel, level, args)
        keys_to_remove = ['execution', 'context']
        for key in keys_to_remove:
            if key in kwargs:
                del kwargs[key]
        self._logger.log(level, msg, *args, **self._change_stacklevel(**kwargs))

    def _change_stacklevel(self, **kwargs):
        stacklevel = self._get_stacklevel(**kwargs)
        return dict(kwargs, stacklevel=stacklevel)

    def _get_stacklevel(self, **kwargs):
        dict_kwargs = dict(kwargs)
        # A default of 3 skips log() and the level-specific helper (info, error, ...)
        # so the emitted record points at the wrapper's caller.
        stacklevel = 3
        if 'stacklevel' in dict_kwargs:
            stacklevel = 2 + dict_kwargs['stacklevel']
        return stacklevel

    def _get_msg_context(self, msg: str, **kwargs):
        if 'context' in kwargs:
            context = kwargs['context']
            msg = f'[Context: {context}] {msg}'
        return msg

    def _get_msg_execution(self, msg: str, **kwargs):
        if 'execution' in kwargs:
            execution: Execution = kwargs['execution']
            msg = f'[Execution: {execution.source.source_name} -> {execution.destination.destination_name}] {msg}'
        return msg


def get_logger(name: Optional[str] = None):
    return _LogWrapper(name)

_error_list: List[logging.LogRecord] = []

def _add_error(name: str, msg: str, stacklevel: int, level: int, args):
    fn, lno, func, sinfo = _get_stack_trace(stacklevel)
    _error_list.append(logging.LogRecord(name, level, fn, lno, msg, args, None, func, sinfo))

def _get_stack_trace(stacklevel: int, stack_info: bool = True):
    # from python logging module
    f: Optional[FrameType] = sys._getframe(3)
    if f is not None:
        f = f.f_back
    orig_f = f
    while f and stacklevel > 1:
        f = f.f_back
        stacklevel -= 1
    if not f:
        f = orig_f
    rv: Tuple[str, int, str, Optional[str]] = ("(unknown file)", 0, "(unknown function)", None)
    if f is not None and hasattr(f, "f_code"):
        co = f.f_code
        sinfo = None
        if stack_info:
            sio = io.StringIO()
            sio.write('Stack (most recent call last):\n')
            traceback.print_stack(f, file=sio)
            sinfo = sio.getvalue()
            if sinfo[-1] == '\n':
                sinfo = sinfo[:-1]
            sio.close()
        rv = (co.co_filename, f.f_lineno, co.co_name, sinfo)
    return rv

def has_errors() -> bool:
    return len(_error_list) > 0

def error_list() -> List[logging.LogRecord]:
    return _error_list

def get_formatted_error_list() -> Optional[str]:
    records = _error_list
    if records:
        message = ''
        for i, rec in enumerate(records, start=1):
            message += f'{i}. {rec.msg}\n... in {rec.pathname}:{rec.lineno}\n'
        return message
    return None

def null_filter(el: Any) -> Any:
    get_logger('megalista.LOG').info(f'Logging: {el}')
    return el
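
A minimal usage sketch of the wrapper (the logger name, messages, and the import alias are illustrative; this assumes megalista_dataflow is on sys.path):

from config import logging as megalista_logging

logger = megalista_logging.get_logger('megalista.MyModule')

# Plain info message; stacklevel attributes the record to this call site.
logger.info('Starting upload')

# ERROR and above are also appended to the module-level error list.
logger.error('Upload failed', context='batch 7')

if megalista_logging.has_errors():
    print(megalista_logging.get_formatted_error_list())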
10 changes: 9 additions & 1 deletion megalista_dataflow/data_sources/base_data_source.py
@@ -30,4 +30,12 @@ def retrieve_data(self, executions: ExecutionsGroupedBySource) -> List[DataRowsG
        raise NotImplementedError("Source Type not implemented. Please check your configuration (sheet / json / firestore).")

    def write_transactional_info(self, rows, execution):
        raise NotImplementedError("Source Type not implemented. Please check your configuration (sheet / json / firestore).")

    @staticmethod
    def _convert_row_to_dict(row):
        result = {}
        for key, value in row.items():
            result[key] = value
        return result
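
For illustration, the new helper just materializes a mapping-like row into a plain dict. A small sketch, assuming the enclosing class is named BaseDataSource and the row is any object exposing .items():

row = {'user_id': '123', 'email': 'user@example.com'}  # hypothetical row
as_dict = BaseDataSource._convert_row_to_dict(row)
assert as_dict == {'user_id': '123', 'email': 'user@example.com'}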
