Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions app/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from minio import Minio
import os

from app.gateways.gatekeeper import GatekeeperGateway
from app.services.zipper import ZipperService

logger = logging.getLogger("uvicorn")
Expand All @@ -26,8 +27,16 @@ class Container(containers.DeclarativeContainer):
secure=False,
)

# Provider for the Gatekeeper gateway. Its base URL and API credentials are
# read from the `gatekeeper` section of the container config.
gatekeeper_gateway = providers.Factory(
GatekeeperGateway,
base_url=config.gatekeeper.base_url,
api_key=config.gatekeeper.api_key,
api_secret=config.gatekeeper.api_secret,
)

# Provider for the zip service. It is wired with the MinIO client, the
# Gatekeeper gateway provider declared above, and `temp_dir` from config.
zipper_service = providers.Factory(
ZipperService,
minio_client=minio_client,
gatekeeper_gateway=gatekeeper_gateway,
temp_dir=config.temp_dir,
)
5 changes: 3 additions & 2 deletions app/controllers/v1/resources.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from uuid import UUID
from pydantic import BaseModel, Field


Expand All @@ -8,7 +9,7 @@ class CreateZipRequest(BaseModel):


class CreateZipResponse(BaseModel):
    """Response body returned by the zip endpoint.

    The outcome is carried by ``id`` and ``status``; the remaining fields are
    optional and default to ``None``.
    """

    id: UUID = Field(..., description="Zip process ID")
    status: str = Field(..., description="Zip file status")
    message: str | None = Field(None, description="Error message")
    bucket: str | None = Field(None, description="Bucket name")
    name: str | None = Field(None, description="Zip file name")
23 changes: 15 additions & 8 deletions app/controllers/v1/zipper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

from app.containers import Container
from app.controllers.v1.resources import CreateZipRequest, CreateZipResponse
from app.models.zipper import ZipStatus
from app.services.zipper import ZipperService

