diff --git a/app/containers.py b/app/containers.py index 08bf769..977a891 100644 --- a/app/containers.py +++ b/app/containers.py @@ -3,6 +3,7 @@ from minio import Minio import os +from app.gateways.gatekeeper import GatekeeperGateway from app.services.zipper import ZipperService logger = logging.getLogger("uvicorn") @@ -26,8 +27,16 @@ class Container(containers.DeclarativeContainer): secure=False, ) + gatekeeper_gateway = providers.Factory( + GatekeeperGateway, + base_url=config.gatekeeper.base_url, + api_key=config.gatekeeper.api_key, + api_secret=config.gatekeeper.api_secret, + ) + zipper_service = providers.Factory( ZipperService, minio_client=minio_client, + gatekeeper_gateway=gatekeeper_gateway, temp_dir=config.temp_dir, ) diff --git a/app/controllers/v1/resources.py b/app/controllers/v1/resources.py index cfd43b5..f00eb87 100644 --- a/app/controllers/v1/resources.py +++ b/app/controllers/v1/resources.py @@ -1,3 +1,4 @@ +from uuid import UUID from pydantic import BaseModel, Field @@ -8,7 +9,7 @@ class CreateZipRequest(BaseModel): class CreateZipResponse(BaseModel): - success: bool = Field(..., description="Success flag") + id: UUID = Field(..., description="Zip process ID") + status: str = Field(..., description="Zip file status") message: str | None = Field(None, description="Error message") - bucket: str | None = Field(None, description="Bucket name") name: str | None = Field(None, description="Zip file name") diff --git a/app/controllers/v1/zipper.py b/app/controllers/v1/zipper.py index fa0c89e..e56ee05 100644 --- a/app/controllers/v1/zipper.py +++ b/app/controllers/v1/zipper.py @@ -3,10 +3,10 @@ from app.containers import Container from app.controllers.v1.resources import CreateZipRequest, CreateZipResponse +from app.models.zipper import ZipStatus from app.services.zipper import ZipperService router = APIRouter( - prefix="/zip", tags=["zip"], dependencies=[], responses={ @@ -16,32 +16,39 @@ ) -# POST /api/v1/zip +# POST /api/v1/datasets/:dataset_id/versions/:version/zip @router.post( - path="/", + path="/datasets/{dataset_id}/versions/{version}/zip", status_code=201, description="Zip dataset files", response_model_exclude_none=True, ) @inject def zip( + dataset_id: str, + version: str, payload: CreateZipRequest, response: Response, service: ZipperService = Depends(Provide[Container.zipper_service]), ) -> CreateZipResponse: zipped_resource = service.zip_files( - bucket=payload.bucket, file_names=payload.files, zip_name=payload.zip_name + dataset_id=dataset_id, + version=version, + bucket=payload.bucket, + file_names=payload.files, + zip_name=payload.zip_name, ) - if not zipped_resource.success: + if zipped_resource.status == ZipStatus.FAILURE: response.status_code = 500 return CreateZipResponse( - success=False, + id=zipped_resource.id, + status=zipped_resource.status.name, message="Failed to zip files", ) return CreateZipResponse( - success=True, - bucket=zipped_resource.bucket, + id=zipped_resource.id, name=zipped_resource.name, + status=zipped_resource.status.name, ) diff --git a/app/gateways/__init__.py b/app/gateways/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/gateways/gatekeeper.py b/app/gateways/gatekeeper.py new file mode 100644 index 0000000..24b4576 --- /dev/null +++ b/app/gateways/gatekeeper.py @@ -0,0 +1,28 @@ +import logging +import requests + +from app.models.zipper import ZippedResource + +logger = logging.getLogger("uvicorn") + + +class GatekeeperGateway: + def __init__(self, base_url: str, api_key: str, api_secret: str): + self._base_url = base_url + self._base_headers = { + "Content-Type": "application/json", + "X-Api-Key": api_key, + "X-Api-Secret": str(api_secret), + } + + def post_zip_callback(self, zipped_resource: ZippedResource): + url = f"{self._base_url}/v1/internal/datasets/{zipped_resource.dataset_id}/versions/{zipped_resource.version}/files/zip/{zipped_resource.id}" + response = requests.post( + url=url, + headers=self._base_headers, + json={"status": zipped_resource.status.name}, + ) + if not response.status_code == 200: + logger.error( + f"POST request failed with status code {response.status_code} for process id {zipped_resource.id}." + ) diff --git a/app/gateways/gatekeeper_test.py b/app/gateways/gatekeeper_test.py new file mode 100644 index 0000000..60890f9 --- /dev/null +++ b/app/gateways/gatekeeper_test.py @@ -0,0 +1,55 @@ +import unittest +from unittest.mock import patch +from app.models.zipper import ZippedResource, ZipStatus +from app.gateways.gatekeeper import GatekeeperGateway +import uuid + + +class TestGatekeeperGateway(unittest.TestCase): + def setUp(self): + self.base_url = "http://example.com" + self.gatekeeper_gateway = GatekeeperGateway(self.base_url) + self.zipped_resource = ZippedResource( + id=uuid.uuid4(), + dataset_id=uuid.uuid4(), + version="1.0", + status=ZipStatus.SUCCESS, + ) + + @patch("app.gateways.gatekeeper.requests.post") + def test_post_zip_callback_success(self, mock_post): + # given + mock_post.return_value.status_code = 200 + + # when + self.gatekeeper_gateway.post_zip_callback(self.zipped_resource) + + # then + mock_post.assert_called_once() + expected_url = f"{self.base_url}/datasets/{self.zipped_resource.dataset_id}/versions/{self.zipped_resource.version}/files/zip/{self.zipped_resource.id}" + mock_post.assert_called_with( + url=expected_url, data={"status": self.zipped_resource.status.name} + ) + + @patch("app.gateways.gatekeeper.requests.post") + @patch("app.gateways.gatekeeper.logger") + def test_post_zip_callback_failure(self, mock_logger, mock_post): + # given + mock_post.return_value.status_code = 500 + + # when + self.gatekeeper_gateway.post_zip_callback(self.zipped_resource) + + # then + mock_post.assert_called_once() + expected_url = f"{self.base_url}/datasets/{self.zipped_resource.dataset_id}/versions/{self.zipped_resource.version}/files/zip/{self.zipped_resource.id}" + mock_post.assert_called_with( + url=expected_url, data={"status": self.zipped_resource.status.name} + ) + mock_logger.error.assert_called_once_with( + f"POST request failed with status code 500 for process id {self.zipped_resource.id}." + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/models/zipper.py b/app/models/zipper.py index 885b7dd..2e809fd 100644 --- a/app/models/zipper.py +++ b/app/models/zipper.py @@ -1,7 +1,26 @@ +import enum +from uuid import UUID + + +class ZipStatus(enum.Enum): + IN_PROGRESS = "IN_PROGRESS" + SUCCESS = "SUCCESS" + FAILURE = "FAILURE" + + class ZippedResource: def __init__( - self, success: bool, bucket: str | None = None, name: str | None = None + self, + id: UUID, + status: ZipStatus, + dataset_id: UUID, + version: str, + bucket: str | None = None, + name: str | None = None, ): - self.success = success + self.id = id self.bucket = bucket self.name = name + self.status = status + self.dataset_id = dataset_id + self.version = version diff --git a/app/services/zipper.py b/app/services/zipper.py index c3ea51a..d0fa137 100644 --- a/app/services/zipper.py +++ b/app/services/zipper.py @@ -1,31 +1,37 @@ import logging import os import tempfile +import threading import zipfile from minio import Minio import uuid -from app.models.zipper import ZippedResource +from app.gateways.gatekeeper import GatekeeperGateway +from app.models.zipper import ZipStatus, ZippedResource logger = logging.getLogger("uvicorn") class ZipperService: - def __init__(self, minio_client: Minio, temp_dir: str): + def __init__( + self, minio_client: Minio, gatekeeper_gateway: GatekeeperGateway, temp_dir: str + ): self._minio_client = minio_client + self._gatekeeper_gateway = gatekeeper_gateway self._temp_dir = temp_dir - def zip_files( - self, bucket: str, file_names: list[str], zip_name: str | None = None - ) -> ZippedResource: - if not file_names: - return ZippedResource(success=False) - - if not zip_name: - zip_name = f"{uuid.uuid4()}.zip" - - if ".zip" not in zip_name: - zip_name = f"{zip_name}.zip" + def _zip_files( + self, + id: uuid.UUID, + dataset_id: uuid.UUID, + version: str, + bucket: str, + file_names: list[str], + zip_name: str, + ): + logger.info( + f"Zipping files: {file_names} to {zip_name} in bucket {bucket} with process ID {id}" + ) if not os.path.exists(self._temp_dir): os.makedirs(self._temp_dir) @@ -48,10 +54,54 @@ def zip_files( self._minio_client.fput_object(bucket, zip_name, temp_zip_file.name) logger.info(f"Zipped files to {zip_name} in bucket {bucket}") except Exception as e: - logger.error(f"Failed to zip files: {e}") - return ZippedResource(success=False) + logger.error(f"Failed to zip files for process id {id}: {e}") finally: logger.info(f"Removing temporary zip file: {temp_zip_file.name}") os.remove(temp_zip_file.name) + logger.info(f"Finished zipping files for process id {id}") + self._gatekeeper_gateway.post_zip_callback( + ZippedResource( + id=id, + dataset_id=dataset_id, + version=version, + status=ZipStatus.SUCCESS, + ) + ) + + def zip_files( + self, + dataset_id: uuid.UUID, + version: str, + bucket: str, + file_names: list[str], + zip_name: str | None = None, + ) -> ZippedResource: + process_id = uuid.uuid4() + + if not file_names: + return ZippedResource( + id=process_id, + dataset_id=dataset_id, + version=version, + status=ZipStatus.FAILURE, + ) + + if not zip_name: + zip_name = f"{uuid.uuid4()}.zip" + + if ".zip" not in zip_name: + zip_name = f"{zip_name}.zip" + + threading.Thread( + target=self._zip_files, + args=(process_id, dataset_id, version, bucket, file_names, zip_name), + ).start() - return ZippedResource(success=True, bucket=bucket, name=zip_name) + return ZippedResource( + id=process_id, + dataset_id=dataset_id, + version=version, + status=ZipStatus.IN_PROGRESS, + bucket=bucket, + name=zip_name, + ) diff --git a/app/services/zipper_test.py b/app/services/zipper_test.py index 67b94cd..08077f1 100644 --- a/app/services/zipper_test.py +++ b/app/services/zipper_test.py @@ -1,8 +1,10 @@ +import os import shutil import tempfile import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import uuid +from app.models.zipper import ZipStatus from app.services.zipper import ZipperService @@ -27,113 +29,213 @@ def setUp(self) -> None: def tearDown(self) -> None: shutil.rmtree(self.test_dir) - def test_zip_files(self): + @patch("app.services.zipper.threading.Thread") + def test_zip_files(self, MockThread): # given minio_client = MagicMock() - minio_client.get_object.return_value = MockResponse() - minio_client.fput_object = MagicMock() - zipper = ZipperService(minio_client=minio_client, temp_dir=self.test_dir) + gatekeeper_gateway = MagicMock() + zipper = ZipperService( + minio_client=minio_client, + gatekeeper_gateway=gatekeeper_gateway, + temp_dir=self.test_dir, + ) bucket = "bucket" file_names = ["file1", "file2"] zip_name = "tmp.zip" + dataset_id = uuid.uuid4() + version = "1.0" # when - result = zipper.zip_files(bucket, file_names, zip_name) + result = zipper.zip_files(dataset_id, version, bucket, file_names, zip_name) # then - minio_client.get_object.assert_any_call(bucket_name=bucket, object_name="file1") - minio_client.get_object.assert_any_call(bucket_name=bucket, object_name="file2") - self.assertIn(self.test_dir, minio_client.fput_object.call_args[0][2]) - self.assertEqual(bucket, minio_client.fput_object.call_args[0][0]) - self.assertEqual(f"{zip_name}", minio_client.fput_object.call_args[0][1]) - self.assertEqual(result.success, True) + self.assertTrue(isinstance(result.id, uuid.UUID)) + self.assertEqual(result.dataset_id, dataset_id) + self.assertEqual(result.version, version) + self.assertEqual(result.status, ZipStatus.IN_PROGRESS) self.assertEqual(result.bucket, bucket) self.assertEqual(result.name, zip_name) + MockThread.assert_called_once() + MockThread.return_value.start.assert_called_once() - def test_zip_files_no_zip_name(self): + @patch("app.services.zipper.threading.Thread") + def test_zip_files_no_zip_name(self, MockThread): # given minio_client = MagicMock() - minio_client.get_object.return_value = MockResponse() - minio_client.fput_object = MagicMock() - zipper = ZipperService(minio_client=minio_client, temp_dir=self.test_dir) + gatekeeper_gateway = MagicMock() + zipper = ZipperService( + minio_client=minio_client, + gatekeeper_gateway=gatekeeper_gateway, + temp_dir=self.test_dir, + ) bucket = "bucket" file_names = ["file1", "file2"] + dataset_id = uuid.uuid4() + version = "1.0" # when - result = zipper.zip_files(bucket, file_names) + result = zipper.zip_files(dataset_id, version, bucket, file_names) # then - minio_client.get_object.assert_any_call(bucket_name=bucket, object_name="file1") - minio_client.get_object.assert_any_call(bucket_name=bucket, object_name="file2") - self.assertIn(self.test_dir, minio_client.fput_object.call_args[0][2]) - self.assertEqual(bucket, minio_client.fput_object.call_args[0][0]) - self.assertEqual(result.success, True) + self.assertTrue(isinstance(result.id, uuid.UUID)) + self.assertEqual(result.status, ZipStatus.IN_PROGRESS) self.assertEqual(result.bucket, bucket) + self.assertEqual(result.dataset_id, dataset_id) + self.assertEqual(result.version, version) self.assertTrue(result.name.endswith(".zip")) self.assertEqual(4, uuid.UUID(result.name.split(".zip")[0]).version) + MockThread.assert_called_once() + MockThread.return_value.start.assert_called_once() - def test_zip_files_exception(self): + def test_zip_files_no_files(self): # given minio_client = MagicMock() - minio_client.get_object.return_value = MockResponse() - minio_client.fput_object.side_effect = Exception("error") - zipper = ZipperService(minio_client=minio_client, temp_dir=self.test_dir) + gatekeeper_gateway = MagicMock() + zipper = ZipperService( + minio_client=minio_client, + gatekeeper_gateway=gatekeeper_gateway, + temp_dir=self.test_dir, + ) bucket = "bucket" - file_names = ["file1", "file2"] + file_names = [] + dataset_id = uuid.uuid4() + version = "1.0" # when - result = zipper.zip_files(bucket, file_names) + result = zipper.zip_files(dataset_id, version, bucket, file_names) # then - self.assertEqual(result.success, False) + self.assertTrue(isinstance(result.id, uuid.UUID)) + self.assertEqual(result.status, ZipStatus.FAILURE) + self.assertEqual(result.dataset_id, dataset_id) + self.assertEqual(result.version, version) self.assertIsNone(result.bucket) self.assertIsNone(result.name) - def test_zip_files_no_files(self): + @patch("app.services.zipper.threading.Thread") + def test_zip_files_no_zip_extension(self, MockThread): # given minio_client = MagicMock() - zipper = ZipperService(minio_client=minio_client, temp_dir=self.test_dir) + gatekeeper_gateway = MagicMock() + zipper = ZipperService( + minio_client=minio_client, + gatekeeper_gateway=gatekeeper_gateway, + temp_dir=self.test_dir, + ) bucket = "bucket" - file_names = [] + file_names = ["file1", "file2"] + zip_name = "tmp" + dataset_id = uuid.uuid4() + version = "1.0" # when - result = zipper.zip_files(bucket, file_names) + result = zipper.zip_files(dataset_id, version, bucket, file_names, zip_name) # then - self.assertEqual(result.success, False) - self.assertIsNone(result.bucket) - self.assertIsNone(result.name) + self.assertTrue(isinstance(result.id, uuid.UUID)) + self.assertEqual(result.status, ZipStatus.IN_PROGRESS) + self.assertEqual(result.bucket, bucket) + self.assertEqual(result.name, zip_name + ".zip") + self.assertEqual(result.dataset_id, dataset_id) + self.assertEqual(result.version, version) + MockThread.assert_called_once() + MockThread.return_value.start.assert_called_once() - def test_zip_files_no_bucket(self): + def test__zip_files(self): # given minio_client = MagicMock() - zipper = ZipperService(minio_client=minio_client, temp_dir=self.test_dir) - bucket = "" + gatekeeper_gateway = MagicMock() + minio_client.get_object.return_value = MockResponse() + minio_client.fput_object = MagicMock() + zipper = ZipperService( + minio_client=minio_client, + gatekeeper_gateway=gatekeeper_gateway, + temp_dir=self.test_dir, + ) + bucket = "bucket" file_names = ["file1", "file2"] + zip_name = "tmp.zip" + process_id = uuid.uuid4() + dataset_id = uuid.uuid4() + version = "1.0" # when - result = zipper.zip_files(bucket, file_names) + zipper._zip_files(process_id, dataset_id, version, bucket, file_names, zip_name) # then - self.assertEqual(result.success, False) - self.assertIsNone(result.bucket) - self.assertIsNone(result.name) + minio_client.get_object.assert_any_call(bucket_name=bucket, object_name="file1") + minio_client.get_object.assert_any_call(bucket_name=bucket, object_name="file2") + self.assertIn(self.test_dir, minio_client.fput_object.call_args[0][2]) + self.assertEqual(bucket, minio_client.fput_object.call_args[0][0]) + self.assertEqual(f"{zip_name}", minio_client.fput_object.call_args[0][1]) + gatekeeper_gateway.post_zip_callback.assert_called_once() + self.assertEqual( + gatekeeper_gateway.post_zip_callback.call_args[0][0].status, + ZipStatus.SUCCESS, + ) + + def test__zip_files_exception(self): + # given + minio_client = MagicMock() + gatekeeper_gateway = MagicMock() + minio_client.get_object.return_value = MockResponse() + minio_client.fput_object.side_effect = Exception("error") + zipper = ZipperService( + minio_client=minio_client, + gatekeeper_gateway=gatekeeper_gateway, + temp_dir=self.test_dir, + ) + bucket = "bucket" + file_names = ["file1", "file2"] + zip_name = "tmp.zip" + process_id = uuid.uuid4() + dataset_id = uuid.uuid4() + version = "1.0" + + # when + with self.assertLogs("uvicorn", level="ERROR") as cm: + zipper._zip_files( + process_id, dataset_id, version, bucket, file_names, zip_name + ) + + # then + self.assertIn( + f"ERROR:uvicorn:Failed to zip files for process id {process_id}:", + cm.output[0], + ) + minio_client.get_object.assert_any_call(bucket_name=bucket, object_name="file1") + minio_client.get_object.assert_any_call(bucket_name=bucket, object_name="file2") + gatekeeper_gateway.post_zip_callback.assert_called_once() + self.assertEqual( + gatekeeper_gateway.post_zip_callback.call_args[0][0].status, + ZipStatus.SUCCESS, + ) - def test_zip_files_no_zip_extension(self): + def test__zip_files_tempfile_cleanup(self): # given minio_client = MagicMock() + gatekeeper_gateway = MagicMock() minio_client.get_object.return_value = MockResponse() minio_client.fput_object = MagicMock() - zipper = ZipperService(minio_client=minio_client, temp_dir=self.test_dir) + zipper = ZipperService( + minio_client=minio_client, + gatekeeper_gateway=gatekeeper_gateway, + temp_dir=self.test_dir, + ) bucket = "bucket" file_names = ["file1", "file2"] - zip_name = "tmp" + zip_name = "tmp.zip" + process_id = uuid.uuid4() + dataset_id = uuid.uuid4() + version = "1.0" # when - result = zipper.zip_files(bucket, file_names, zip_name) + zipper._zip_files(process_id, dataset_id, version, bucket, file_names, zip_name) # then - self.assertIn(".zip", result.name) - self.assertEqual(result.success, True) - self.assertEqual(result.bucket, bucket) - self.assertEqual(result.name, zip_name + ".zip") + temp_files = os.listdir(self.test_dir) + self.assertNotIn(zip_name, temp_files) + + +if __name__ == "__main__": + unittest.main() diff --git a/local_config.yml b/local_config.yml index ae1efd9..d4ae1a9 100644 --- a/local_config.yml +++ b/local_config.yml @@ -3,4 +3,9 @@ minio: access_key: vmI02fswfhdHP1mmoASs secret_key: NhZqdCsEDS6gLwcYXU6B48w244H7IA0GbYOoXWaC +gatekeeper: + base_url: http://localhost:9092 + api_key: 2836396d-7316-4db2-859b-d9047d4b3469 + api_secret: 1234 + temp_dir: ../tmp_zip diff --git a/staging_config.yml b/staging_config.yml index f9924ca..450801a 100644 --- a/staging_config.yml +++ b/staging_config.yml @@ -3,4 +3,9 @@ minio: access_key: vmI02fswfhdHP1mmoASs secret_key: NhZqdCsEDS6gLwcYXU6B48w244H7IA0GbYOoXWaC +gatekeeper: + base_url: http://datamap_gatekeeper:9092 + api_key: 2836396d-7316-4db2-859b-d9047d4b3469 + api_secret: 1234 + temp_dir: ../tmp_zip