Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions cfa/cloudops/_cloudclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
OnAllTasksComplete,
OnTaskFailure,
)

# from azure.batch.models import TaskAddParameter
from azure.mgmt.batch import models
from azure.mgmt.resource import SubscriptionClient

Expand Down Expand Up @@ -872,12 +870,43 @@ def create_blob_container(self, name: str) -> None:
create_storage_container_if_not_exists(name, self.blob_service_client)
logger.info(f"Blob container '{name}' created or already exists.")

def toggle_legal_hold_on_files(
    self,
    files: str | list[str],
    container_name: str,
    legal_hold: bool = False,
) -> None:
    """Update the legal hold status on existing blobs in an Azure Blob Storage container.

    Args:
        files (str | list[str]): Blob path(s) within the container whose legal
            hold status should be changed. Can be a single path as a string or
            a list of paths.
        container_name (str): Name of the blob storage container holding the
            blobs. The container must already exist.
        legal_hold (bool, optional): Legal hold state to set. True applies a
            legal hold, which prevents deletion or modification of the blobs;
            False clears it. Default is False.
    """
    logger.debug(
        f"Toggling legal hold {legal_hold} on files {files} in container {container_name}."
    )
    # Delegate to the module-level helper, which iterates the blobs and calls
    # set_legal_hold on each one via this client's blob service client.
    blob.toggle_legal_hold_on_files(
        file_paths=files,
        blob_storage_container_name=container_name,
        blob_service_client=self.blob_service_client,
        legal_hold=legal_hold,
    )
    logger.info(
        f"Toggled legal hold {legal_hold} on files {files} in container '{container_name}'."
    )

def upload_files(
self,
files: str | list[str],
container_name: str,
local_root_dir: str = ".",
location_in_blob: str = ".",
legal_hold: bool = False,
immutability_lock_days: int = 0,
read_only: bool = False,
) -> None:
"""Upload files to an Azure Blob Storage container.

Expand All @@ -894,6 +923,9 @@ def upload_files(
Default is "." (current directory).
location_in_blob (str, optional): Remote directory path within the blob container
where files should be uploaded. Default is "." (container root).
legal_hold (bool, optional): Whether to apply a legal hold to the uploaded blobs which prevents deletion or modification of the blobs.
immutability_lock_days (int, optional): Number of days to set for immutability lock on the uploaded blobs.
read_only (bool, optional): Whether to set the uploaded blobs to read-only.

Example:
Upload a single file:
Expand Down Expand Up @@ -924,6 +956,9 @@ def upload_files(
blob_service_client=self.blob_service_client,
local_root_dir=local_root_dir,
remote_root_dir=location_in_blob,
legal_hold=legal_hold,
immutability_lock_days=immutability_lock_days,
read_only=read_only,
)
logger.info(f"Uploaded files to container '{container_name}'.")

Expand All @@ -936,6 +971,8 @@ def upload_folders(
exclude_patterns: str | list | None = None,
location_in_blob: str = ".",
force_upload: bool = False,
legal_hold: bool = False,
immutability_lock_days: int = 0,
) -> list[str]:
"""Upload entire folders to an Azure Blob Storage container with filtering options.

