From 9068899d38d994067ffdb850bed8eb1c17cbd3fc Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Sun, 9 Feb 2025 23:00:41 +0900 Subject: [PATCH 01/17] feat: add in-memory target --- gokart/in_memory/__init__.py | 2 + gokart/in_memory/repository.py | 38 +++++++++++++++++ gokart/in_memory/target.py | 50 +++++++++++++++++++++++ test/in_memory/test_in_memory_target.py | 32 +++++++++++++++ test/in_memory/test_repository.py | 54 +++++++++++++++++++++++++ 5 files changed, 176 insertions(+) create mode 100644 gokart/in_memory/__init__.py create mode 100644 gokart/in_memory/repository.py create mode 100644 gokart/in_memory/target.py create mode 100644 test/in_memory/test_in_memory_target.py create mode 100644 test/in_memory/test_repository.py diff --git a/gokart/in_memory/__init__.py b/gokart/in_memory/__init__.py new file mode 100644 index 00000000..dfab283d --- /dev/null +++ b/gokart/in_memory/__init__.py @@ -0,0 +1,2 @@ +from .repository import InMemeryCacheRepository +from .target import InMemoryTarget, make_inmemory_target \ No newline at end of file diff --git a/gokart/in_memory/repository.py b/gokart/in_memory/repository.py new file mode 100644 index 00000000..efb07929 --- /dev/null +++ b/gokart/in_memory/repository.py @@ -0,0 +1,38 @@ +import abc +from typing import Any + +class BaseRepository(abc.ABC): + ... + +class InMemeryCacheRepository(BaseRepository): + _cache: dict[str, Any] = {} + def __init__(self): + pass + + def get(self, id: str): + return self._cache[id] + + def set(self, id: str, obj: Any): + assert not self.has(id) + self._cache[id] = obj + + def has(self, id: str): + return id in self._cache + + def remove_by_id(self, id: str): + assert self.has(id) + del self._cache[id] + + def empty(self): + return not self._cache + + def clear(self): + self._cache.clear() + + def get_gen(self): + for key, value in self._cache.items(): + yield key, value + + @property + def size(self): + return len(self._cache) \ No newline at end of file diff --git a/gokart/in_memory/target.py b/gokart/in_memory/target.py new file mode 100644 index 00000000..8d9dda3b --- /dev/null +++ b/gokart/in_memory/target.py @@ -0,0 +1,50 @@ +from gokart.target import TargetOnKart, TaskLockParams +from gokart.in_memory.repository import InMemeryCacheRepository +from datetime import datetime + +_repository = InMemeryCacheRepository() + +# TODO: unnecessary params in task_lock_param expecially regarding redies +class InMemoryTarget(TargetOnKart): + def __init__( + self, + id: str, + task_lock_param: TaskLockParams + ): + self._id = id + self._task_lock_params = task_lock_param + self._last_modification_time_value: None | datetime = None + + def _exists(self): + # import pdb;pdb.set_trace() + return _repository.has(self._id) + + def _get_task_lock_params(self): + return self._task_lock_params + + def _load(self): + # import pdb + # pdb.set_trace() + return _repository.get(self._id) + + def _dump(self, obj): + return _repository.set(self._id, obj) + + def _remove(self) -> None: + _repository.remove_by_id(self._id) + + def _last_modification_time(self) -> datetime: + if self._last_modification_time_value is None: + raise ValueError(f"No object(s) which id is {self._id} are stored before.") + self._last_modification_time_value + + def _path(self): + # TODO: this module name `_path` migit not be appropriate + return self._id + + @property + def id(self): + return self._id + +def make_inmemory_target(target_key: str, task_lock_params: TaskLockParams | None = None): + return InMemoryTarget(target_key, task_lock_params) diff --git a/test/in_memory/test_in_memory_target.py b/test/in_memory/test_in_memory_target.py new file mode 100644 index 00000000..4b173610 --- /dev/null +++ b/test/in_memory/test_in_memory_target.py @@ -0,0 +1,32 @@ +from gokart.conflict_prevention_lock.task_lock import TaskLockParams +from gokart.in_memory import make_inmemory_target, InMemoryTarget, InMemeryCacheRepository +import pytest + +class TestInMemoryTarget: + @pytest.fixture + def task_lock_params(self): + return TaskLockParams( + redis_host=None, + redis_port=None, + redis_timeout=None, + redis_key='dummy', + should_task_lock=False, + raise_task_lock_exception_on_collision=False, + lock_extend_seconds=0 + ) + @pytest.fixture + def target(self, task_lock_params: TaskLockParams): + return make_inmemory_target(target_key='dummy_task_id', task_lock_params=task_lock_params) + + @pytest.fixture(autouse=True) + def clear_repo(self): + InMemeryCacheRepository().clear() + + def test_dump_and_load_data(self, target: InMemoryTarget): + dumped = 'dummy_data' + target.dump(dumped) + loaded = target.load() + assert loaded == dumped + + with pytest.raises(AssertionError): + target.dump('another_data') \ No newline at end of file diff --git a/test/in_memory/test_repository.py b/test/in_memory/test_repository.py new file mode 100644 index 00000000..2610b80a --- /dev/null +++ b/test/in_memory/test_repository.py @@ -0,0 +1,54 @@ +from gokart.in_memory import InMemeryCacheRepository as Repo +import pytest + +dummy_num = 100 + +class TestInMemoryCacheRepository: + @pytest.fixture + def repo(self): + repo = Repo() + repo.clear() + return repo + + def test_set(self, repo: Repo): + repo.set("dummy_id", dummy_num) + assert repo.size == 1 + for key, value in repo.get_gen(): + assert (key, value) == ("dummy_id", dummy_num) + + with pytest.raises(AssertionError): + repo.set('dummy_id', "dummy_value") + + repo.set('another_id', 'another_value') + assert repo.size == 2 + + def test_get(self, repo: Repo): + repo.set('dummy_id', dummy_num) + repo.set('another_id', 'another_val') + + """Raise Error when key doesn't exist.""" + with pytest.raises(KeyError): + repo.get('not_exist_id') + + assert repo.get('dummy_id') == dummy_num + assert repo.get('another_id') == 'another_val' + + def test_empty(self, repo: Repo): + assert repo.empty() + repo.set("dummmy_id", dummy_num) + assert not repo.empty() + + def test_has(self, repo: Repo): + assert not repo.has('dummy_id') + repo.set('dummy_id', dummy_num) + assert repo.has('dummy_id') + + def test_remove_by_id(self, repo: Repo): + repo.set('dummy_id', dummy_num) + + with pytest.raises(AssertionError): + repo.remove_by_id('not_exist_id') + + assert repo.has('dummy_id') + repo.remove_by_id('dummy_id') + assert not repo.has('dummy_id') From 58aa9a56e59504b709a3bb9ad3c4ec7a22d7a794 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Mon, 10 Feb 2025 10:42:15 +0900 Subject: [PATCH 02/17] feat: add a data format to be stored in `Repository` feat: last_modification_time feature in `InMemoryTarget` style: add some type hints fix: fix typo in `InMemoryCacheRepository` test: add some tests for `InMemoryTarget` and `InMemoryCacheRepository` --- gokart/in_memory/__init__.py | 2 +- gokart/in_memory/data.py | 16 ++++++++ gokart/in_memory/repository.py | 39 +++++++++++-------- gokart/in_memory/target.py | 51 ++++++++++++------------ test/in_memory/test_in_memory_target.py | 34 ++++++++++++---- test/in_memory/test_repository.py | 52 ++++++++++++++----------- 6 files changed, 120 insertions(+), 74 deletions(-) create mode 100644 gokart/in_memory/data.py diff --git a/gokart/in_memory/__init__.py b/gokart/in_memory/__init__.py index dfab283d..f3685297 100644 --- a/gokart/in_memory/__init__.py +++ b/gokart/in_memory/__init__.py @@ -1,2 +1,2 @@ -from .repository import InMemeryCacheRepository +from .repository import InMemoryCacheRepository from .target import InMemoryTarget, make_inmemory_target \ No newline at end of file diff --git a/gokart/in_memory/data.py b/gokart/in_memory/data.py new file mode 100644 index 00000000..5c26998e --- /dev/null +++ b/gokart/in_memory/data.py @@ -0,0 +1,16 @@ +from dataclasses import dataclass +from typing import Any +from abc import ABC +from datetime import datetime + +class BaseData(ABC): + ... + +@dataclass +class InMemoryData(BaseData): + value: Any + last_modified_time: datetime + + @classmethod + def create_data(self, value: Any) -> 'InMemoryData': + return InMemoryData(value=value, last_modified_time=datetime.now()) \ No newline at end of file diff --git a/gokart/in_memory/repository.py b/gokart/in_memory/repository.py index efb07929..fc871366 100644 --- a/gokart/in_memory/repository.py +++ b/gokart/in_memory/repository.py @@ -1,38 +1,45 @@ import abc -from typing import Any +from typing import Any, Iterator +from .data import InMemoryData class BaseRepository(abc.ABC): ... -class InMemeryCacheRepository(BaseRepository): - _cache: dict[str, Any] = {} +class InMemoryCacheRepository(BaseRepository): + _cache: dict[str, InMemoryData] = {} def __init__(self): pass - def get(self, id: str): - return self._cache[id] + def get_value(self, key: str) -> Any: + return self._get_data(key).value - def set(self, id: str, obj: Any): - assert not self.has(id) - self._cache[id] = obj + def get_last_modification_time(self, key: str): + return self._get_data(key).last_modified_time + + def _get_data(self, id: str) -> InMemoryData: + return self._cache[id] + + def set_value(self, id: str, obj: Any) -> None: + data = InMemoryData.create_data(obj) + self._cache[id] = data - def has(self, id: str): + def has(self, id: str) -> bool: return id in self._cache - def remove_by_id(self, id: str): + def remove(self, id: str) -> None: assert self.has(id) del self._cache[id] - def empty(self): + def empty(self) -> bool: return not self._cache - def clear(self): + def clear(self) -> None: self._cache.clear() - def get_gen(self): - for key, value in self._cache.items(): - yield key, value + def get_gen(self) -> Iterator[tuple[str, Any]]: + for key, data in self._cache.items(): + yield key, data.value @property - def size(self): + def size(self) -> int: return len(self._cache) \ No newline at end of file diff --git a/gokart/in_memory/target.py b/gokart/in_memory/target.py index 8d9dda3b..9b910c68 100644 --- a/gokart/in_memory/target.py +++ b/gokart/in_memory/target.py @@ -1,50 +1,47 @@ from gokart.target import TargetOnKart, TaskLockParams -from gokart.in_memory.repository import InMemeryCacheRepository +from gokart.in_memory.repository import InMemoryCacheRepository from datetime import datetime +from typing import Any +from logging import warning -_repository = InMemeryCacheRepository() +_repository = InMemoryCacheRepository() -# TODO: unnecessary params in task_lock_param expecially regarding redies class InMemoryTarget(TargetOnKart): def __init__( self, - id: str, + data_key: str, task_lock_param: TaskLockParams ): - self._id = id + if task_lock_param.should_task_lock: + warning(f'Redis in {self.__class__.__name__} is not supported now.') + + self._data_key = data_key self._task_lock_params = task_lock_param - self._last_modification_time_value: None | datetime = None - def _exists(self): - # import pdb;pdb.set_trace() - return _repository.has(self._id) + def _exists(self) -> bool: + return _repository.has(self._data_key) - def _get_task_lock_params(self): + def _get_task_lock_params(self) -> TaskLockParams: return self._task_lock_params - def _load(self): - # import pdb - # pdb.set_trace() - return _repository.get(self._id) + def _load(self) -> Any: + return _repository.get_value(self._data_key) - def _dump(self, obj): - return _repository.set(self._id, obj) + def _dump(self, obj: Any) -> None: + return _repository.set_value(self._data_key, obj) def _remove(self) -> None: - _repository.remove_by_id(self._id) + _repository.remove(self._data_key) def _last_modification_time(self) -> datetime: - if self._last_modification_time_value is None: - raise ValueError(f"No object(s) which id is {self._id} are stored before.") - self._last_modification_time_value - - def _path(self): - # TODO: this module name `_path` migit not be appropriate - return self._id + if not _repository.has(self._data_key): + raise ValueError(f"No object(s) which id is {self._data_key} are stored before.") + time = _repository.get_last_modification_time(self._data_key) + return time - @property - def id(self): - return self._id + def _path(self) -> str: + # TODO: this module name `_path` migit not be appropriate + return self._data_key def make_inmemory_target(target_key: str, task_lock_params: TaskLockParams | None = None): return InMemoryTarget(target_key, task_lock_params) diff --git a/test/in_memory/test_in_memory_target.py b/test/in_memory/test_in_memory_target.py index 4b173610..125b634d 100644 --- a/test/in_memory/test_in_memory_target.py +++ b/test/in_memory/test_in_memory_target.py @@ -1,7 +1,8 @@ from gokart.conflict_prevention_lock.task_lock import TaskLockParams -from gokart.in_memory import make_inmemory_target, InMemoryTarget, InMemeryCacheRepository +from gokart.in_memory import make_inmemory_target, InMemoryTarget, InMemoryCacheRepository import pytest - +from datetime import datetime +from time import sleep class TestInMemoryTarget: @pytest.fixture def task_lock_params(self): @@ -14,13 +15,14 @@ def task_lock_params(self): raise_task_lock_exception_on_collision=False, lock_extend_seconds=0 ) + @pytest.fixture def target(self, task_lock_params: TaskLockParams): - return make_inmemory_target(target_key='dummy_task_id', task_lock_params=task_lock_params) - + return make_inmemory_target(target_key='dummy_key', task_lock_params=task_lock_params) + @pytest.fixture(autouse=True) def clear_repo(self): - InMemeryCacheRepository().clear() + InMemoryCacheRepository().clear() def test_dump_and_load_data(self, target: InMemoryTarget): dumped = 'dummy_data' @@ -28,5 +30,23 @@ def test_dump_and_load_data(self, target: InMemoryTarget): loaded = target.load() assert loaded == dumped - with pytest.raises(AssertionError): - target.dump('another_data') \ No newline at end of file + def test_exist(self, target: InMemoryTarget): + assert not target.exists() + target.dump('dummy_data') + assert target.exists() + + def test_last_modified_time(self, target: InMemoryTarget): + input = 'dummy_data' + target.dump(input) + time = target.last_modification_time() + assert isinstance(time, datetime) + + sleep(0.1) + another_input = 'another_data' + target.dump(another_input) + another_time = target.last_modification_time() + assert time < another_time + + target.remove() + with pytest.raises(ValueError): + assert target.last_modification_time() diff --git a/test/in_memory/test_repository.py b/test/in_memory/test_repository.py index 2610b80a..480910d3 100644 --- a/test/in_memory/test_repository.py +++ b/test/in_memory/test_repository.py @@ -1,5 +1,6 @@ -from gokart.in_memory import InMemeryCacheRepository as Repo +from gokart.in_memory import InMemoryCacheRepository as Repo import pytest +import time dummy_num = 100 @@ -11,44 +12,49 @@ def repo(self): return repo def test_set(self, repo: Repo): - repo.set("dummy_id", dummy_num) + repo.set_value("dummy_key", dummy_num) assert repo.size == 1 for key, value in repo.get_gen(): - assert (key, value) == ("dummy_id", dummy_num) + assert (key, value) == ("dummy_key", dummy_num) - with pytest.raises(AssertionError): - repo.set('dummy_id', "dummy_value") - - repo.set('another_id', 'another_value') + repo.set_value('another_key', 'another_value') assert repo.size == 2 def test_get(self, repo: Repo): - repo.set('dummy_id', dummy_num) - repo.set('another_id', 'another_val') + repo.set_value('dummy_key', dummy_num) + repo.set_value('another_key', 'another_value') """Raise Error when key doesn't exist.""" with pytest.raises(KeyError): - repo.get('not_exist_id') + repo.get_value('not_exist_key') - assert repo.get('dummy_id') == dummy_num - assert repo.get('another_id') == 'another_val' + assert repo.get_value('dummy_key') == dummy_num + assert repo.get_value('another_key') == 'another_value' def test_empty(self, repo: Repo): assert repo.empty() - repo.set("dummmy_id", dummy_num) + repo.set_value("dummmy_key", dummy_num) assert not repo.empty() def test_has(self, repo: Repo): - assert not repo.has('dummy_id') - repo.set('dummy_id', dummy_num) - assert repo.has('dummy_id') + assert not repo.has('dummy_key') + repo.set_value('dummy_key', dummy_num) + assert repo.has('dummy_key') + assert not repo.has('not_exist_key') - def test_remove_by_id(self, repo: Repo): - repo.set('dummy_id', dummy_num) + def test_remove(self, repo: Repo): + repo.set_value('dummy_key', dummy_num) with pytest.raises(AssertionError): - repo.remove_by_id('not_exist_id') - - assert repo.has('dummy_id') - repo.remove_by_id('dummy_id') - assert not repo.has('dummy_id') + repo.remove('not_exist_key') + + repo.remove('dummy_key') + assert not repo.has('dummy_key') + + def test_last_modification_time(self, repo: Repo): + repo.set_value('dummy_key', dummy_num) + date1 = repo.get_last_modification_time('dummy_key') + time.sleep(0.1) + repo.set_value('dummy_key', dummy_num) + date2 = repo.get_last_modification_time('dummy_key') + assert date1 < date2 \ No newline at end of file From 4c311bb439b0dbd19f2f57d81632173e4fc275f7 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Mon, 10 Feb 2025 11:50:56 +0900 Subject: [PATCH 03/17] fix: fix linting errors --- gokart/file_processor.py | 10 ++++----- gokart/in_memory/__init__.py | 4 ++-- gokart/in_memory/data.py | 10 ++++----- gokart/in_memory/repository.py | 18 ++++++++------- gokart/in_memory/target.py | 29 ++++++++++++------------- test/in_memory/test_in_memory_target.py | 16 +++++++++----- test/in_memory/test_repository.py | 23 +++++++++++--------- 7 files changed, 58 insertions(+), 52 deletions(-) diff --git a/gokart/file_processor.py b/gokart/file_processor.py index 87958327..21b8b77f 100644 --- a/gokart/file_processor.py +++ b/gokart/file_processor.py @@ -166,9 +166,9 @@ def load(self, file): return pd.DataFrame() def dump(self, obj, file): - assert isinstance(obj, pd.DataFrame) or isinstance(obj, pd.Series) or isinstance(obj, dict), ( - f'requires pd.DataFrame or pd.Series or dict, but {type(obj)} is passed.' - ) + assert ( + isinstance(obj, pd.DataFrame) or isinstance(obj, pd.Series) or isinstance(obj, dict) + ), f'requires pd.DataFrame or pd.Series or dict, but {type(obj)} is passed.' if isinstance(obj, dict): obj = pd.DataFrame.from_dict(obj) obj.to_json(file) @@ -263,10 +263,8 @@ def dump(self, obj, file): if self._store_index_in_feather: index_column_name = f'{self.INDEX_COLUMN_PREFIX}{dump_obj.index.name}' - assert index_column_name not in dump_obj.columns, ( - f'column name {index_column_name} already exists in dump_obj. \ + assert index_column_name not in dump_obj.columns, f'column name {index_column_name} already exists in dump_obj. \ Consider not saving index by setting store_index_in_feather=False.' - ) assert dump_obj.index.name != 'None', 'index name is "None", which is not allowed in gokart. Consider setting another index name.' dump_obj[index_column_name] = dump_obj.index diff --git a/gokart/in_memory/__init__.py b/gokart/in_memory/__init__.py index f3685297..69e7e4c3 100644 --- a/gokart/in_memory/__init__.py +++ b/gokart/in_memory/__init__.py @@ -1,2 +1,2 @@ -from .repository import InMemoryCacheRepository -from .target import InMemoryTarget, make_inmemory_target \ No newline at end of file +from .repository import InMemoryCacheRepository # noqa:F401 +from .target import InMemoryTarget, make_inmemory_target # noqa:F401 diff --git a/gokart/in_memory/data.py b/gokart/in_memory/data.py index 5c26998e..9362cfe6 100644 --- a/gokart/in_memory/data.py +++ b/gokart/in_memory/data.py @@ -1,10 +1,10 @@ from dataclasses import dataclass -from typing import Any -from abc import ABC from datetime import datetime +from typing import Any + + +class BaseData: ... -class BaseData(ABC): - ... @dataclass class InMemoryData(BaseData): @@ -13,4 +13,4 @@ class InMemoryData(BaseData): @classmethod def create_data(self, value: Any) -> 'InMemoryData': - return InMemoryData(value=value, last_modified_time=datetime.now()) \ No newline at end of file + return InMemoryData(value=value, last_modified_time=datetime.now()) diff --git a/gokart/in_memory/repository.py b/gokart/in_memory/repository.py index fc871366..6babaed7 100644 --- a/gokart/in_memory/repository.py +++ b/gokart/in_memory/repository.py @@ -1,18 +1,20 @@ -import abc from typing import Any, Iterator + from .data import InMemoryData -class BaseRepository(abc.ABC): - ... + +class BaseRepository: ... + class InMemoryCacheRepository(BaseRepository): _cache: dict[str, InMemoryData] = {} + def __init__(self): pass def get_value(self, key: str) -> Any: return self._get_data(key).value - + def get_last_modification_time(self, key: str): return self._get_data(key).last_modified_time @@ -22,17 +24,17 @@ def _get_data(self, id: str) -> InMemoryData: def set_value(self, id: str, obj: Any) -> None: data = InMemoryData.create_data(obj) self._cache[id] = data - + def has(self, id: str) -> bool: return id in self._cache - + def remove(self, id: str) -> None: assert self.has(id) del self._cache[id] def empty(self) -> bool: return not self._cache - + def clear(self) -> None: self._cache.clear() @@ -42,4 +44,4 @@ def get_gen(self) -> Iterator[tuple[str, Any]]: @property def size(self) -> int: - return len(self._cache) \ No newline at end of file + return len(self._cache) diff --git a/gokart/in_memory/target.py b/gokart/in_memory/target.py index 9b910c68..067e5299 100644 --- a/gokart/in_memory/target.py +++ b/gokart/in_memory/target.py @@ -1,41 +1,39 @@ -from gokart.target import TargetOnKart, TaskLockParams -from gokart.in_memory.repository import InMemoryCacheRepository from datetime import datetime -from typing import Any from logging import warning +from typing import Any + +from gokart.in_memory.repository import InMemoryCacheRepository +from gokart.target import TargetOnKart, TaskLockParams _repository = InMemoryCacheRepository() + class InMemoryTarget(TargetOnKart): - def __init__( - self, - data_key: str, - task_lock_param: TaskLockParams - ): + def __init__(self, data_key: str, task_lock_param: TaskLockParams): if task_lock_param.should_task_lock: warning(f'Redis in {self.__class__.__name__} is not supported now.') self._data_key = data_key self._task_lock_params = task_lock_param - + def _exists(self) -> bool: return _repository.has(self._data_key) - + def _get_task_lock_params(self) -> TaskLockParams: return self._task_lock_params - + def _load(self) -> Any: return _repository.get_value(self._data_key) - + def _dump(self, obj: Any) -> None: return _repository.set_value(self._data_key, obj) - + def _remove(self) -> None: _repository.remove(self._data_key) - + def _last_modification_time(self) -> datetime: if not _repository.has(self._data_key): - raise ValueError(f"No object(s) which id is {self._data_key} are stored before.") + raise ValueError(f'No object(s) which id is {self._data_key} are stored before.') time = _repository.get_last_modification_time(self._data_key) return time @@ -43,5 +41,6 @@ def _path(self) -> str: # TODO: this module name `_path` migit not be appropriate return self._data_key + def make_inmemory_target(target_key: str, task_lock_params: TaskLockParams | None = None): return InMemoryTarget(target_key, task_lock_params) diff --git a/test/in_memory/test_in_memory_target.py b/test/in_memory/test_in_memory_target.py index 125b634d..c20bfa3b 100644 --- a/test/in_memory/test_in_memory_target.py +++ b/test/in_memory/test_in_memory_target.py @@ -1,8 +1,12 @@ -from gokart.conflict_prevention_lock.task_lock import TaskLockParams -from gokart.in_memory import make_inmemory_target, InMemoryTarget, InMemoryCacheRepository -import pytest from datetime import datetime from time import sleep + +import pytest + +from gokart.conflict_prevention_lock.task_lock import TaskLockParams +from gokart.in_memory import InMemoryCacheRepository, InMemoryTarget, make_inmemory_target + + class TestInMemoryTarget: @pytest.fixture def task_lock_params(self): @@ -13,7 +17,7 @@ def task_lock_params(self): redis_key='dummy', should_task_lock=False, raise_task_lock_exception_on_collision=False, - lock_extend_seconds=0 + lock_extend_seconds=0, ) @pytest.fixture @@ -34,13 +38,13 @@ def test_exist(self, target: InMemoryTarget): assert not target.exists() target.dump('dummy_data') assert target.exists() - + def test_last_modified_time(self, target: InMemoryTarget): input = 'dummy_data' target.dump(input) time = target.last_modification_time() assert isinstance(time, datetime) - + sleep(0.1) another_input = 'another_data' target.dump(another_input) diff --git a/test/in_memory/test_repository.py b/test/in_memory/test_repository.py index 480910d3..30c5b033 100644 --- a/test/in_memory/test_repository.py +++ b/test/in_memory/test_repository.py @@ -1,9 +1,12 @@ -from gokart.in_memory import InMemoryCacheRepository as Repo -import pytest import time +import pytest + +from gokart.in_memory import InMemoryCacheRepository as Repo + dummy_num = 100 + class TestInMemoryCacheRepository: @pytest.fixture def repo(self): @@ -12,11 +15,11 @@ def repo(self): return repo def test_set(self, repo: Repo): - repo.set_value("dummy_key", dummy_num) + repo.set_value('dummy_key', dummy_num) assert repo.size == 1 for key, value in repo.get_gen(): - assert (key, value) == ("dummy_key", dummy_num) - + assert (key, value) == ('dummy_key', dummy_num) + repo.set_value('another_key', 'another_value') assert repo.size == 2 @@ -27,21 +30,21 @@ def test_get(self, repo: Repo): """Raise Error when key doesn't exist.""" with pytest.raises(KeyError): repo.get_value('not_exist_key') - + assert repo.get_value('dummy_key') == dummy_num assert repo.get_value('another_key') == 'another_value' def test_empty(self, repo: Repo): assert repo.empty() - repo.set_value("dummmy_key", dummy_num) + repo.set_value('dummmy_key', dummy_num) assert not repo.empty() - + def test_has(self, repo: Repo): assert not repo.has('dummy_key') repo.set_value('dummy_key', dummy_num) assert repo.has('dummy_key') assert not repo.has('not_exist_key') - + def test_remove(self, repo: Repo): repo.set_value('dummy_key', dummy_num) @@ -57,4 +60,4 @@ def test_last_modification_time(self, repo: Repo): time.sleep(0.1) repo.set_value('dummy_key', dummy_num) date2 = repo.get_last_modification_time('dummy_key') - assert date1 < date2 \ No newline at end of file + assert date1 < date2 From da3ae69d99818520ea814da21808155271062b77 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Mon, 10 Feb 2025 12:10:10 +0900 Subject: [PATCH 04/17] fix: update type union shorthand to to make compatible with py39 --- gokart/in_memory/target.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gokart/in_memory/target.py b/gokart/in_memory/target.py index 067e5299..82979205 100644 --- a/gokart/in_memory/target.py +++ b/gokart/in_memory/target.py @@ -1,6 +1,6 @@ from datetime import datetime from logging import warning -from typing import Any +from typing import Any, Optional from gokart.in_memory.repository import InMemoryCacheRepository from gokart.target import TargetOnKart, TaskLockParams @@ -42,5 +42,5 @@ def _path(self) -> str: return self._data_key -def make_inmemory_target(target_key: str, task_lock_params: TaskLockParams | None = None): +def make_inmemory_target(target_key: str, task_lock_params: Optional[TaskLockParams] = None): return InMemoryTarget(target_key, task_lock_params) From c09040d95f34989830f833d4bb3e784794a861b7 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Mon, 10 Feb 2025 12:20:11 +0900 Subject: [PATCH 05/17] style: refactor some base classes to inherite from --- gokart/in_memory/data.py | 4 ++-- gokart/in_memory/repository.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/gokart/in_memory/data.py b/gokart/in_memory/data.py index 9362cfe6..0729ff80 100644 --- a/gokart/in_memory/data.py +++ b/gokart/in_memory/data.py @@ -1,9 +1,9 @@ from dataclasses import dataclass from datetime import datetime -from typing import Any +from typing import Any, Protocol -class BaseData: ... +class BaseData(Protocol): ... @dataclass diff --git a/gokart/in_memory/repository.py b/gokart/in_memory/repository.py index 6babaed7..c4ea8485 100644 --- a/gokart/in_memory/repository.py +++ b/gokart/in_memory/repository.py @@ -1,9 +1,9 @@ -from typing import Any, Iterator +from typing import Any, Iterator, Protocol from .data import InMemoryData -class BaseRepository: ... +class BaseRepository(Protocol): ... class InMemoryCacheRepository(BaseRepository): From 333986df6eba69342e121a233b3c3ed3f327c587 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Mon, 10 Feb 2025 13:03:24 +0900 Subject: [PATCH 06/17] fix: remove unnessesary optional type --- gokart/in_memory/target.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gokart/in_memory/target.py b/gokart/in_memory/target.py index 82979205..b47e96a7 100644 --- a/gokart/in_memory/target.py +++ b/gokart/in_memory/target.py @@ -1,6 +1,6 @@ from datetime import datetime from logging import warning -from typing import Any, Optional +from typing import Any from gokart.in_memory.repository import InMemoryCacheRepository from gokart.target import TargetOnKart, TaskLockParams @@ -42,5 +42,5 @@ def _path(self) -> str: return self._data_key -def make_inmemory_target(target_key: str, task_lock_params: Optional[TaskLockParams] = None): +def make_inmemory_target(target_key: str, task_lock_params: TaskLockParams): return InMemoryTarget(target_key, task_lock_params) From b0c6d5a2973fa0f87591d43aec2c59fe392aafdd Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Mon, 10 Feb 2025 20:27:16 +0900 Subject: [PATCH 07/17] fix: fix format error --- gokart/file_processor.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/gokart/file_processor.py b/gokart/file_processor.py index 21b8b77f..87958327 100644 --- a/gokart/file_processor.py +++ b/gokart/file_processor.py @@ -166,9 +166,9 @@ def load(self, file): return pd.DataFrame() def dump(self, obj, file): - assert ( - isinstance(obj, pd.DataFrame) or isinstance(obj, pd.Series) or isinstance(obj, dict) - ), f'requires pd.DataFrame or pd.Series or dict, but {type(obj)} is passed.' + assert isinstance(obj, pd.DataFrame) or isinstance(obj, pd.Series) or isinstance(obj, dict), ( + f'requires pd.DataFrame or pd.Series or dict, but {type(obj)} is passed.' + ) if isinstance(obj, dict): obj = pd.DataFrame.from_dict(obj) obj.to_json(file) @@ -263,8 +263,10 @@ def dump(self, obj, file): if self._store_index_in_feather: index_column_name = f'{self.INDEX_COLUMN_PREFIX}{dump_obj.index.name}' - assert index_column_name not in dump_obj.columns, f'column name {index_column_name} already exists in dump_obj. \ + assert index_column_name not in dump_obj.columns, ( + f'column name {index_column_name} already exists in dump_obj. \ Consider not saving index by setting store_index_in_feather=False.' + ) assert dump_obj.index.name != 'None', 'index name is "None", which is not allowed in gokart. Consider setting another index name.' dump_obj[index_column_name] = dump_obj.index From a53378a12b90259c034344f39ddcfd547031d694 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Tue, 11 Feb 2025 00:05:47 +0900 Subject: [PATCH 08/17] chore: add an assertion error message style: update variable name from `id` to `key` --- gokart/in_memory/repository.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/gokart/in_memory/repository.py b/gokart/in_memory/repository.py index c4ea8485..727935ca 100644 --- a/gokart/in_memory/repository.py +++ b/gokart/in_memory/repository.py @@ -18,19 +18,19 @@ def get_value(self, key: str) -> Any: def get_last_modification_time(self, key: str): return self._get_data(key).last_modified_time - def _get_data(self, id: str) -> InMemoryData: - return self._cache[id] + def _get_data(self, key: str) -> InMemoryData: + return self._cache[key] - def set_value(self, id: str, obj: Any) -> None: + def set_value(self, key: str, obj: Any) -> None: data = InMemoryData.create_data(obj) - self._cache[id] = data + self._cache[key] = data - def has(self, id: str) -> bool: - return id in self._cache + def has(self, key: str) -> bool: + return key in self._cache - def remove(self, id: str) -> None: - assert self.has(id) - del self._cache[id] + def remove(self, key: str) -> None: + assert self.has(key), f'{key} does not exist.' + del self._cache[key] def empty(self) -> bool: return not self._cache From 4d9b033b9c0329faab0b26d134b42a1ee3a771f6 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Tue, 11 Feb 2025 09:09:11 +0900 Subject: [PATCH 09/17] style: update the variable name to for code consistency --- gokart/in_memory/data.py | 4 ++-- gokart/in_memory/repository.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/gokart/in_memory/data.py b/gokart/in_memory/data.py index 0729ff80..a01c3ad2 100644 --- a/gokart/in_memory/data.py +++ b/gokart/in_memory/data.py @@ -9,8 +9,8 @@ class BaseData(Protocol): ... @dataclass class InMemoryData(BaseData): value: Any - last_modified_time: datetime + last_modification_time: datetime @classmethod def create_data(self, value: Any) -> 'InMemoryData': - return InMemoryData(value=value, last_modified_time=datetime.now()) + return InMemoryData(value=value, last_modification_time=datetime.now()) diff --git a/gokart/in_memory/repository.py b/gokart/in_memory/repository.py index 727935ca..4d1920b6 100644 --- a/gokart/in_memory/repository.py +++ b/gokart/in_memory/repository.py @@ -16,7 +16,7 @@ def get_value(self, key: str) -> Any: return self._get_data(key).value def get_last_modification_time(self, key: str): - return self._get_data(key).last_modified_time + return self._get_data(key).last_modification_time def _get_data(self, key: str) -> InMemoryData: return self._cache[key] From b9dbb4d9756aa8c381fbd4faea422c0eef848378 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Wed, 12 Feb 2025 09:40:40 +0900 Subject: [PATCH 10/17] docs: add a document of how to create InMemoryTarget --- docs/task_on_kart.rst | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/docs/task_on_kart.rst b/docs/task_on_kart.rst index ce52c6d5..09e7a59f 100644 --- a/docs/task_on_kart.rst +++ b/docs/task_on_kart.rst @@ -286,3 +286,24 @@ If you want to dump csv file with other encodings, you can use `encoding` parame def output(self): return self.make_target('file_name.csv', processor=CsvFileProcessor(encoding='cp932')) # This will dump csv as 'cp932' which is used in Windows. + +Cache output in memory instead of dumping to files +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +You can use :class:`~InMemoryTarget` to cache output in memory instead of dumping to files by calling :func:`~gokart.target.make_inmemory_target`. + +.. code:: python + + from gokart.in_memory.target import make_inmemory_target + + def output(self): + unique_id = self.make_unique_id() if use_unique_id else None + # TaskLock is not supported in InMemoryTarget, so it's dummy + task_lock_params = make_task_lock_params( + file_path='dummy_path', + unique_id=unique_id, + redis_host=None, + redis_port=None, + redis_timeout=self.redis_timeout, + raise_task_lock_exception_on_collision=False, + ) + return make_inmemory_target('dummy_path', task_lock_params, unique_id) \ No newline at end of file From 769404a50133ba80ceab87199cc9f0fba43e1403 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Wed, 12 Feb 2025 21:28:08 +0900 Subject: [PATCH 11/17] chore: update name from `inmemory` to `in_memory` chore: add type hints style: remove `Protocol` --- docs/task_on_kart.rst | 28 +++++++++++++------------ gokart/in_memory/__init__.py | 2 +- gokart/in_memory/data.py | 6 ++---- gokart/in_memory/repository.py | 6 ++---- gokart/in_memory/target.py | 2 +- test/in_memory/test_in_memory_target.py | 10 ++++----- test/in_memory/test_repository.py | 2 +- 7 files changed, 27 insertions(+), 29 deletions(-) diff --git a/docs/task_on_kart.rst b/docs/task_on_kart.rst index 09e7a59f..0941996f 100644 --- a/docs/task_on_kart.rst +++ b/docs/task_on_kart.rst @@ -289,21 +289,23 @@ If you want to dump csv file with other encodings, you can use `encoding` parame Cache output in memory instead of dumping to files ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -You can use :class:`~InMemoryTarget` to cache output in memory instead of dumping to files by calling :func:`~gokart.target.make_inmemory_target`. +You can use :class:`~InMemoryTarget` to cache output in memory instead of dumping to files by calling :func:`~gokart.target.make_in_memory_target`. + +Please note that :class:`~InMemoryTarget` is an experimental feature. .. code:: python - from gokart.in_memory.target import make_inmemory_target + from gokart.in_memory.target import make_in_memory_target def output(self): - unique_id = self.make_unique_id() if use_unique_id else None - # TaskLock is not supported in InMemoryTarget, so it's dummy - task_lock_params = make_task_lock_params( - file_path='dummy_path', - unique_id=unique_id, - redis_host=None, - redis_port=None, - redis_timeout=self.redis_timeout, - raise_task_lock_exception_on_collision=False, - ) - return make_inmemory_target('dummy_path', task_lock_params, unique_id) \ No newline at end of file + unique_id = self.make_unique_id() if use_unique_id else None + # TaskLock is not supported in InMemoryTarget, so it's dummy + task_lock_params = make_task_lock_params( + file_path='dummy_path', + unique_id=unique_id, + redis_host=None, + redis_port=None, + redis_timeout=self.redis_timeout, + raise_task_lock_exception_on_collision=False, + ) + return make_in_memory_target('dummy_path', task_lock_params, unique_id) \ No newline at end of file diff --git a/gokart/in_memory/__init__.py b/gokart/in_memory/__init__.py index 69e7e4c3..c418c139 100644 --- a/gokart/in_memory/__init__.py +++ b/gokart/in_memory/__init__.py @@ -1,2 +1,2 @@ from .repository import InMemoryCacheRepository # noqa:F401 -from .target import InMemoryTarget, make_inmemory_target # noqa:F401 +from .target import InMemoryTarget, make_in_memory_target # noqa:F401 diff --git a/gokart/in_memory/data.py b/gokart/in_memory/data.py index a01c3ad2..eb90a834 100644 --- a/gokart/in_memory/data.py +++ b/gokart/in_memory/data.py @@ -1,13 +1,11 @@ from dataclasses import dataclass from datetime import datetime -from typing import Any, Protocol +from typing import Any -class BaseData(Protocol): ... - @dataclass -class InMemoryData(BaseData): +class InMemoryData: value: Any last_modification_time: datetime diff --git a/gokart/in_memory/repository.py b/gokart/in_memory/repository.py index 4d1920b6..d5073ab7 100644 --- a/gokart/in_memory/repository.py +++ b/gokart/in_memory/repository.py @@ -1,12 +1,10 @@ -from typing import Any, Iterator, Protocol +from typing import Any, Iterator from .data import InMemoryData -class BaseRepository(Protocol): ... - -class InMemoryCacheRepository(BaseRepository): +class InMemoryCacheRepository: _cache: dict[str, InMemoryData] = {} def __init__(self): diff --git a/gokart/in_memory/target.py b/gokart/in_memory/target.py index b47e96a7..cb2122b5 100644 --- a/gokart/in_memory/target.py +++ b/gokart/in_memory/target.py @@ -42,5 +42,5 @@ def _path(self) -> str: return self._data_key -def make_inmemory_target(target_key: str, task_lock_params: TaskLockParams): +def make_in_memory_target(target_key: str, task_lock_params: TaskLockParams) -> InMemoryTarget: return InMemoryTarget(target_key, task_lock_params) diff --git a/test/in_memory/test_in_memory_target.py b/test/in_memory/test_in_memory_target.py index c20bfa3b..ae6ca11d 100644 --- a/test/in_memory/test_in_memory_target.py +++ b/test/in_memory/test_in_memory_target.py @@ -4,12 +4,12 @@ import pytest from gokart.conflict_prevention_lock.task_lock import TaskLockParams -from gokart.in_memory import InMemoryCacheRepository, InMemoryTarget, make_inmemory_target +from gokart.in_memory import InMemoryCacheRepository, InMemoryTarget, make_in_memory_target class TestInMemoryTarget: @pytest.fixture - def task_lock_params(self): + def task_lock_params(self) -> TaskLockParams: return TaskLockParams( redis_host=None, redis_port=None, @@ -21,11 +21,11 @@ def task_lock_params(self): ) @pytest.fixture - def target(self, task_lock_params: TaskLockParams): - return make_inmemory_target(target_key='dummy_key', task_lock_params=task_lock_params) + def target(self, task_lock_params: TaskLockParams) -> InMemoryTarget: + return make_in_memory_target(target_key='dummy_key', task_lock_params=task_lock_params) @pytest.fixture(autouse=True) - def clear_repo(self): + def clear_repo(self) -> None: InMemoryCacheRepository().clear() def test_dump_and_load_data(self, target: InMemoryTarget): diff --git a/test/in_memory/test_repository.py b/test/in_memory/test_repository.py index 30c5b033..70d32151 100644 --- a/test/in_memory/test_repository.py +++ b/test/in_memory/test_repository.py @@ -9,7 +9,7 @@ class TestInMemoryCacheRepository: @pytest.fixture - def repo(self): + def repo(self) -> Repo: repo = Repo() repo.clear() return repo From 96d9b6ea16baaef9c8705d72ce189e7179511964 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Wed, 12 Feb 2025 21:32:20 +0900 Subject: [PATCH 12/17] fix: fix lint errors --- gokart/in_memory/data.py | 1 - gokart/in_memory/repository.py | 1 - 2 files changed, 2 deletions(-) diff --git a/gokart/in_memory/data.py b/gokart/in_memory/data.py index eb90a834..4430af44c 100644 --- a/gokart/in_memory/data.py +++ b/gokart/in_memory/data.py @@ -3,7 +3,6 @@ from typing import Any - @dataclass class InMemoryData: value: Any diff --git a/gokart/in_memory/repository.py b/gokart/in_memory/repository.py index d5073ab7..a90f0178 100644 --- a/gokart/in_memory/repository.py +++ b/gokart/in_memory/repository.py @@ -3,7 +3,6 @@ from .data import InMemoryData - class InMemoryCacheRepository: _cache: dict[str, InMemoryData] = {} From 7adae09f23e676fcc52b99033de5fb163e276f39 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Wed, 12 Feb 2025 22:19:21 +0900 Subject: [PATCH 13/17] chore: raise Error when using InMemoryTarget with Redis --- gokart/in_memory/target.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gokart/in_memory/target.py b/gokart/in_memory/target.py index cb2122b5..e4fcee16 100644 --- a/gokart/in_memory/target.py +++ b/gokart/in_memory/target.py @@ -1,5 +1,4 @@ from datetime import datetime -from logging import warning from typing import Any from gokart.in_memory.repository import InMemoryCacheRepository @@ -11,7 +10,7 @@ class InMemoryTarget(TargetOnKart): def __init__(self, data_key: str, task_lock_param: TaskLockParams): if task_lock_param.should_task_lock: - warning(f'Redis in {self.__class__.__name__} is not supported now.') + raise ValueError(f'Redis with `InMemoryTarget` is not currently supported.') self._data_key = data_key self._task_lock_params = task_lock_param From 5796544f4ed7bf5b1c38b710b2ff3ad379bd3e25 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Wed, 12 Feb 2025 22:21:02 +0900 Subject: [PATCH 14/17] fix: fix a linting error --- gokart/in_memory/target.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gokart/in_memory/target.py b/gokart/in_memory/target.py index e4fcee16..c1fd185a 100644 --- a/gokart/in_memory/target.py +++ b/gokart/in_memory/target.py @@ -10,7 +10,7 @@ class InMemoryTarget(TargetOnKart): def __init__(self, data_key: str, task_lock_param: TaskLockParams): if task_lock_param.should_task_lock: - raise ValueError(f'Redis with `InMemoryTarget` is not currently supported.') + raise ValueError('Redis with `InMemoryTarget` is not currently supported.') self._data_key = data_key self._task_lock_params = task_lock_param From d777e9218cd6bb1365415e8d97a44666e957da50 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Tue, 11 Feb 2025 12:47:54 +0900 Subject: [PATCH 15/17] feat: add the new `TargetOnKart` entrypoint `make_cache_target` feat: add the new parameter `cache_in_memory_by_default` to switch default Target style: update the variable name from `target_key` to `data_key` for code consistency test: add tests for `TaskOnKart`s with the `cache_in_memory` parameter --- gokart/in_memory/target.py | 13 +- gokart/task.py | 33 +++++- test/in_memory/test_in_memory_target.py | 2 +- test/in_memory/test_task_cached_in_memory.py | 118 +++++++++++++++++++ 4 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 test/in_memory/test_task_cached_in_memory.py diff --git a/gokart/in_memory/target.py b/gokart/in_memory/target.py index c1fd185a..03803d4b 100644 --- a/gokart/in_memory/target.py +++ b/gokart/in_memory/target.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Any +from typing import Any, Optional from gokart.in_memory.repository import InMemoryCacheRepository from gokart.target import TargetOnKart, TaskLockParams @@ -41,5 +41,12 @@ def _path(self) -> str: return self._data_key -def make_in_memory_target(target_key: str, task_lock_params: TaskLockParams) -> InMemoryTarget: - return InMemoryTarget(target_key, task_lock_params) +def _make_data_key(data_key: str, unique_id: Optional[str] = None) -> str: + if not unique_id: + return data_key + return data_key + '_' + unique_id + + +def make_in_memory_target(data_key: str, task_lock_params: TaskLockParams, unique_id: Optional[str] = None) -> InMemoryTarget: + _data_key = _make_data_key(data_key, unique_id) + return InMemoryTarget(_data_key, task_lock_params) diff --git a/gokart/task.py b/gokart/task.py index f577f64b..6e5eb024 100644 --- a/gokart/task.py +++ b/gokart/task.py @@ -20,9 +20,10 @@ import gokart import gokart.target -from gokart.conflict_prevention_lock.task_lock import make_task_lock_params, make_task_lock_params_for_run +from gokart.conflict_prevention_lock.task_lock import TaskLockParams, make_task_lock_params, make_task_lock_params_for_run from gokart.conflict_prevention_lock.task_lock_wrappers import wrap_run_with_lock from gokart.file_processor import FileProcessor +from gokart.in_memory.target import make_inmemory_target from gokart.pandas_type_config import PandasTypeConfigMap from gokart.parameter import ExplicitBoolParameter, ListTaskInstanceParameter, TaskInstanceParameter from gokart.target import TargetOnKart @@ -105,6 +106,9 @@ class TaskOnKart(luigi.Task, Generic[T]): default=True, description='Check if output file exists at run. If exists, run() will be skipped.', significant=False ) should_lock_run: bool = ExplicitBoolParameter(default=False, significant=False, description='Whether to use redis lock or not at task run.') + cache_in_memory_by_default: bool = ExplicitBoolParameter( + default=False, significant=False, description='If `True`, output is stored on a memory instead of files unless specified.' + ) @property def priority(self): @@ -134,11 +138,13 @@ def __init__(self, *args, **kwargs): task_lock_params = make_task_lock_params_for_run(task_self=self) self.run = wrap_run_with_lock(run_func=self.run, task_lock_params=task_lock_params) # type: ignore + self.make_default_target = self.make_target if not self.cache_in_memory_by_default else self.make_cache_target + def input(self) -> FlattenableItems[TargetOnKart]: return super().input() def output(self) -> FlattenableItems[TargetOnKart]: - return self.make_target() + return self.make_default_target() def requires(self) -> FlattenableItems['TaskOnKart']: tasks = self.make_task_instance_dictionary() @@ -210,11 +216,19 @@ def clone(self, cls=None, **kwargs): return cls(**new_k) def make_target(self, relative_file_path: Optional[str] = None, use_unique_id: bool = True, processor: Optional[FileProcessor] = None) -> TargetOnKart: + # if self.cache_in_memory and processor: + # logger.warning(f"processor {type(processor)} never used.") formatted_relative_file_path = ( relative_file_path if relative_file_path is not None else os.path.join(self.__module__.replace('.', '/'), f'{type(self).__name__}.pkl') ) file_path = os.path.join(self.workspace_directory, formatted_relative_file_path) unique_id = self.make_unique_id() if use_unique_id else None + # if self.cache_in_memory: + # from gokart.target import _make_file_path + # return make_inmemory_target( + # target_key=_make_file_path(file_path, unique_id), + # task_lock_params=TaskLockParams(None, None, None, "hoge", False, False, 100) + # ) task_lock_params = make_task_lock_params( file_path=file_path, @@ -229,6 +243,21 @@ def make_target(self, relative_file_path: Optional[str] = None, use_unique_id: b file_path=file_path, unique_id=unique_id, processor=processor, task_lock_params=task_lock_params, store_index_in_feather=self.store_index_in_feather ) + def make_cache_target(self, data_key: Optional[str] = None, use_unique_id: bool = True): + _data_key = data_key if data_key else os.path.join(self.__module__.replace('.', '/'), type(self).__name__) + unique_id = self.make_unique_id() if use_unique_id else None + # TODO: combine with redis + task_lock_params = TaskLockParams( + redis_host=None, + redis_port=None, + redis_timeout=None, + redis_key='redis_key', + should_task_lock=False, + raise_task_lock_exception_on_collision=False, + lock_extend_seconds=-1, + ) + return make_inmemory_target(_data_key, task_lock_params, unique_id) + def make_large_data_frame_target(self, relative_file_path: Optional[str] = None, use_unique_id: bool = True, max_byte=int(2**26)) -> TargetOnKart: formatted_relative_file_path = ( relative_file_path if relative_file_path is not None else os.path.join(self.__module__.replace('.', '/'), f'{type(self).__name__}.zip') diff --git a/test/in_memory/test_in_memory_target.py b/test/in_memory/test_in_memory_target.py index ae6ca11d..a6efccd0 100644 --- a/test/in_memory/test_in_memory_target.py +++ b/test/in_memory/test_in_memory_target.py @@ -22,7 +22,7 @@ def task_lock_params(self) -> TaskLockParams: @pytest.fixture def target(self, task_lock_params: TaskLockParams) -> InMemoryTarget: - return make_in_memory_target(target_key='dummy_key', task_lock_params=task_lock_params) + return make_in_memory_target(data_key='dummy_key', task_lock_params=task_lock_params) @pytest.fixture(autouse=True) def clear_repo(self) -> None: diff --git a/test/in_memory/test_task_cached_in_memory.py b/test/in_memory/test_task_cached_in_memory.py new file mode 100644 index 00000000..a874ee6b --- /dev/null +++ b/test/in_memory/test_task_cached_in_memory.py @@ -0,0 +1,118 @@ +from typing import Optional, Type, Union + +import luigi +import pytest + +import gokart +from gokart.in_memory import InMemoryCacheRepository, InMemoryTarget +from gokart.target import SingleFileTarget + + +class DummyTask(gokart.TaskOnKart): + task_namespace = __name__ + param: str = luigi.Parameter() + + def run(self): + self.dump(self.param) + + +class DummyTaskWithDependencies(gokart.TaskOnKart): + task_namespace = __name__ + task: list[gokart.TaskOnKart[str]] = gokart.ListTaskInstanceParameter() + + def run(self): + result = ','.join(self.load()) + self.dump(result) + + +class DumpIntTask(gokart.TaskOnKart[int]): + task_namespace = __name__ + value: int = luigi.IntParameter() + + def run(self): + self.dump(self.value) + + +class AddTask(gokart.TaskOnKart[Union[int, float]]): + a: gokart.TaskOnKart[int] = gokart.TaskInstanceParameter() + b: gokart.TaskOnKart[int] = gokart.TaskInstanceParameter() + + def requires(self): + return dict(a=self.a, b=self.b) + + def run(self): + a = self.load(self.a) + b = self.load(self.b) + self.dump(a + b) + + +class TestTaskOnKartWithCache: + @pytest.fixture(autouse=True) + def clear_repository(slef): + InMemoryCacheRepository().clear() + + @pytest.mark.parametrize('data_key', ['sample_key', None]) + @pytest.mark.parametrize('use_unique_id', [True, False]) + def test_key_identity(self, data_key: Optional[str], use_unique_id: bool): + task = DummyTask(param='param') + ext = '.pkl' + relative_file_path = data_key + ext if data_key else None + target = task.make_target(relative_file_path=relative_file_path, use_unique_id=use_unique_id) + cached_target = task.make_cache_target(data_key=data_key, use_unique_id=use_unique_id) + + target_path = target.path().removeprefix(task.workspace_directory).removesuffix(ext).strip('/') + assert cached_target.path() == target_path + + def test_make_cached_target(self): + task = DummyTask(param='param') + target = task.make_cache_target() + assert isinstance(target, InMemoryTarget) + + @pytest.mark.parametrize(['cache_in_memory_by_default', 'target_type'], [[True, InMemoryTarget], [False, SingleFileTarget]]) + def test_make_default_target(self, cache_in_memory_by_default: bool, target_type: Type[gokart.TaskOnKart]): + task = DummyTask(param='param', cache_in_memory_by_default=cache_in_memory_by_default) + target = task.output() + assert isinstance(target, target_type) + + def test_complete_with_cache_in_memory_flag(self, tmpdir): + task = DummyTask(param='param', cache_in_memory_by_default=True, workspace_directory=tmpdir) + assert not task.complete() + file_target = task.make_target() + file_target.dump('data') + assert not task.complete() + cache_target = task.make_cache_target() + cache_target.dump('data') + assert task.complete() + + def test_complete_without_cache_in_memory_flag(self, tmpdir): + task = DummyTask(param='param', workspace_directory=tmpdir) + assert not task.complete() + cache_target = task.make_cache_target() + cache_target.dump('data') + assert not task.complete() + file_target = task.make_target() + file_target.dump('data') + assert task.complete() + + def test_dump_with_cache_in_memory_flag(self, tmpdir): + task = DummyTask(param='param', cache_in_memory_by_default=True, workspace_directory=tmpdir) + file_target = task.make_target() + cache_target = task.make_cache_target() + task.dump('data') + assert not file_target.exists() + assert cache_target.exists() + + def test_dump_without_cache_in_memory_flag(self, tmpdir): + task = DummyTask(param='param', workspace_directory=tmpdir) + file_target = task.make_target() + cache_target = task.make_cache_target() + task.dump('data') + assert file_target.exists() + assert not cache_target.exists() + + def test_gokart_build(self): + task = AddTask( + a=DumpIntTask(value=2, cache_in_memory_by_default=True), b=DumpIntTask(value=3, cache_in_memory_by_default=True), cache_in_memory_by_default=True + ) + output = gokart.build(task, reset_register=False) + assert output == 5 From 76cb25560de79306654a07e68e3b504e717b88c0 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Tue, 11 Feb 2025 12:54:22 +0900 Subject: [PATCH 16/17] chore: delete unecessary comments --- gokart/task.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/gokart/task.py b/gokart/task.py index 6e5eb024..8eac941d 100644 --- a/gokart/task.py +++ b/gokart/task.py @@ -216,19 +216,11 @@ def clone(self, cls=None, **kwargs): return cls(**new_k) def make_target(self, relative_file_path: Optional[str] = None, use_unique_id: bool = True, processor: Optional[FileProcessor] = None) -> TargetOnKart: - # if self.cache_in_memory and processor: - # logger.warning(f"processor {type(processor)} never used.") formatted_relative_file_path = ( relative_file_path if relative_file_path is not None else os.path.join(self.__module__.replace('.', '/'), f'{type(self).__name__}.pkl') ) file_path = os.path.join(self.workspace_directory, formatted_relative_file_path) unique_id = self.make_unique_id() if use_unique_id else None - # if self.cache_in_memory: - # from gokart.target import _make_file_path - # return make_inmemory_target( - # target_key=_make_file_path(file_path, unique_id), - # task_lock_params=TaskLockParams(None, None, None, "hoge", False, False, 100) - # ) task_lock_params = make_task_lock_params( file_path=file_path, From 1a349af12b877fd562eb449093d9efca9dcd4553 Mon Sep 17 00:00:00 2001 From: utotsubasa Date: Sun, 16 Feb 2025 21:11:30 +0900 Subject: [PATCH 17/17] fix: fix typo style: add a type hint --- gokart/task.py | 6 +++--- test/in_memory/test_task_cached_in_memory.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/gokart/task.py b/gokart/task.py index 8eac941d..77080cb1 100644 --- a/gokart/task.py +++ b/gokart/task.py @@ -23,7 +23,7 @@ from gokart.conflict_prevention_lock.task_lock import TaskLockParams, make_task_lock_params, make_task_lock_params_for_run from gokart.conflict_prevention_lock.task_lock_wrappers import wrap_run_with_lock from gokart.file_processor import FileProcessor -from gokart.in_memory.target import make_inmemory_target +from gokart.in_memory.target import InMemoryTarget, make_in_memory_target from gokart.pandas_type_config import PandasTypeConfigMap from gokart.parameter import ExplicitBoolParameter, ListTaskInstanceParameter, TaskInstanceParameter from gokart.target import TargetOnKart @@ -235,7 +235,7 @@ def make_target(self, relative_file_path: Optional[str] = None, use_unique_id: b file_path=file_path, unique_id=unique_id, processor=processor, task_lock_params=task_lock_params, store_index_in_feather=self.store_index_in_feather ) - def make_cache_target(self, data_key: Optional[str] = None, use_unique_id: bool = True): + def make_cache_target(self, data_key: Optional[str] = None, use_unique_id: bool = True) -> InMemoryTarget: _data_key = data_key if data_key else os.path.join(self.__module__.replace('.', '/'), type(self).__name__) unique_id = self.make_unique_id() if use_unique_id else None # TODO: combine with redis @@ -248,7 +248,7 @@ def make_cache_target(self, data_key: Optional[str] = None, use_unique_id: bool raise_task_lock_exception_on_collision=False, lock_extend_seconds=-1, ) - return make_inmemory_target(_data_key, task_lock_params, unique_id) + return make_in_memory_target(_data_key, task_lock_params, unique_id) def make_large_data_frame_target(self, relative_file_path: Optional[str] = None, use_unique_id: bool = True, max_byte=int(2**26)) -> TargetOnKart: formatted_relative_file_path = ( diff --git a/test/in_memory/test_task_cached_in_memory.py b/test/in_memory/test_task_cached_in_memory.py index a874ee6b..2d09a754 100644 --- a/test/in_memory/test_task_cached_in_memory.py +++ b/test/in_memory/test_task_cached_in_memory.py @@ -48,7 +48,7 @@ def run(self): class TestTaskOnKartWithCache: @pytest.fixture(autouse=True) - def clear_repository(slef): + def clear_repository(self) -> None: InMemoryCacheRepository().clear() @pytest.mark.parametrize('data_key', ['sample_key', None])