Investigate splitting private data into its own bucket #70

Draft: wants to merge 23 commits into base: apl-setup
Commits
2623b06
Added todos to mark places DJANGO_DANDI_DANDISETS_BUCKET_NAME is used
NEStock May 12, 2025
cfa417b
initial creation of private data design.md
sandyhider May 14, 2025
156a5d2
more notes on embargo bucket redesign changes
NEStock May 14, 2025
415c741
more notes on embargo bucket redesign changes, from dandi pr 1890
NEStock May 15, 2025
fa3275a
design for private embargo data
sandyhider May 28, 2025
756f6c5
Update private-and-embargo-bucket-design.md
sandyhider Jun 11, 2025
4f2a557
Update private-and-embargo-bucket-design.md
sandyhider Jun 11, 2025
f3b7462
Update private-and-embargo-bucket-design.md
NEStock Jun 12, 2025
150328d
Update private-and-embargo-bucket-design.md
NEStock Jun 12, 2025
75c3ce0
Update private-and-embargo-bucket-design.md
NEStock Jun 12, 2025
5c76d01
Update private-and-embargo-bucket-design.md
NEStock Jun 12, 2025
f8df81a
Merge branch 'apl-setup' into 69-investigate-splitting-private-data-i…
NEStock Jun 12, 2025
6d207cf
Add variables for private bucket name, allow private, and use private…
NEStock Jun 12, 2025
f4f6af6
Fix lint and tests
NEStock Jun 12, 2025
48107ad
Don't require private bucket variable
NEStock Jun 13, 2025
efdfc02
fix environment variable required
NEStock Jun 13, 2025
c3cd8ad
Merge pull request #83 from aplbrain/69-investigate-splitting-private…
NEStock Jun 13, 2025
d1afc4c
Updating comments to reflect 3 potential buckets
NEStock Jun 13, 2025
a31d612
Add private field to ProcessedS3Log class
NEStock Jun 13, 2025
fadcf0e
Update usages of ProcessedS3Log to handle private vs open bucket
NEStock Jun 16, 2025
31e08cf
Add migration file
NEStock Jun 17, 2025
7562bf6
Add note about test
NEStock Jun 17, 2025
6542bfe
Merge pull request #84 from aplbrain/69-nicole--refactor-processeds3l…
NEStock Jun 17, 2025
2 changes: 2 additions & 0 deletions .github/workflows/backend-ci.yml
@@ -61,3 +61,5 @@ jobs:
DJANGO_DANDI_DEV_EMAIL: [email protected]
DJANGO_DANDI_ADMIN_EMAIL: [email protected]
DANDI_ALLOW_LOCALHOST_URLS: 1
DJANGO_ALLOW_PRIVATE: true
DJANGO_USE_PRIVATE_BUCKET_FOR_EMBARGOED: true
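
For orientation only, a rough sketch (an assumption, not taken from this PR) of how these environment variables could map onto the Django settings referenced later in the diff (DANDI_DANDISETS_PRIVATE_BUCKET_NAME, DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME); the project's real settings machinery may differ, and the two boolean setting names below are guesses:

import os

# Hypothetical settings fragment; names mirror the CI variables set above.
DANDI_DANDISETS_PRIVATE_BUCKET_NAME = os.environ.get(
    'DJANGO_DANDI_DANDISETS_PRIVATE_BUCKET_NAME', ''
)
DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME = os.environ.get(
    'DJANGO_DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME', ''
)
# Guessed setting names for the two feature flags.
DANDI_ALLOW_PRIVATE = os.environ.get('DJANGO_ALLOW_PRIVATE', 'false').lower() == 'true'
DANDI_USE_PRIVATE_BUCKET_FOR_EMBARGOED = (
    os.environ.get('DJANGO_USE_PRIVATE_BUCKET_FOR_EMBARGOED', 'false').lower() == 'true'
)
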
4 changes: 4 additions & 0 deletions .github/workflows/frontend-ci.yml
@@ -73,14 +73,18 @@ jobs:
DJANGO_MINIO_STORAGE_SECRET_KEY: minioSecretKey
DJANGO_STORAGE_BUCKET_NAME: dandi-bucket
DJANGO_DANDI_DANDISETS_BUCKET_NAME: dandi-bucket
DJANGO_DANDI_DANDISETS_PRIVATE_BUCKET_NAME: dandi-private-dandisets
DJANGO_DANDI_DANDISETS_LOG_BUCKET_NAME: dandiapi-dandisets-logs
DJANGO_DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME: dandiapi-embargo-dandisets-logs
DJANGO_DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME: dandi-private-dandisets-logs
DJANGO_DANDI_WEB_APP_URL: http://localhost:8085
DJANGO_DANDI_API_URL: http://localhost:8000
DJANGO_DANDI_JUPYTERHUB_URL: https://hub.dandiarchive.org/
DJANGO_DANDI_DEV_EMAIL: [email protected]
DJANGO_DANDI_ADMIN_EMAIL: [email protected]
DANDI_ALLOW_LOCALHOST_URLS: 1
DJANGO_ALLOW_PRIVATE: true
DJANGO_USE_PRIVATE_BUCKET_FOR_EMBARGOED: true

# Web client env vars
VITE_APP_DANDI_API_ROOT: http://localhost:8000/api/
32 changes: 32 additions & 0 deletions dandiapi/analytics/migrations/0003_processeds3log_private.py
@@ -0,0 +1,32 @@
# Generated by Django 4.2.19 on 2025-06-17 13:56
from __future__ import annotations

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
(
'analytics',
'0002_remove_processeds3log_analytics_processeds3log_unique_name_embargoed_and_more',
),
]

operations = [
migrations.RemoveConstraint(
model_name='processeds3log',
name='analytics_processeds3log_unique_name_embargoed',
),
migrations.AddField(
model_name='processeds3log',
name='private',
field=models.BooleanField(default=False),
),
migrations.AddConstraint(
model_name='processeds3log',
constraint=models.UniqueConstraint(
fields=('name', 'private', 'historically_embargoed'),
name='analytics_processeds3log_unique_name_embargoed',
),
),
]
14 changes: 11 additions & 3 deletions dandiapi/analytics/models.py
@@ -13,15 +13,23 @@ class ProcessedS3Log(models.Model):
],
)

# Represents if this s3 log file is private (including embargoed) or public.
# If private is True, the log file lives in the S3 bucket pointed to by
# DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME.
# If private is False and historically_embargoed is False, the log file lives
# in the S3 bucket pointed to by DANDI_DANDISETS_LOG_BUCKET_NAME.
# If private is False and historically_embargoed is True, the log file lives
# in the S3 bucket pointed to by DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME.
private = models.BooleanField(default=False)

# Represents if this s3 log file was embargoed prior to the embargo re-design.
# If this field is True, the log file lives in the S3 bucket pointed to by the
# DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME setting.
historically_embargoed = models.BooleanField(default=False)

class Meta:
constraints = [
models.UniqueConstraint(
fields=['name', 'historically_embargoed'],
fields=['name', 'private', 'historically_embargoed'],
name='%(app_label)s_%(class)s_unique_name_embargoed',
)
]
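
Taken together, private and historically_embargoed route a processed log file to one of three log buckets. A small hypothetical helper (not part of this PR) makes the routing described in the comments above explicit:

from django.conf import settings


def log_bucket_name(*, private: bool, historically_embargoed: bool) -> str:
    """Return the log bucket setting a ProcessedS3Log row is expected to refer to."""
    if private:
        return settings.DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME
    if historically_embargoed:
        return settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME
    return settings.DANDI_DANDISETS_LOG_BUCKET_NAME
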
51 changes: 37 additions & 14 deletions dandiapi/analytics/tasks/__init__.py
@@ -15,7 +15,7 @@

from dandiapi.analytics.models import ProcessedS3Log
from dandiapi.api.models.asset import AssetBlob
from dandiapi.api.storage import get_boto_client, get_storage
from dandiapi.api.storage import get_boto_client, get_private_storage, get_storage

if TYPE_CHECKING:
from collections.abc import Generator
@@ -24,43 +24,66 @@

# should be one of the DANDI_DANDISETS_*_LOG_BUCKET_NAME settings
LogBucket = str
# Log buckets actively used in the system
ACTIVE_LOG_BUCKETS = {
settings.DANDI_DANDISETS_LOG_BUCKET_NAME,
settings.DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME,
}


def _bucket_objects_after(after: str | None) -> Generator[dict, None, None]:
s3 = get_boto_client(get_storage())
def _bucket_objects_after(bucket: str, after: str | None) -> Generator[dict, None, None]:
# Check that bucket name is valid
if bucket not in ACTIVE_LOG_BUCKETS:
raise ValueError(f'Non-log bucket: {bucket}')
private = bucket == settings.DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME

s3 = get_boto_client(get_storage() if not private else get_private_storage())

kwargs = {}
if after:
kwargs['StartAfter'] = after

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=settings.DANDI_DANDISETS_LOG_BUCKET_NAME, **kwargs):
for page in paginator.paginate(Bucket=bucket, **kwargs):
yield from page.get('Contents', [])


@shared_task(queue='s3-log-processing', soft_time_limit=60, time_limit=80)
def collect_s3_log_records_task() -> None:
def collect_s3_log_records_task(bucket: LogBucket) -> None:
"""Dispatch a task per S3 log file to process for download counts."""
after = ProcessedS3Log.objects.aggregate(last_log=Max('name'))['last_log']
# Check that bucket name is valid
if bucket not in ACTIVE_LOG_BUCKETS:
raise RuntimeError
private = bucket == settings.DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME

for s3_log_object in _bucket_objects_after(after):
process_s3_log_file_task.delay(s3_log_object['Key'])
after = ProcessedS3Log.objects.filter(private=private).aggregate(last_log=Max('name'))[
'last_log'
]

for s3_log_object in _bucket_objects_after(bucket, after):
process_s3_log_file_task.delay(bucket, s3_log_object['Key'])


@shared_task(queue='s3-log-processing', soft_time_limit=120, time_limit=140)
def process_s3_log_file_task(s3_log_key: str) -> None:
def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
"""
Process a single S3 log file for download counts.

Creates a ProcessedS3Log entry and updates the download counts for the relevant
asset blobs. Prevents duplicate processing with a unique constraint on the ProcessedS3Log name.
"""
# Check that bucket name is valid
if bucket not in ACTIVE_LOG_BUCKETS:
raise RuntimeError
private = bucket == settings.DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME

# short circuit if the log file has already been processed. note that this doesn't guarantee
# exactly once processing, that's what the unique constraint on ProcessedS3Log is for.
if ProcessedS3Log.objects.filter(name=s3_log_key.split('/')[-1]).exists():
if ProcessedS3Log.objects.filter(name=s3_log_key.split('/')[-1], private=private).exists():
return

s3 = get_boto_client(get_storage())
data = s3.get_object(Bucket=settings.DANDI_DANDISETS_LOG_BUCKET_NAME, Key=s3_log_key)
s3 = get_boto_client(get_storage() if not private else get_private_storage())
data = s3.get_object(Bucket=bucket, Key=s3_log_key)
download_counts = Counter()

for log_entry in s3logparse.parse_log_lines(
@@ -71,14 +94,14 @@ def process_s3_log_file_task(s3_log_key: str) -> None:

with transaction.atomic():
try:
log = ProcessedS3Log(name=s3_log_key.split('/')[-1])
log = ProcessedS3Log(name=s3_log_key.split('/')[-1], private=private)
# disable constraint validation checking so duplicate errors can be detected and
# ignored. the rest of the full_clean errors should still be raised.
log.full_clean(validate_constraints=False)
log.save()
except IntegrityError as e:
if '_unique_name' in str(e):
logger.info('Already processed log file %s', s3_log_key)
logger.info('Already processed log file %s, private=%s', s3_log_key, private)
return

# we need to store all of the fully hydrated blob objects in memory in order to use
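
Both tasks now take the log bucket as their first argument, so callers are expected to fan out over ACTIVE_LOG_BUCKETS. An illustrative invocation (assumed usage, not shown in this hunk):

from dandiapi.analytics.tasks import ACTIVE_LOG_BUCKETS, collect_s3_log_records_task

# Queue one collection task per active log bucket (open and private).
for log_bucket in ACTIVE_LOG_BUCKETS:
    collect_s3_log_records_task.delay(log_bucket)

The scheduled task in dandiapi/api/tasks/scheduled.py below registers essentially this loop, though it lists the two bucket settings directly rather than reusing ACTIVE_LOG_BUCKETS.
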
27 changes: 17 additions & 10 deletions dandiapi/analytics/tests/test_download_counts.py
@@ -5,17 +5,24 @@

from dandiapi.analytics.models import ProcessedS3Log
from dandiapi.analytics.tasks import collect_s3_log_records_task, process_s3_log_file_task
from dandiapi.api.storage import create_s3_storage, get_boto_client
from dandiapi.api.storage import (
create_s3_storage,
get_boto_client,
get_private_storage,
get_storage,
)


@pytest.fixture
def s3_log_bucket():
# This file tests the processing of s3 logs, so we believe only one bucket needs to be covered here
return create_s3_storage(settings.DANDI_DANDISETS_LOG_BUCKET_NAME).bucket_name


@pytest.fixture
def s3_log_file(s3_log_bucket, asset_blob):
s3 = get_boto_client()
private = s3_log_bucket == settings.DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME
s3 = get_boto_client(get_storage() if not private else get_private_storage())

log_file_name = '2019-02-06-00-00-38-5C5B0E0CA8F2B1B5'
s3.put_object(
@@ -45,34 +45,52 @@ def s3_log_file(s3_log_bucket, asset_blob):


@pytest.mark.django_db
def test_processing_s3_log_files(s3_log_file, asset_blob):
collect_s3_log_records_task()
def test_processing_s3_log_files(s3_log_bucket, s3_log_file, asset_blob):
collect_s3_log_records_task(s3_log_bucket)
asset_blob.refresh_from_db()

assert ProcessedS3Log.objects.count() == 1
assert asset_blob.download_count == 1


@pytest.mark.django_db
def test_processing_s3_log_files_idempotent(s3_log_file, asset_blob):
def test_processing_s3_log_files_idempotent(s3_log_bucket, s3_log_file, asset_blob):
# this tests that the outer task which collects the log files to process is
# idempotent, in other words, it uses StartAfter correctly.
collect_s3_log_records_task()
collect_s3_log_records_task(s3_log_bucket)
# run the task again, it should skip the existing log record
collect_s3_log_records_task()
collect_s3_log_records_task(s3_log_bucket)
asset_blob.refresh_from_db()

assert ProcessedS3Log.objects.count() == 1
assert asset_blob.download_count == 1


@pytest.mark.django_db
def test_processing_s3_log_file_task_idempotent(s3_log_file, asset_blob):
def test_processing_s3_log_file_task_idempotent(s3_log_bucket, s3_log_file, asset_blob):
# this tests that the inner task which processes a single log file is
# idempotent, utilizing the unique constraint on ProcessedS3Log correctly.
process_s3_log_file_task(s3_log_file)
process_s3_log_file_task(s3_log_bucket, s3_log_file)
# run the task again, it should ignore the new log
process_s3_log_file_task(s3_log_file)
process_s3_log_file_task(s3_log_bucket, s3_log_file)
asset_blob.refresh_from_db()

assert ProcessedS3Log.objects.count() == 1
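
The fixture comment above limits coverage to a single bucket. If both buckets ever need to be exercised, the bucket fixture could be parametrized so every test in this module runs against the open and the private log bucket; a hypothetical sketch (not in this PR), reusing the module's existing imports:

@pytest.fixture(
    params=[
        settings.DANDI_DANDISETS_LOG_BUCKET_NAME,
        settings.DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME,
    ]
)
def s3_log_bucket(request):
    return create_s3_storage(request.param).bucket_name
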
1 change: 1 addition & 0 deletions dandiapi/api/management/commands/cleanup_blobs.py
@@ -6,6 +6,7 @@

from dandiapi.api.models.upload import AssetBlob

# TODO: handle private bucket
BUCKET = settings.DANDI_DANDISETS_BUCKET_NAME
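
One possible shape for the TODO above (hypothetical, not decided in this PR): track both buckets so the cleanup command can cover open and private blobs.

BUCKETS = [
    settings.DANDI_DANDISETS_BUCKET_NAME,
    settings.DANDI_DANDISETS_PRIVATE_BUCKET_NAME,
]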


4 changes: 4 additions & 0 deletions dandiapi/api/manifests.py
@@ -19,6 +19,10 @@

def _s3_url(path: str) -> str:
"""Turn an object path into a fully qualified S3 URL."""
# TODO: determine which bucket name to pass in
# if embargoed:
# storage = create_s3_storage(settings.DANDI_DANDISETS_EMBARGO_BUCKET_NAME)
# else:
storage = create_s3_storage(settings.DANDI_DANDISETS_BUCKET_NAME)
signed_url = storage.url(path)
# Strip off the query parameters from the presigning, as they are different every time
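
A hypothetical completion of the TODO above, assuming the caller can tell the function which bucket the object lives in (the embargo bucket setting name comes from the TODO itself, not from code introduced in this diff):

def _s3_url(path: str, *, embargoed: bool = False) -> str:
    """Turn an object path into a fully qualified S3 URL on the appropriate bucket."""
    bucket_name = (
        settings.DANDI_DANDISETS_EMBARGO_BUCKET_NAME
        if embargoed
        else settings.DANDI_DANDISETS_BUCKET_NAME
    )
    storage = create_s3_storage(bucket_name)
    signed_url = storage.url(path)
    # Strip off the query parameters from the presigning, as they are different every time.
    return signed_url.split('?', 1)[0]
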
2 changes: 2 additions & 0 deletions dandiapi/api/models/asset.py
@@ -60,7 +60,9 @@ class AssetBlob(TimeStampedModel):
SHA256_REGEX = r'[0-9a-f]{64}'
ETAG_REGEX = r'[0-9a-f]{32}(-[1-9][0-9]*)?'

# TODO: do we need an indicator of embargo vs. private?
embargoed = models.BooleanField(default=False)
# TODO: storage and upload_to will be dependent on bucket
blob = models.FileField(blank=True, storage=get_storage, upload_to=get_storage_prefix)
blob_id = models.UUIDField(unique=True)
sha256 = models.CharField( # noqa: DJ001
2 changes: 2 additions & 0 deletions dandiapi/api/models/upload.py
@@ -20,6 +20,7 @@ class Upload(models.Model): # noqa: DJ008

dandiset = models.ForeignKey(Dandiset, related_name='uploads', on_delete=models.CASCADE)

# TODO: storage and upload_to will be dependent on bucket
blob = models.FileField(blank=True, storage=get_storage, upload_to=get_storage_prefix)
embargoed = models.BooleanField(default=False)

@@ -45,6 +46,7 @@ class Meta:
def object_key(upload_id):
upload_id = str(upload_id)
return (
# TODO: determine which bucket
f'{settings.DANDI_DANDISETS_BUCKET_PREFIX}'
f'blobs/{upload_id[0:3]}/{upload_id[3:6]}/{upload_id}'
)
1 change: 1 addition & 0 deletions dandiapi/api/services/asset/__init__.py
@@ -173,6 +173,7 @@ def add_asset_to_version(
# embargoed blob results in that blob being unembargoed.
# NOTE: This only applies to asset blobs, as zarrs cannot belong to
# multiple dandisets at once.
# TODO: unsure if this logic would need to change? are there unintended side-effects?
if (
asset_blob is not None
and asset_blob.embargoed
1 change: 1 addition & 0 deletions dandiapi/api/services/embargo/__init__.py
@@ -33,6 +33,7 @@
@transaction.atomic()
def unembargo_dandiset(ds: Dandiset, user: User):
"""Unembargo a dandiset by copying all embargoed asset blobs to the public bucket."""
# TODO: Move embargoed dandiset to public bucket
logger.info('Unembargoing Dandiset %s', ds.identifier)
logger.info('\t%s assets', ds.draft_version.assets.count())

2 changes: 2 additions & 0 deletions dandiapi/api/services/embargo/utils.py
@@ -53,6 +53,7 @@ def newfn(*args, **kwargs):

@retry(times=3, exceptions=(Exception,))
def _delete_object_tags(client: S3Client, blob: str):
# TODO: determine which bucket name to pass in
client.delete_object_tagging(
Bucket=settings.DANDI_DANDISETS_BUCKET_NAME,
Key=blob,
@@ -62,6 +63,7 @@ def _delete_object_tags(client: S3Client, blob: str):
@retry(times=3, exceptions=(Exception,))
def _delete_zarr_object_tags(client: S3Client, zarr: str):
paginator = client.get_paginator('list_objects_v2')
# TODO: determine which bucket name to pass in
pages = paginator.paginate(
Bucket=settings.DANDI_DANDISETS_BUCKET_NAME, Prefix=zarr_s3_path(zarr_id=zarr)
)
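
One way the two bucket TODOs above could be resolved, sketched as an assumption rather than a decided design: thread the bucket through as a parameter instead of reading DANDI_DANDISETS_BUCKET_NAME at each call site.

@retry(times=3, exceptions=(Exception,))
def _delete_object_tags(client: S3Client, blob: str, bucket: str) -> None:
    # The caller decides whether `bucket` is the open or the private bucket name.
    client.delete_object_tagging(Bucket=bucket, Key=blob)
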
8 changes: 8 additions & 0 deletions dandiapi/api/storage.py
@@ -374,3 +374,11 @@ def get_storage() -> Storage:

def get_storage_prefix(instance: Any, filename: str) -> str:
return f'{settings.DANDI_DANDISETS_BUCKET_PREFIX}{filename}'


def get_private_storage() -> Storage:
return create_s3_storage(settings.DANDI_DANDISETS_PRIVATE_BUCKET_NAME)


def get_private_storage_prefix(instance: Any, filename: str) -> str:
return f'{settings.DANDI_DANDISETS_PRIVATE_BUCKET_PREFIX}{filename}'
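
The TODOs in asset.py and upload.py above ("storage and upload_to will be dependent on bucket") suggest blobs will eventually need the same kind of resolution. A possible direction, sketched as an assumption rather than a decided design:

from dandiapi.api.storage import (
    get_private_storage,
    get_private_storage_prefix,
    get_storage,
    get_storage_prefix,
)


def storage_for(*, private: bool):
    """Pick the private or open bucket storage for a blob."""
    return get_private_storage() if private else get_storage()


def storage_prefix_for(instance, filename: str, *, private: bool) -> str:
    """Pick the key prefix matching the chosen bucket."""
    prefix_fn = get_private_storage_prefix if private else get_storage_prefix
    return prefix_fn(instance, filename)
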
10 changes: 7 additions & 3 deletions dandiapi/api/tasks/scheduled.py
@@ -136,7 +136,7 @@ def garbage_collection() -> None:
def register_scheduled_tasks(sender: Celery, **kwargs):
"""Register tasks with a celery beat schedule."""
logger.info(
'Registering scheduled tasks for %s. ' 'DANDI_VALIDATION_JOB_INTERVAL is %s seconds.',
'Registering scheduled tasks for %s. DANDI_VALIDATION_JOB_INTERVAL is %s seconds.',
sender,
settings.DANDI_VALIDATION_JOB_INTERVAL,
)
@@ -157,8 +157,12 @@ def register_scheduled_tasks(sender: Celery, **kwargs):
# Refresh the materialized view used by asset search every 10 mins.
sender.add_periodic_task(timedelta(minutes=10), refresh_materialized_view_search.s())

# Process new S3 logs every hour
sender.add_periodic_task(timedelta(hours=1), collect_s3_log_records_task.s())
# Process new S3 logs every hour, from each bucket
for log_bucket in [
settings.DANDI_DANDISETS_LOG_BUCKET_NAME,
settings.DANDI_DANDISETS_PRIVATE_LOG_BUCKET_NAME,
]:
sender.add_periodic_task(timedelta(hours=1), collect_s3_log_records_task.s(log_bucket))

# Run garbage collection once a day
# TODO: enable this once we're ready to run garbage collection automatically
2 changes: 2 additions & 0 deletions dandiapi/api/tests/test_audit.py
@@ -380,6 +380,7 @@ def test_audit_finalize_zarr(
boto = get_boto_client()
zarr_archive = ZarrArchive.objects.get(zarr_id=zarr.zarr_id)
for path in paths:
# TODO: test both buckets?
boto.put_object(
Bucket=settings.DANDI_DANDISETS_BUCKET_NAME, Key=zarr_archive.s3_path(path), Body=b'a'
)
@@ -421,6 +422,7 @@ def test_audit_delete_zarr_chunks(
boto = get_boto_client()
zarr_archive = ZarrArchive.objects.get(zarr_id=zarr.zarr_id)
for path in paths:
# TODO: test both buckets?
boto.put_object(
Bucket=settings.DANDI_DANDISETS_BUCKET_NAME, Key=zarr_archive.s3_path(path), Body=b'a'
)