2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,10 +10,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Compress and Extract endpoints now support multiple compression types (none, bz2, gzip, and xz).
- Support for Magic Wormhole data transfer method

### Changed

- The System Name path parameter and the corresponding Cluster name configuration are case insensitive.
- Upload and Download transfer endpoints now require transfer directives to be specified
- Installation docs:
- Helm charts: FirecREST settings are all included in values.yaml file
- Changed documentation name from Deployment to Install
1 change: 1 addition & 0 deletions build/docker/slurm-cluster/Dockerfile
@@ -16,6 +16,7 @@ RUN apt update -y \
wget \
curl \
python3 \
python3.13-venv \
supervisor \
dos2unix

3 changes: 2 additions & 1 deletion f7t-api-config.local-env.yaml
@@ -115,6 +115,8 @@ clusters:
default_work_dir: true
data_operation:
max_ops_file_size: 1048576 # 1M
# data_transfer:
# service_type: "wormhole"
data_transfer:
service_type: "s3"
name: "s3-storage"
@@ -124,7 +126,6 @@ data_operation:
secret_access_key: "secret_file:/run/secrets/s3_secret_access_key"
region: "us-east-1"
ttl: 604800
tenant: "test"
multipart:
use_split: false
max_part_size: 1073741824 # 1G
18 changes: 14 additions & 4 deletions src/firecrest/config.py
@@ -195,14 +195,17 @@ class DataTransferType(str, Enum):
"""Types of data transfer services"""

s3 = "s3"
wormhole = "wormhole"


class BaseDataTransfer(CamelModel):
"""Base data transfer setting"""

service_type: DataTransferType = Field(
..., description="Type of data transfer service."
)
service_type: Literal[
DataTransferType.s3,
DataTransferType.wormhole,
]

probing: Optional[Probing] = Field(
None, description="Configuration for probing storage availability."
)
@@ -217,6 +220,7 @@ class BaseDataTransfer(CamelModel):
class S3DataTransfer(BaseDataTransfer):
"""Object storage configuration, including credentials, endpoints, and upload behavior."""

service_type: Literal[DataTransferType.s3]
name: str = Field(..., description="Name identifier for the storage.")
private_url: SecretStr = Field(
..., description="Private/internal endpoint URL for the storage."
@@ -247,6 +251,11 @@ class S3DataTransfer(BaseDataTransfer):
)


class WormholeTransfer(BaseDataTransfer):
"""Magic Wormhole data transfer settings (no fields beyond the service type)."""
service_type: Literal[DataTransferType.wormhole]


class DataOperation(BaseModel):
max_ops_file_size: int = Field(
5 * 1024 * 1024,
@@ -255,9 +264,10 @@ class DataOperation(BaseModel):
"download. Larger files will go through the staging area."
),
)
data_transfer: Optional[S3DataTransfer] = Field(
data_transfer: Optional[S3DataTransfer | WormholeTransfer] = Field(
None,
description=("Data transfer service configuration"),
discriminator="service_type",
)


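The hunk above turns `data_transfer` into a discriminated (tagged) union: each concrete transfer model pins `service_type` to a single `Literal` value, and the `discriminator` tells Pydantic which class to instantiate. Below is a standalone sketch of the same pattern with simplified, assumed names (`S3Settings`, `WormholeSettings`, `DataOperationSketch`); only the `service_type`/`discriminator` mechanics mirror the diff.

```python
from enum import Enum
from typing import Literal, Optional, Union

from pydantic import BaseModel, Field


class DataTransferType(str, Enum):
    s3 = "s3"
    wormhole = "wormhole"


class S3Settings(BaseModel):
    service_type: Literal[DataTransferType.s3]
    name: str


class WormholeSettings(BaseModel):
    service_type: Literal[DataTransferType.wormhole]


class DataOperationSketch(BaseModel):
    # The discriminator tells Pydantic which union member to validate against.
    data_transfer: Optional[Union[S3Settings, WormholeSettings]] = Field(
        None, discriminator="service_type"
    )


op = DataOperationSketch.model_validate({"data_transfer": {"service_type": "wormhole"}})
assert isinstance(op.data_transfer, WormholeSettings)
```

Adding another backend then only requires a new enum member plus a subclass that fixes `service_type` to it; no branching in the parent model is needed.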
35 changes: 22 additions & 13 deletions src/firecrest/dependencies.py
@@ -26,6 +26,7 @@
from lib.auth.authZ.open_fga_client import OpenFGAClient
from lib.auth.authZ.authorization_service import AuthorizationService
from lib.datatransfers.s3.s3_datatransfer import S3Datatransfer
from lib.datatransfers.magic_wormhole.wormhole_datatransfer import WormholeDatatransfer
from lib.dependencies import AuthDependency

# clients
@@ -355,22 +356,30 @@ async def __call__(
scheduler_client = await self._get_scheduler_client(system_name)
ssh_client = await self._get_ssh_client(system_name)

system = ServiceAvailabilityDependency(service_type=HealthCheckType.scheduler)(
system_name=system_name
)
work_dir = next(
iter([fs.path for fs in system.file_systems if fs.default_work_dir]),
None,
)
if not work_dir:
raise ValueError(
f"The system {system_name} has no filesystem defined as default_work_dir"
)

match settings.data_operation.data_transfer.service_type:
case DataTransferType.s3:
case DataTransferType.wormhole:

system = ServiceAvailabilityDependency(
service_type=HealthCheckType.scheduler
)(system_name=system_name)
work_dir = next(
iter(
[fs.path for fs in system.file_systems if fs.default_work_dir]
),
None,
return WormholeDatatransfer(
scheduler_client=scheduler_client,
directives=system.datatransfer_jobs_directives,
work_dir=work_dir,
system_name=system_name,
)
if not work_dir:
raise ValueError(
f"The system {system_name} has no filesystem defined as default_work_dir"
)

case DataTransferType.s3:

async with self._get_s3_client(
settings.data_operation.data_transfer.public_url
) as s3_client_public:
26 changes: 17 additions & 9 deletions src/firecrest/filesystem/transfer/models.py
@@ -3,30 +3,38 @@
# Please, refer to the LICENSE file in the root directory.
# SPDX-License-Identifier: BSD-3-Clause

from typing import Any, Optional
from typing import Any, Optional, Union
from pydantic import Field

# models
from firecrest.filesystem.models import FilesystemRequestBase
from lib.datatransfers.s3.models import S3DataTransferOperation
from lib.datatransfers.datatransfer_base import DataTransferOperation
from lib.datatransfers.magic_wormhole.models import WormholeDataTransferDirective
from lib.datatransfers.s3.models import S3DataTransferDirective
from lib.models.base_model import CamelModel
from firecrest.filesystem.ops.commands.tar_command import TarCommand


class PostFileUploadRequest(FilesystemRequestBase):
file_name: str = Field(..., description="Name of the local file to upload")
transfer_directives: Union[
WormholeDataTransferDirective | S3DataTransferDirective
] = Field(
..., description="Data transfer parameters specific to the transfer method"
)

account: Optional[str] = Field(
default=None, description="Name of the account in the scheduler"
)
file_size: int = Field(..., description="Size of the file to upload in bytes")
model_config = {
"json_schema_extra": {
"examples": [
{
"path": "/home/user/dir/file",
"file_name": "/path/local/file",
"account": "group",
"file_size": "7340032",
"transfer_directives": {
"transfer_method": "s3",
"account": "group",
"file_size": "7340032",
},
}
]
}
@@ -60,11 +68,11 @@ class TransferJob(CamelModel):
logs: TransferJobLogs


class UploadFileResponse(S3DataTransferOperation):
class UploadFileResponse(DataTransferOperation):
pass


class DownloadFileResponse(S3DataTransferOperation):
class DownloadFileResponse(DataTransferOperation):
pass


9 changes: 6 additions & 3 deletions src/firecrest/filesystem/transfer/router.py
@@ -130,13 +130,16 @@ async def post_upload(
access_token = ApiAuthHelper.get_access_token()

source = DataTransferLocation(
host=None, system=None, path=None, size=upload_request.file_size
host=None,
system=None,
path=None,
transfer_directives=upload_request.transfer_directives,
)
target = DataTransferLocation(
host=None,
system=system_name,
path=f"{upload_request.path}/{upload_request.file_name}",
size=upload_request.file_size,
path=upload_request.path,
transfer_directives=upload_request.transfer_directives,
)

return await datatransfer.upload(
113 changes: 110 additions & 3 deletions src/lib/datatransfers/datatransfer_base.py
@@ -1,8 +1,15 @@
from abc import ABC, abstractmethod
from typing import Optional

from typing import List, Optional
import uuid
from enum import Enum
from jinja2 import Environment, FileSystemLoader
from importlib import resources as imp_resources
from typing import Literal, Union
from lib.models.base_model import CamelModel
from lib.scheduler_clients.scheduler_base_client import SchedulerBaseClient
from lib.datatransfers import scripts
from fastapi import HTTPException
from pydantic import ConfigDict, Field


class TransferJobLogs(CamelModel):
@@ -17,15 +24,115 @@ class TransferJob(CamelModel):
logs: TransferJobLogs


class DataTransferType(str, Enum):
"""Types of data transfer services"""

s3 = "s3"
wormhole = "wormhole"


class DataTransferDirective(CamelModel):
transfer_method: Literal[
DataTransferType.s3,
DataTransferType.wormhole,
]


class WormholeDataTransferDirective(DataTransferDirective):
wormhole_code: Optional[str] = None
transfer_method: Literal[DataTransferType.wormhole,]


class S3DataTransferDirective(DataTransferDirective):
download_url: Optional[str] = None
parts_upload_urls: Optional[List[str]] = None
complete_upload_url: Optional[str] = None
max_part_size: Optional[int] = None
file_size: Optional[int] = Field(
None, description="Size of the file to upload in bytes"
)
transfer_method: Literal[DataTransferType.s3,]


class DataTransferLocation(CamelModel):
host: Optional[str] = None
system: Optional[str] = None
path: Optional[str] = None
size: Optional[int] = None
transfer_directives: Optional[
Union[S3DataTransferDirective | WormholeDataTransferDirective]
] = Field(
None,
description=("Provide method specific transfer directives"),
discriminator="transfer_method",
)

model_config = ConfigDict(use_enum_values=True)


class DataTransferOperation(CamelModel):
transfer_job: TransferJob
transfer_directives: Union[
S3DataTransferDirective | WormholeDataTransferDirective
] = Field(
None,
description=("Provide method specific transfer directives"),
discriminator="transfer_method",
)

class Config:
json_encoders = {
DataTransferDirective: lambda d: {
k: v for k, v in d.__dict__.items() if v is not None
}
}


class JobHelper:
job_param = None
working_dir: str = None

def __init__(
self,
working_dir: str = None,
script: str = None,
job_name: str = None,
):
self.working_dir = working_dir
unique_id = uuid.uuid4()
self.job_param = {
"name": job_name,
"working_directory": working_dir,
"standard_input": "/dev/null",
"standard_output": f"{working_dir}/.f7t_file_handling_job_{unique_id}.log",
"standard_error": f"{working_dir}/.f7t_file_handling_job_error_{unique_id}.log",
"env": {"PATH": "/bin:/usr/bin/:/usr/local/bin/"},
"script": script,
}


def _format_directives(directives: List[str], account: str):

directives_str = "\n".join(directives)
if "{account}" in directives_str:
if account is None:
raise HTTPException(
status_code=400, detail="Account parameter is required on this system."
)
directives_str = directives_str.format(account=account)

return directives_str


def _build_script(filename: str, parameters):

script_environment = Environment(
loader=FileSystemLoader(imp_resources.files(scripts)), autoescape=True
)
script_template = script_environment.get_template(filename)

script_code = script_template.render(parameters)

return script_code


class DataTransferBase(ABC):
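A minimal usage sketch of the helpers added above; the directive strings, account, and paths are illustrative assumptions, not values from the PR. `_format_directives` joins the scheduler directives and substitutes the `{account}` placeholder (raising HTTP 400 when the placeholder is present but no account is given), and `JobHelper` assembles the submission parameters with per-transfer log file names derived from a fresh UUID.

```python
from lib.datatransfers.datatransfer_base import JobHelper, _format_directives

# Illustrative directives; a real system supplies them via its
# datatransfer_jobs_directives configuration.
directives = ["#SBATCH --account={account}", "#SBATCH --partition=xfer"]
print(_format_directives(directives, "csstaff"))
# -> "#SBATCH --account=csstaff\n#SBATCH --partition=xfer"
# Passing account=None here would raise HTTPException(400), because the
# directives contain the {account} placeholder.

job = JobHelper(
    working_dir="/scratch/user",           # assumed path
    script="#!/bin/bash\necho transfer",   # placeholder script body
    job_name="wormhole_upload",
)
print(job.job_param["standard_output"])
# -> /scratch/user/.f7t_file_handling_job_<uuid>.log
```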
8 changes: 8 additions & 0 deletions src/lib/datatransfers/magic_wormhole/models.py
@@ -0,0 +1,8 @@
from lib.datatransfers.datatransfer_base import (
WormholeDataTransferDirective,
DataTransferOperation,
)


class WormholeDataTransferOperation(DataTransferOperation):
transfer_directives: WormholeDataTransferDirective
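For illustration, a hedged sketch of populating the new wormhole directive model. It assumes, as in the request example in models.py above, that snake_case field names are accepted on input, and the wormhole code value is a placeholder.

```python
from lib.datatransfers.datatransfer_base import WormholeDataTransferDirective

# Placeholder code; a real one is produced by the wormhole sender.
directive = WormholeDataTransferDirective.model_validate(
    {"transfer_method": "wormhole", "wormhole_code": "7-guitarist-revenge"}
)
print(directive.transfer_method)  # DataTransferType.wormhole
print(directive.wormhole_code)    # 7-guitarist-revenge
```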