Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 44 additions & 36 deletions getgauge/impl_loader.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import glob
import importlib
import inspect
import json
Expand All @@ -8,6 +9,8 @@
import traceback
from contextlib import contextmanager
from os import path
from pathlib import Path
from typing import Optional

from getgauge import logger
from getgauge.registry import registry
Expand All @@ -18,28 +21,34 @@
env_dir = os.path.join(project_root, 'env', 'default')
requirements_file = os.path.join(project_root, 'requirements.txt')
sys.path.append(project_root)
temporary_sys_path = []

PLUGIN_JSON = 'python.json'
VERSION = 'version'
PYTHON_PROPERTIES = 'python.properties'
SKEL = 'skel'


def load_impls(step_impl_dirs=impl_dirs):
def load_impls(step_impl_dirs=impl_dirs, project_root=project_root):
""" project_root can be overwritten in tests! """

os.chdir(project_root)

for impl_dir in step_impl_dirs:
if not os.path.isdir(impl_dir):
logger.error('Cannot import step implementations. Error: {} does not exist.'.format(step_impl_dirs))

resolved_impl_dir = Path(impl_dir).resolve()
if not resolved_impl_dir.is_dir():
logger.error('Cannot import step implementations. Error: {} does not exist.'.format(impl_dir))
logger.error('Make sure `STEP_IMPL_DIR` env var is set to a valid directory path.')
return
base_dir = project_root if impl_dir.startswith(project_root) else os.path.dirname(impl_dir)
# Handle multi-level relative imports
for _ in range(impl_dir.count('..')):
base_dir = os.path.dirname(base_dir).replace("/", os.path.sep).replace("\\", os.path.sep)
# Add temporary sys path for relative imports that is not already added
if '..' in impl_dir and base_dir not in temporary_sys_path:
temporary_sys_path.append(base_dir)
_import_impl(base_dir, impl_dir)

base_dir = os.path.commonpath([project_root, f"{resolved_impl_dir}"])
logger.debug("Base directory '{}' of '{}'".format(base_dir, resolved_impl_dir))

temporary_sys_path = None
if project_root != base_dir:
temporary_sys_path = base_dir

_import_impl(base_dir, resolved_impl_dir, temporary_sys_path)


def copy_skel_files():
Expand All @@ -57,49 +66,48 @@ def copy_skel_files():
logger.fatal('Exception occurred while copying skel files.\n{}.'.format(traceback.format_exc()))


def _import_impl(base_dir, step_impl_dir):
for f in os.listdir(step_impl_dir):
file_path = os.path.join(step_impl_dir, f)
if f.endswith('.py'):
_import_file(base_dir, file_path)
elif path.isdir(file_path):
_import_impl(base_dir, file_path)
def _import_impl(base_dir: str, absolute_step_impl_dir: str, temporary_sys_path: Optional[str]):
for python_file in glob.glob(f"{absolute_step_impl_dir}/**/*.py", recursive=True):
relative_path = Path(python_file).relative_to(base_dir)
module_name = ".".join(relative_path.parts).replace(".py", "")
_import_file(module_name, python_file, temporary_sys_path)

@contextmanager
def use_temporary_sys_path():
def use_temporary_sys_path(temporary_sys_path: str):
original_sys_path = sys.path[:]
sys.path.extend(temporary_sys_path)
sys.path.append(temporary_sys_path)
try:
yield
finally:
sys.path = original_sys_path

def _import_file(base_dir, file_path):
rel_path = os.path.normpath(file_path.replace(base_dir + os.path.sep, ''))
def _import_file(module_name: str, file_path: str, temporary_sys_path: Optional[str]):
try:
module_name = os.path.splitext(rel_path.replace(os.path.sep, '.'))[0]
logger.debug('Import module {} with path {}'.format(module_name, file_path))

# Use temporary sys path for relative imports
if '..' in file_path:
with use_temporary_sys_path():
if temporary_sys_path is not None:
logger.debug('Import module {} using temporary sys path {}'.format(module_name, temporary_sys_path))
with use_temporary_sys_path(temporary_sys_path):
m = importlib.import_module(module_name)
else:
m = importlib.import_module(module_name)