router = APIRouter(
prefix="/zip",
tags=["zip"],
dependencies=[],
responses={
Expand All @@ -16,32 +16,39 @@
)


# POST /api/v1/datasets/:dataset_id/versions/:version/zip
@router.post(
    path="/datasets/{dataset_id}/versions/{version}/zip",
    status_code=201,
    description="Zip dataset files",
    response_model_exclude_none=True,
)
@inject
def zip(
    dataset_id: str,
    version: str,
    payload: CreateZipRequest,
    response: Response,
    service: ZipperService = Depends(Provide[Container.zipper_service]),
) -> CreateZipResponse:
    """Start zipping the requested files of a dataset version.

    Args:
        dataset_id: Dataset identifier taken from the URL path.
        version: Dataset version taken from the URL path.
        payload: Request body with the bucket, the file names and an optional
            zip name.
        response: Outgoing response; its status code is set to 500 when the
            service reports a failure.
        service: Zip service injected from the container.

    Returns:
        The process id and status name, plus the zip file name unless the
        service reported ``ZipStatus.FAILURE``.
    """
    zipped_resource = service.zip_files(
        dataset_id=dataset_id,
        version=version,
        bucket=payload.bucket,
        file_names=payload.files,
        zip_name=payload.zip_name,
    )

    if zipped_resource.status == ZipStatus.FAILURE:
        # The route default is 201, so the failure code is set explicitly.
        response.status_code = 500
        return CreateZipResponse(
            id=zipped_resource.id,
            status=zipped_resource.status.name,
            message="Failed to zip files",
        )

    return CreateZipResponse(
        id=zipped_resource.id,
        name=zipped_resource.name,
        status=zipped_resource.status.name,
    )
Empty file added app/gateways/__init__.py
Empty file.
28 changes: 28 additions & 0 deletions app/gateways/gatekeeper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging
import requests

from app.models.zipper import ZippedResource

logger = logging.getLogger("uvicorn")


class GatekeeperGateway:
    """HTTP gateway that reports zip process results to Gatekeeper.

    Args:
        base_url: Root URL that the callback path is appended to.
        api_key: Value sent in the ``X-Api-Key`` header.
        api_secret: Value sent in the ``X-Api-Secret`` header (coerced to
            ``str``).
        timeout: Seconds to wait for Gatekeeper before a request is abandoned.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        api_secret: str,
        timeout: float = 10.0,
    ):
        self._base_url = base_url
        self._timeout = timeout
        self._base_headers = {
            "Content-Type": "application/json",
            "X-Api-Key": api_key,
            "X-Api-Secret": str(api_secret),
        }

    def post_zip_callback(self, zipped_resource: ZippedResource):
        """POST the status name of ``zipped_resource`` to Gatekeeper.

        Failures are logged and never raised: a transport error or timeout is
        logged with the exception, and any response status other than 200 is
        logged with the status code.
        """
        url = (
            f"{self._base_url}/v1/internal/datasets/{zipped_resource.dataset_id}"
            f"/versions/{zipped_resource.version}"
            f"/files/zip/{zipped_resource.id}"
        )
        try:
            response = requests.post(
                url=url,
                headers=self._base_headers,
                json={"status": zipped_resource.status.name},
                # Without a timeout requests waits indefinitely, which would
                # leave the calling thread stuck if Gatekeeper never answers.
                timeout=self._timeout,
            )
        except requests.RequestException as e:
            logger.error(
                f"POST request failed for process id {zipped_resource.id}: {e}"
            )
            return

        if response.status_code != 200:
            logger.error(
                f"POST request failed with status code {response.status_code} for process id {zipped_resource.id}."
            )
55 changes: 55 additions & 0 deletions app/gateways/gatekeeper_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import unittest
from unittest.mock import patch
from app.models.zipper import ZippedResource, ZipStatus
from app.gateways.gatekeeper import GatekeeperGateway
import uuid


class TestGatekeeperGateway(unittest.TestCase):
    """Tests for ``GatekeeperGateway.post_zip_callback`` with ``requests.post`` patched."""

    def setUp(self):
        self.base_url = "http://example.com"
        self.api_key = "test-api-key"
        self.api_secret = "test-api-secret"
        # The gateway requires the credentials as well as the base URL.
        self.gatekeeper_gateway = GatekeeperGateway(
            self.base_url, self.api_key, self.api_secret
        )
        self.zipped_resource = ZippedResource(
            id=uuid.uuid4(),
            dataset_id=uuid.uuid4(),
            version="1.0",
            status=ZipStatus.SUCCESS,
        )
        # Mirror the URL and headers the gateway builds for the callback.
        self.expected_url = (
            f"{self.base_url}/v1/internal/datasets/{self.zipped_resource.dataset_id}"
            f"/versions/{self.zipped_resource.version}"
            f"/files/zip/{self.zipped_resource.id}"
        )
        self.expected_headers = {
            "Content-Type": "application/json",
            "X-Api-Key": self.api_key,
            "X-Api-Secret": self.api_secret,
        }

    def _assert_callback_posted(self, mock_post):
        """Check that exactly one POST went out with the expected URL, headers and body."""
        mock_post.assert_called_once()
        # Inspect individual keyword arguments so the test does not break if
        # the gateway passes extra transport options to requests.post.
        kwargs = mock_post.call_args.kwargs
        self.assertEqual(kwargs["url"], self.expected_url)
        self.assertEqual(kwargs["headers"], self.expected_headers)
        self.assertEqual(
            kwargs["json"], {"status": self.zipped_resource.status.name}
        )

    @patch("app.gateways.gatekeeper.requests.post")
    def test_post_zip_callback_success(self, mock_post):
        # given
        mock_post.return_value.status_code = 200

        # when
        self.gatekeeper_gateway.post_zip_callback(self.zipped_resource)

        # then
        self._assert_callback_posted(mock_post)

    @patch("app.gateways.gatekeeper.requests.post")
    @patch("app.gateways.gatekeeper.logger")
    def test_post_zip_callback_failure(self, mock_logger, mock_post):
        # given
        mock_post.return_value.status_code = 500

        # when
        self.gatekeeper_gateway.post_zip_callback(self.zipped_resource)

        # then
        self._assert_callback_posted(mock_post)
        mock_logger.error.assert_called_once_with(
            f"POST request failed with status code 500 for process id {self.zipped_resource.id}."
        )


if __name__ == "__main__":
unittest.main()
Empty file added app/models/__init__.py
Empty file.
23 changes: 21 additions & 2 deletions app/models/zipper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
import enum
from uuid import UUID


class ZipStatus(enum.Enum):
    """States of a zip process; each member's value is the same string as its name."""

    IN_PROGRESS = "IN_PROGRESS"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


class ZippedResource:
    """Plain record describing one zip process of a dataset version.

    Args:
        id: Identifier of the zip process.
        status: Current state of the process.
        dataset_id: Identifier of the dataset being zipped.
        version: Dataset version being zipped.
        bucket: Bucket the zip file belongs to, when known.
        name: Name of the zip file, when known.
    """

    def __init__(
        self,
        id: UUID,
        status: ZipStatus,
        dataset_id: UUID,
        version: str,
        bucket: str | None = None,
        name: str | None = None,
    ):
        self.id = id
        self.bucket = bucket
        self.name = name
        self.status = status
        self.dataset_id = dataset_id
        self.version = version
82 changes: 66 additions & 16 deletions app/services/zipper.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
import logging
import os
import tempfile
import threading
import zipfile
from minio import Minio
import uuid

from app.models.zipper import ZippedResource
from app.gateways.gatekeeper import GatekeeperGateway
from app.models.zipper import ZipStatus, ZippedResource

logger = logging.getLogger("uvicorn")


class ZipperService:
def __init__(
    self, minio_client: Minio, gatekeeper_gateway: GatekeeperGateway, temp_dir: str
):
    """Store the collaborators used by the zip service.

    Args:
        minio_client: MinIO client used for object storage access.
        gatekeeper_gateway: Gateway that receives the zip result callback.
        temp_dir: Directory in which temporary zip files are created.
    """
    self._minio_client = minio_client
    self._gatekeeper_gateway = gatekeeper_gateway
    self._temp_dir = temp_dir

def zip_files(
self, bucket: str, file_names: list[str], zip_name: str | None = None
) -> ZippedResource:
if not file_names:
return ZippedResource(success=False)

if not zip_name:
zip_name = f"{uuid.uuid4()}.zip"

if ".zip" not in zip_name:
zip_name = f"{zip_name}.zip"
def _zip_files(
self,
id: uuid.UUID,
dataset_id: uuid.UUID,
version: str,
bucket: str,
file_names: list[str],
zip_name: str,
):
logger.info(
f"Zipping files: {file_names} to {zip_name} in bucket {bucket} with process ID {id}"
)

if not os.path.exists(self._temp_dir):
os.makedirs(self._temp_dir)
Expand All @@ -48,10 +54,54 @@ def zip_files(
self._minio_client.fput_object(bucket, zip_name, temp_zip_file.name)
logger.info(f"Zipped files to {zip_name} in bucket {bucket}")
except Exception as e:
logger.error(f"Failed to zip files: {e}")
return ZippedResource(success=False)
logger.error(f"Failed to zip files for process id {id}: {e}")
finally:
logger.info(f"Removing temporary zip file: {temp_zip_file.name}")
os.remove(temp_zip_file.name)
logger.info(f"Finished zipping files for process id {id}")
self._gatekeeper_gateway.post_zip_callback(
ZippedResource(
id=id,
dataset_id=dataset_id,
version=version,
status=ZipStatus.SUCCESS,
)
)

def zip_files(
    self,
    dataset_id: uuid.UUID,
    version: str,
    bucket: str,
    file_names: list[str],
    zip_name: str | None = None,
) -> ZippedResource:
    """Start zipping ``file_names`` on a background thread.

    Args:
        dataset_id: Dataset the files belong to.
        version: Dataset version the files belong to.
        bucket: Bucket passed on to the zip worker.
        file_names: Names of the files to zip.
        zip_name: Name for the zip file. A random ``<uuid>.zip`` name is used
            when omitted, and ``.zip`` is appended when the name does not
            already end with it.

    Returns:
        A resource with a fresh process id. Its status is
        ``ZipStatus.FAILURE`` when ``file_names`` is empty (no thread is
        started), otherwise ``ZipStatus.IN_PROGRESS`` together with the bucket
        and the final zip name.
    """
    process_id = uuid.uuid4()

    if not file_names:
        return ZippedResource(
            id=process_id,
            dataset_id=dataset_id,
            version=version,
            status=ZipStatus.FAILURE,
        )

    if not zip_name:
        zip_name = f"{uuid.uuid4()}.zip"
    elif not zip_name.endswith(".zip"):
        # Check the suffix, not a substring: "a.zip.bak" or "my.zipped"
        # still need the extension appended.
        zip_name = f"{zip_name}.zip"

    # The zipping itself runs in its own thread so this call returns at once.
    threading.Thread(
        target=self._zip_files,
        args=(process_id, dataset_id, version, bucket, file_names, zip_name),
    ).start()

    return ZippedResource(
        id=process_id,
        dataset_id=dataset_id,
        version=version,
        status=ZipStatus.IN_PROGRESS,
        bucket=bucket,
        name=zip_name,
    )
Loading