Expand All @@ -962,6 +999,9 @@ def upload_folders(
force_upload (bool, optional): Whether to force upload files even if they
already exist in the container with the same size. Default is False
(skip existing files with same size).
legal_hold (bool, optional): Whether to apply a legal hold to the uploaded blobs
which prevents deletion or modification of the blobs.
immutability_lock_days (int, optional): Number of days to set for immutability lock on the uploaded blobs.

Returns:
list[str]: List of file paths that were successfully uploaded to the container.
Expand Down Expand Up @@ -1006,6 +1046,8 @@ def upload_folders(
location_in_blob=location_in_blob,
blob_service_client=self.blob_service_client,
force_upload=force_upload,
legal_hold=legal_hold,
immutability_lock_days=immutability_lock_days,
)
_files += _uploaded_files
logger.debug(f"uploaded {_files}")
Expand Down Expand Up @@ -1611,6 +1653,8 @@ def async_upload_folder(
exclude_extensions: str | list | None = None,
location_in_blob: str = ".",
max_concurrent_uploads: int = 20,
legal_hold: bool = False,
immutability_lock_days: int = 0,
):
"""Upload entire folders to an Azure Blob Storage container asynchronously.

Expand All @@ -1633,6 +1677,9 @@ def async_upload_folder(
where folders should be uploaded. Default is "." (container root).
max_concurrent_uploads (int, optional): Maximum number of concurrent
uploads to perform. Higher values may increase speed but use more RAM.
legal_hold (bool, optional): Whether to set a legal hold on the uploaded blobs
which prevents deletion or modification of the blobs. Default is False.
immutability_lock_days (int, optional): Number of days to set an immutability policy.

Returns:
list[str]: List of file paths that were successfully uploaded to the container.
Expand Down Expand Up @@ -1671,6 +1718,8 @@ def async_upload_folder(
location_in_blob=location_in_blob,
max_concurrent_uploads=max_concurrent_uploads,
credential=cred,
legal_hold=legal_hold,
immutability_lock_days=immutability_lock_days,
)
logger.info(
f"Asynchronously uploaded folder '{folder}' to container '{container_name}'."
Expand Down
122 changes: 119 additions & 3 deletions cfa/cloudops/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@

import logging
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path

import anyio
from azure.batch import models
from azure.identity import ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient, aio
from azure.storage.blob import (
BlobImmutabilityPolicyMode,
BlobServiceClient,
ImmutabilityPolicy,
aio,
)

from .client import get_blob_service_client
from .util import ensure_listlike
Expand Down Expand Up @@ -108,6 +114,9 @@ def upload_to_storage_container(
local_root_dir: str = ".",
remote_root_dir: str = ".",
tags: dict = None,
legal_hold: bool = False,
immutability_lock_days: int = 0,
read_only: bool = False,
) -> None:
"""Upload a file or list of files to an Azure blob storage container.

Expand All @@ -125,6 +134,10 @@ def upload_to_storage_container(
remote_root_dir: Root directory for the relative file paths within the blob
storage container. Defaults to "." (start at the blob storage container root).
tags: dict (optional): A dictionary of tags to apply to the uploaded blobs.
legal_hold: bool, optional): Whether to apply a legal hold on the uploaded blobs
which prevents deletion or modification of the blobs. Defaults to False.
immutability_lock_days: int, optional): Number of days to set immutability lock
read_only: bool, optional): Whether to set the blob to read-only. Defaults to False.