# Get all classes in the imported module
classes = inspect.getmembers(m, lambda member: inspect.isclass(member) and member.__module__ == module_name)
if len(classes) > 0:
for c in classes:
file = inspect.getfile(c[1])
# Create instance of step implementation class.
if _has_methods_with_gauge_decoratores(c[1]):
update_step_registry_with_class(c[1](), file_path) # c[1]() will create a new instance of the class
class_obj = c[1]
if _has_methods_with_gauge_decoratores(class_obj):
update_step_registry_with_class(
instance=class_obj(), # class_obj() will create a new instance of the class
file_path=file_path
)
except:
logger.fatal('Exception occurred while loading step implementations from file: {}.\n{}'.format(rel_path, traceback.format_exc()))
logger.fatal('Exception occurred while loading step implementations from file: {}.\n{}'.format(file_path, traceback.format_exc()))

# Inject instance in each class method (hook/step)
def update_step_registry_with_class(instance, file_path):
# Resolve the absolute path from relative path
# Note: relative path syntax ".." can appear in between the file_path too like "<Project_Root>/../../Other_Project/src/step_impl/file.py"
file_path = os.path.abspath(file_path) if ".." in str(file_path) else file_path
def update_step_registry_with_class(instance, file_path: str):
""" Inject instance in each class method (hook/step) """
method_list = registry.get_all_methods_in(file_path)
for info in method_list:
class_methods = [x[0] for x in inspect.getmembers(instance, inspect.ismethod)]
Expand Down
24 changes: 17 additions & 7 deletions getgauge/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import os
import re
import sys
from pathlib import Path
from subprocess import call
from typing import Union
from uuid import uuid1

from getgauge import logger
Expand Down Expand Up @@ -111,6 +113,9 @@ def __init__(self):
for hook in Registry.hooks:
self.__def_hook(hook)

def get_steps_map(self):
return self.__steps_map

def __def_hook(self, hook):
def get(self, tags=None):
return _filter_hooks(tags, getattr(self, '__{}'.format(hook)))
Expand Down Expand Up @@ -172,33 +177,33 @@ def get_step_positions(self, file_name):
positions = []
for step, infos in self.__steps_map.items():
positions = positions + [{'stepValue': step, 'span': i.span}
for i in infos if i.file_name == file_name]
for i in infos if paths_equal(i.file_name, file_name)]
return positions

def _get_all_hooks(self, file_name):
all_hooks = []
for hook in self.hooks:
all_hooks = all_hooks + \
[h for h in getattr(self, "__{}".format(hook))
if h.file_name == file_name]
[h for h in getattr(self, "__{}".format(hook))
if paths_equal(h.file_name, file_name)]
return all_hooks

def get_all_methods_in(self, file_name):
def get_all_methods_in(self, file_name: str):
methods = []
for _, infos in self.__steps_map.items():
methods = methods + [i for i in infos if i.file_name == file_name]
methods = methods + [i for i in infos if paths_equal(i.file_name, file_name)]
return methods + self._get_all_hooks(file_name)

def is_file_cached(self, file_name):
for _, infos in self.__steps_map.items():
if any(i.file_name == file_name for i in infos):
if any(paths_equal(i.file_name, file_name) for i in infos):
return True
return False

def remove_steps(self, file_name):
new_map = {}
for step, infos in self.__steps_map.items():
filtered_info = [i for i in infos if i.file_name != file_name]
filtered_info = [i for i in infos if not paths_equal(i.file_name, file_name)]
if len(filtered_info) > 0:
new_map[step] = filtered_info
self.__steps_map = new_map
Expand All @@ -209,6 +214,11 @@ def clear(self):
setattr(self, '__{}'.format(hook), [])


def paths_equal(p1: Union[str, Path], p2: Union[str, Path]) -> bool:
""" Normalize paths in order to compare them. """
return os.path.normcase(str(p1)) == os.path.normcase(str(p2))


