Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

COPDS-2331 #136

Merged
merged 7 commits into from
Jan 10, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cacholote/clean.py
Original file line number Diff line number Diff line change
@@ -256,19 +256,21 @@ def delete_cache_files(
return

entries_to_delete = []
files_to_delete: set[str] = set()
self.logger.info("getting cache entries to delete")
with config.get().instantiated_sessionmaker() as session:
for cache_entry in session.scalars(
sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
):
files = _get_files_from_cache_entry(cache_entry, key="file:size")
if any(file.startswith(self.urldir) for file in files):
if (
not self.stop_cleaning(maxsize)
and any(file.startswith(self.urldir) for file in files)
) or (set(files) & files_to_delete):
entries_to_delete.append(cache_entry)
for file in files:
self.pop_file_size(file)

if self.stop_cleaning(maxsize):
break
files_to_delete.add(file)

if entries_to_delete:
self.logger.info(
28 changes: 20 additions & 8 deletions cacholote/extra_encoders.py
Original file line number Diff line number Diff line change
@@ -129,6 +129,10 @@ def _logging_timer(event: str, **kwargs: Any) -> Generator[float, None, None]:
context.upload_log(f"end {event}. {_kwargs_to_str(**kwargs)}")


class InPlaceFile(io.FileIO):
pass


class FileInfoModel(pydantic.BaseModel):
type: str
href: str
@@ -389,27 +393,35 @@ def _store_io_object(

def dictify_io_object(obj: _UNION_IO_TYPES) -> dict[str, Any]:
"""Encode a file object to JSON deserialized data (``dict``)."""
is_in_place = isinstance(obj, InPlaceFile)
settings = config.get()

cache_files_urlpath = settings.cache_files_urlpath
fs_out, *_ = fsspec.get_fs_token_paths(
cache_files_urlpath,
storage_options=settings.cache_files_storage_options,
)

if urlpath_in := getattr(obj, "path", getattr(obj, "name", "")):
fs_in = getattr(obj, "fs", fsspec.filesystem("file"))
root = f"{fs_in.checksum(urlpath_in):x}"
ext = pathlib.Path(urlpath_in).suffix
urlpath_out = posixpath.join(cache_files_urlpath, f"{root}{ext}")
if is_in_place:
urlpath_out = urlpath_in
else:
root = f"{fs_in.checksum(urlpath_in):x}"
ext = pathlib.Path(urlpath_in).suffix
urlpath_out = posixpath.join(cache_files_urlpath, f"{root}{ext}")
else:
root = hashlib.md5(f"{hash(obj)}".encode()).hexdigest() # fsspec uses md5
urlpath_out = posixpath.join(cache_files_urlpath, root)

if is_in_place:
fs_out = fs_in
else:
fs_out, *_ = fsspec.get_fs_token_paths(
cache_files_urlpath,
storage_options=settings.cache_files_storage_options,
)

with utils.FileLock(
fs_out, urlpath_out, timeout=settings.lock_timeout
) as file_exists:
if not file_exists:
if not (file_exists or is_in_place):
if urlpath_in:
_store_file_object(fs_in, urlpath_in, fs_out, urlpath_out)
else:
14 changes: 14 additions & 0 deletions tests/test_50_io_encoder.py
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
from typing import Any

import fsspec
import fsspec.implementations.local
import pytest
import pytest_httpserver
import pytest_structlog
@@ -235,3 +236,16 @@ def test_io_logging(
},
]
assert log.events == expected


def test_io_in_place_file(tmp_path: pathlib.Path) -> None:
@cache.cacheable
def cached_in_place_open(path: str) -> io.FileIO:
return extra_encoders.InPlaceFile(path, "rb")

tmpfile = tmp_path / "test.txt"
fsspec.filesystem("file").touch(tmpfile)

output = cached_in_place_open(str(tmpfile))
assert isinstance(output, fsspec.implementations.local.LocalFileOpener)
assert output.name == str(tmpfile)
30 changes: 29 additions & 1 deletion tests/test_60_clean.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
import os
import pathlib
import time
from typing import Any, Literal
from typing import Any, BinaryIO, Literal

import fsspec
import pydantic
@@ -411,3 +411,31 @@ def test_clean_multiple_urlpaths(tmp_path: pathlib.Path, use_database: bool) ->
clean.clean_cache_files(maxsize=0, use_database=use_database, depth=2)
assert not cached_file1.exists()
assert cached_file2.exists()


def test_clean_duplicates(tmp_path: pathlib.Path) -> None:
con = config.get().engine.raw_connection()
cur = con.cursor()

# Create file
tmpfile = tmp_path / "file.txt"
fsspec.filesystem("file").pipe_file(tmpfile, ONE_BYTE)

@cache.cacheable
def func1(path: pathlib.Path) -> BinaryIO:
return path.open("rb")

@cache.cacheable
def func2(path: pathlib.Path) -> BinaryIO:
return path.open("rb")

fp1 = func1(tmpfile)
fp2 = func2(tmpfile)
assert fp1.name == fp2.name

cur.execute("SELECT COUNT(*) FROM cache_entries", ())
assert cur.fetchone() == (2,)

clean.clean_cache_files(maxsize=0)
cur.execute("SELECT COUNT(*) FROM cache_entries", ())
assert cur.fetchone() == (0,)