Raises:
Exception: If the blob storage container does not exist.
Expand All @@ -150,6 +163,14 @@ def upload_to_storage_container(

logger.debug(f"Uploading {n_total_files} files to blob storage")

immutability_policy = None
if immutability_lock_days > 0:
immutability_policy = ImmutabilityPolicy(
expiry_time=datetime.now(timezone.utc)
+ timedelta(days=immutability_lock_days),
policy_mode=BlobImmutabilityPolicyMode.UNLOCKED,
)

for i_file, file_path in enumerate(file_paths):
if i_file % (1 + int(n_total_files / 10)) == 0:
print("Uploading file {} of {}".format(i_file, n_total_files))
Expand All @@ -167,9 +188,22 @@ def upload_to_storage_container(
logger.debug(f"Created blob client for '{remote_file_path}'")

with open(local_file_path, "rb") as upload_data:
blob_client.upload_blob(upload_data, overwrite=True, tags=tags)
blob_client.upload_blob(
upload_data,
overwrite=True,
tags=tags,
legal_hold=legal_hold,
immutability_policy=immutability_policy,
seal_append_blob=read_only,
)
logger.debug(f"Successfully uploaded '{file_path}'")

if immutability_policy:
blob_client.lock_blob_immutability_policy()
logger.info(
f"Blob immutability policy is now LOCKED with retention policy if {immutability_lock_days} days."
)

logger.info(
f"Uploaded {n_total_files} file(s) to container '{blob_storage_container_name}'."
)
Expand Down Expand Up @@ -494,6 +528,9 @@ async def _async_upload_file_to_blob(
blob_name: str,
semaphore: anyio.Semaphore,
tags: dict = None,
legal_hold: bool = False,
immutability_lock_days: int = 0,
read_only: bool = False,
):
"""
Uploads a single file to a blob asynchronously, respecting a concurrency limit.
Expand All @@ -504,6 +541,9 @@ async def _async_upload_file_to_blob(
blob_name (str): Name of the blob in the container.
semaphore (anyio.Semaphore): Semaphore to limit concurrent uploads.
tags: dict (optional): A dictionary of tags to apply to the uploaded blobs.
legal_hold (bool, optional): Whether to apply a legal hold on the uploaded blob which prevents deletion or modification of the blob. Defaults to False.
immutability_lock_days (int, optional): Number of days to set immutability lock on the uploaded blob. Defaults to 0 (no immutability).
read_only (bool, optional): Whether to set the blob to read-only. Defaults to False.

Raises:
Exception: Logs errors if upload fails.
Expand All @@ -522,7 +562,26 @@ async def _async_upload_file_to_blob(
logger.debug(f"Opening file for upload: '{local_file_path}'")
async with await local_file_path.open("rb") as f:
logger.debug(f"Uploading blob data to: '{blob_name}'")
await blob_client.upload_blob(f, overwrite=True, tags=tags)
immutability_policy = None
if immutability_lock_days > 0:
immutability_policy = ImmutabilityPolicy(
expiry_time=datetime.now(timezone.utc)
+ timedelta(days=immutability_lock_days),
policy_mode=BlobImmutabilityPolicyMode.UNLOCKED,
)
await blob_client.upload_blob(
f,
overwrite=True,
tags=tags,
legal_hold=legal_hold,
immutability_policy=immutability_policy,
seal_append_blob=read_only,
)
if immutability_policy:
await blob_client.lock_blob_immutability_policy()
logger.info(
f"Blob immutability policy is now LOCKED with retention policy if {immutability_lock_days} days."
)

logger.debug(f"Successfully uploaded: '{local_file_path}' -> '{blob_name}'")
except Exception as e:
Expand Down Expand Up @@ -652,6 +711,9 @@ async def _async_upload_blob_folder(
include_extensions: str | list | None = None,
exclude_extensions: str | list | None = None,
tags: dict = None,
legal_hold: bool = False,
immutability_lock_days: int = 0,
read_only: bool = False,
):
"""
Uploads all matching files from a local folder to a blob container asynchronously.
Expand All @@ -664,6 +726,9 @@ async def _async_upload_blob_folder(
include_extensions (str | list, optional): File extensions to include (e.g., ".txt", [".json", ".csv"]).
exclude_extensions (str | list, optional): File extensions to exclude (e.g., ".log", [".tmp", ".bak"]).
tags: dict (optional): A dictionary of tags to apply to the uploaded blobs.
legal_hold: bool, optional): Whether to apply a legal hold on the uploaded blobs which prevents deletion or modification of the blobs. Defaults to False.
immutability_lock_days: int, optional): Number of days to set immutability lock on the uploaded blobs. Defaults to 0 (no immutability).
read_only: bool, optional): Whether to set the blobs to read-only. Defaults to False.