def _filter_hooks(tags, hooks):
filtered_hooks = []
for hook in hooks:
Expand Down
2 changes: 1 addition & 1 deletion python.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"id": "python",
"version": "0.4.11",
"version": "0.4.12",
"description": "Python support for gauge",
"run": {
"windows": [
Expand Down
69 changes: 52 additions & 17 deletions tests/test_impl_loader.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,65 @@
import os
import unittest
from pathlib import Path

from getgauge.impl_loader import update_step_registry_with_class
from test_relative_import.relative_import_class import Sample
from getgauge.impl_loader import load_impls
from getgauge.registry import registry

DIRECTORY_NAME = "test_relative_import"


class ImplLoaderTest(unittest.TestCase):
def setUp(self):
self.curr_dir = os.getcwd()
self.relative_file_path = os.path.join('..', 'test_relative_import', 'relative_import_class.py')
self.relative_file_path_one_level_above = os.path.join('tests', '..', 'test_relative_import', 'relative_import_class.py')

def test_update_step_registry_with_class(self):
os.chdir('tests')
method_list = update_step_registry_with_class(Sample(), self.relative_file_path)
os.chdir(self.curr_dir)
self.assertEqual(["Greet <name> from inside the class",
"Greet <name> from outside the class"],
[method.step_text for method in method_list])

test_relative_import_directory = str(Path(__file__).resolve().parent / DIRECTORY_NAME)
relative_file_path = os.path.join('..', DIRECTORY_NAME)

load_impls(
step_impl_dirs=[relative_file_path],
project_root=test_relative_import_directory
)

loaded_steps = registry.get_steps_map()

self.assertEqual(2, len(loaded_steps))

step_infos_of_class_instance = loaded_steps["Greet {} from inside the class"]

self.assertEqual(1, len(step_infos_of_class_instance))
self.assertIsNotNone(step_infos_of_class_instance[0].instance)

self.assertEqual(
["Greet <name> from inside the class", "Greet <name> from outside the class"],
registry.steps()
)

def test_update_step_registry_with_class_one_level_above(self):
os.chdir(self.curr_dir)
method_list = update_step_registry_with_class(Sample(), self.relative_file_path_one_level_above)
self.assertEqual(["Greet <name> from inside the class",
"Greet <name> from outside the class"],
[method.step_text for method in method_list])

repo_root_directory = str(Path(__file__).resolve().parent.parent)
relative_file_path_one_level_above = os.path.join('tests', '..', 'tests', DIRECTORY_NAME)

load_impls(
step_impl_dirs=[relative_file_path_one_level_above],
project_root=repo_root_directory
)

loaded_steps = registry.get_steps_map()

self.assertEqual(2, len(loaded_steps), f"Steps found: {loaded_steps}")

step_infos_of_class_instance = loaded_steps["Greet {} from inside the class"]

self.assertEqual(1, len(step_infos_of_class_instance))
self.assertIsNotNone(step_infos_of_class_instance[0].instance)

self.assertEqual(
["Greet <name> from inside the class", "Greet <name> from outside the class"],
registry.steps()
)

def tearDown(self):
registry.clear()


if __name__ == '__main__':
Expand Down
33 changes: 33 additions & 0 deletions tests/test_registry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
import sys
import unittest

from getgauge.registry import Registry
Expand Down Expand Up @@ -361,6 +362,38 @@ def test_Registry_get_all_methods_in_should_give_all_the_methods_define_in_that_
self.assertEqual(3, len(registry.get_all_methods_in("foo.py")))
self.assertEqual(2, len(registry.get_all_methods_in("bar.py")))

@unittest.skipIf(not sys.platform.startswith("win"), "Test is designed to cover Windows like paths")
def test_Registry_get_all_methods_in_should_handle_paths_case_sensitive(self):
lower_c_drive = 'c:/random/path/foo.py'
upper_c_drive = 'C:/random/path/foo.py'

step_infos = [
{'text': 'Foo', 'func': 'func1', 'file_name': lower_c_drive},
{'text': 'Foo <>', 'func': 'func2', 'file_name': upper_c_drive}
]
for info in step_infos:
registry.add_step(info['text'], info['func'], info['file_name'])

""" Note: we should find both steps regardless the different spelling as the path is in fact equal! """
self.assertEqual(2, len(registry.get_all_methods_in(lower_c_drive)))
self.assertEqual(2, len(registry.get_all_methods_in(upper_c_drive)))

@unittest.skipIf(sys.platform.startswith("win"), "Fails on Windows due to case sensitivity")
def test_Registry_get_all_methods_in_should_handle_paths_case_sensitive_on_mac(self):
path1 = '/random/path/foo.py'
path2 = '/random/PATH/foo.py'

step_infos = [
{'text': 'Foo', 'func': 'func1', 'file_name': path1},
{'text': 'Foo <>', 'func': 'func2', 'file_name': path2}
]
for info in step_infos:
registry.add_step(info['text'], info['func'], info['file_name'])

""" Note: since the paths are in fact different, they should be treated as different paths! """
self.assertEqual(1, len(registry.get_all_methods_in(path1)))
self.assertEqual(1, len(registry.get_all_methods_in(path2)))

def tearDown(self):
global registry
registry = Registry()
Expand Down
Loading