Raises:
Exception: If both include_extensions and exclude_extensions are provided.
Expand Down Expand Up @@ -732,6 +797,9 @@ async def walk_files(base: anyio.Path):
os.path.join(location_in_blob, str(rel_path)),
semaphore,
tags,
legal_hold,
immutability_lock_days,
read_only,
)
except Exception as e:
logger.error(f"Error walking files in folder {folder}: {e}")
Expand Down Expand Up @@ -843,6 +911,9 @@ def async_upload_folder(
max_concurrent_uploads: int = 20,
credential: any = None,
tags: dict = None,
legal_hold: bool = False,
immutability_lock_days: int = 0,
read_only: bool = False,
) -> None:
"""
Upload all files from a local folder to an Azure blob container asynchronously.
Expand All @@ -860,6 +931,9 @@ def async_upload_folder(
max_concurrent_uploads (int, optional): Maximum number of simultaneous uploads allowed. Defaults to 20.
credential (any, optional): Azure credential object. If None, ManagedIdentityCredential is used.
tags: dict (optional): A dictionary of tags to apply to the uploaded blobs.
legal_hold (bool, optional): Whether to apply a legal hold on the uploaded blobs which prevents deletion or modification of the blobs. Defaults to False.
immutability_lock_days (int, optional): Number of days to set immutability lock on the uploaded blobs. Defaults to 0 (no immutability).
read_only (bool, optional): Whether to set the blobs to read-only. Defaults to False.

Raises:
KeyboardInterrupt: If the user cancels the upload operation.
Expand Down Expand Up @@ -921,6 +995,9 @@ async def _runner(credential):
exclude_extensions=exclude_extensions,
max_concurrent_uploads=max_concurrent_uploads,
tags=tags,
legal_hold=legal_hold,
immutability_lock_days=immutability_lock_days,
read_only=read_only,
)
except Exception as e:
logger.error(f"Error during upload: {e}")
Expand All @@ -936,3 +1013,42 @@ async def _runner(credential):
f"Completed async upload of folder '{folder}' to container '{container_name}'."
)
return folder


def toggle_legal_hold_on_files(
    file_paths: str | list[str],
    blob_storage_container_name: str,
    blob_service_client: BlobServiceClient,
    legal_hold: bool = False,
) -> None:
    """Toggle the legal hold status of existing blobs in an Azure Blob Storage container.

    Args:
        file_paths (str | list[str]): Blob path(s) within the container whose
            legal hold status should be changed. Can be a single path as a
            string or a list of paths.
        blob_storage_container_name (str): Name of the blob storage container
            holding the blobs. The container must already exist.
        blob_service_client (BlobServiceClient): Client used to obtain a blob
            client for each target blob.
        legal_hold (bool, optional): Legal hold state to set. True applies a
            legal hold, which prevents deletion or modification of the blobs;
            False clears it. Default is False.
    """
    logger.debug(
        f"Toggling legal hold to {legal_hold} on files in container '{blob_storage_container_name}'"
    )

    files = ensure_listlike(file_paths)
    n_total_files = len(files)

    logger.debug(f"Processing {n_total_files} files for legal hold toggle")

    # Emit a progress line roughly every 10% of the file list; the +1 keeps
    # the step nonzero (and avoids modulo-by-zero) for small lists.
    progress_step = 1 + n_total_files // 10
    for i_file, file_path in enumerate(files):
        if i_file % progress_step == 0:
            logger.debug(f"Progress: {i_file}/{n_total_files} files processed")

        blob_client = blob_service_client.get_blob_client(
            container=blob_storage_container_name, blob=file_path
        )
        blob_client.set_legal_hold(legal_hold=legal_hold)
        logger.debug(f"Set legal hold to {legal_hold} for blob '{file_path}'")
    logger.info(
        f"Toggled legal hold to {legal_hold} on {n_total_files} file(s) in container '{blob_storage_container_name}'."
    )
10 changes: 10 additions & 0 deletions cfa/cloudops/blob_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def upload_files_in_folder(
blob_service_client=None,
force_upload: bool = True,
tags: dict = None,
legal_hold: bool = False,
immutability_lock_days: int = 0,
read_only: bool = False,
) -> list[str]:
"""Upload all files from a local folder to Azure Blob Storage with filtering options.

Expand All @@ -90,6 +93,10 @@ def upload_files_in_folder(
force_upload (bool, optional): Whether to force upload without user confirmation
for large numbers of files (>50). Default is True.
tags: dict (optional): A dictionary of tags to apply to the uploaded blobs.
legal_hold (bool, optional): Whether to set a legal hold on the uploaded blobs
which prevents deletion or modification of the blobs.
immutability_lock_days (int, optional): Number of days to set for immutability
read_only (bool, optional): Whether to set the uploaded blobs to read-only.

Returns:
list[str]: List of local file paths that were processed for upload.
Expand Down Expand Up @@ -246,6 +253,9 @@ def upload_files_in_folder(
local_root_dir=".",
remote_root_dir=path.join(location_in_blob),
tags=tags,
legal_hold=legal_hold,
immutability_lock_days=immutability_lock_days,
read_only=read_only,
)
if final_list:
logger.info(
Expand Down
Loading
Loading