-
Notifications
You must be signed in to change notification settings - Fork 37
feat: consolidate image and bundle caches into LocalDB #790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8ba6688
144d85a
3a0acff
e0b54ba
bd0bfcf
65f9636
768cd77
72cfaf7
0fcad58
729ac80
a0d3b07
7b81bef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,11 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import hashlib | ||
| import json | ||
| import random | ||
| import sqlite3 | ||
| import time | ||
| import typing | ||
| from importlib.metadata import entry_points | ||
| from typing import TYPE_CHECKING, ClassVar, Dict, Optional, Tuple | ||
|
|
@@ -13,8 +17,11 @@ | |
| from flyte._image import Architecture, Image | ||
| from flyte._initialize import _get_init_config | ||
| from flyte._logging import logger | ||
| from flyte._persistence._db import LocalDB | ||
| from flyte._status import status | ||
|
|
||
| _IMAGE_CACHE_TTL_DAYS = 30 | ||
|
|
||
| if TYPE_CHECKING: | ||
| from flyte._build import ImageBuild | ||
|
|
||
|
|
@@ -36,7 +43,15 @@ class ImageChecker(Protocol): | |
| @classmethod | ||
| async def image_exists( | ||
| cls, repository: str, tag: str, arch: Tuple[Architecture, ...] = ("linux/amd64",) | ||
| ) -> Optional[str]: ... | ||
| ) -> Optional[str]: | ||
| """ | ||
| Check whether an image exists in a registry or cache. | ||
|
|
||
| Returns the image URI if found, or None if the image definitively does not exist. | ||
| Raise an exception if existence cannot be determined (e.g. cache miss, network failure) | ||
| so the next checker in the chain gets a chance. | ||
| """ | ||
| ... | ||
|
|
||
|
|
||
| class DockerAPIImageChecker(ImageChecker): | ||
|
|
@@ -93,6 +108,65 @@ async def image_exists( | |
| return None | ||
|
|
||
|
|
||
def _cache_key(repository: str, tag: str, arch: Tuple[str, ...]) -> str:
    """
    Build the lookup key used for rows in the ``image_cache`` table.

    The key is the SHA-256 hex digest of ``<scope>:<repository>:<tag>:<archs>``, where
    ``<scope>`` is whatever ``_cache_scope()`` returns (per the original documentation,
    the current endpoint/project/domain) and ``<archs>`` is the comma-joined, sorted
    architecture list, so the order in which architectures are given does not matter.

    :param repository: Image repository name.
    :param tag: Image tag.
    :param arch: Target architectures of the image.
    :return: Hex-encoded SHA-256 digest identifying this image within the current scope.
    """
    # Imported lazily, exactly as the original did.
    from flyte._persistence._db import _cache_scope

    scope = _cache_scope()
    platforms = ",".join(sorted(arch))
    identity = f"{scope}:{repository}:{tag}:{platforms}"

    digest = hashlib.sha256()
    digest.update(identity.encode())
    return digest.hexdigest()
|
|
||
|
|
||
def _read_image_cache(repository: str, tag: str, arch: Tuple[str, ...]) -> Optional[str]:
    """
    Look up a previously verified image URI by repository, tag, and arch.

    Only rows newer than ``_IMAGE_CACHE_TTL_DAYS`` days are considered. Roughly 5% of
    calls also delete expired rows. Both the lookup and the pruning are best-effort:
    ``OSError`` and ``sqlite3.Error`` are logged at debug level and never propagate.

    :param repository: Image repository name.
    :param tag: Image tag.
    :param arch: Target architectures of the image.
    :return: The cached image URI, or None on a cache miss, an expired entry, or a
        failed lookup.
    """
    try:
        conn = LocalDB.get_sync()
        cutoff = time.time() - _IMAGE_CACHE_TTL_DAYS * 86400
        row = conn.execute(
            "SELECT image_uri FROM image_cache WHERE key = ? AND created_at > ?",
            (_cache_key(repository, tag, arch), cutoff),
        ).fetchone()
    except (OSError, sqlite3.Error) as e:
        logger.debug(f"Failed to read image cache: {e}")
        return None

    # Prune expired entries ~5% of the time to avoid doing it on every read.
    # Pruning is housekeeping only, so it has its own error handling: a failed
    # DELETE/commit must not turn a successful lookup above into a cache miss.
    if random.random() < 0.05:
        try:
            with LocalDB._write_lock:
                conn.execute("DELETE FROM image_cache WHERE created_at <= ?", (cutoff,))
                conn.commit()
        except (OSError, sqlite3.Error) as e:
            logger.debug(f"Failed to prune image cache: {e}")

    return row[0] if row else None
|
|
||
|
|
||
def _write_image_cache(repository: str, tag: str, arch: Tuple[str, ...], image_uri: str) -> None:
    """
    Record a verified image URI in the ``image_cache`` table.

    Any existing row with the same key is replaced and stamped with the current time.
    The write is best-effort: ``OSError`` and ``sqlite3.Error`` are logged at debug
    level and swallowed.

    :param repository: Image repository name.
    :param tag: Image tag.
    :param arch: Target architectures of the image.
    :param image_uri: The verified image URI to store.
    """
    upsert_sql = "INSERT OR REPLACE INTO image_cache (key, image_uri, created_at) VALUES (?, ?, ?)"
    try:
        db = LocalDB.get_sync()
        # The insert and its commit happen together under the shared write lock.
        with LocalDB._write_lock:
            record = (_cache_key(repository, tag, arch), image_uri, time.time())
            db.execute(upsert_sql, record)
            db.commit()
    except (OSError, sqlite3.Error) as err:
        logger.debug(f"Failed to write image cache: {err}")
|
|
||
|
|
||
class PersistentCacheImageChecker(ImageChecker):
    """Image checker that answers from the local SQLite cache of previously verified images."""

    @classmethod
    async def image_exists(
        cls, repository: str, tag: str, arch: Tuple[Architecture, ...] = ("linux/amd64",)
    ) -> Optional[str]:
        """
        Return the cached URI for the image, or raise if the cache has no entry.

        A miss is reported by raising rather than returning None, because None
        would mean "image definitely doesn't exist"; raising lets the next checker
        in the chain run instead.

        :raises LookupError: If the image is not present in the persistent cache.
        """
        cached_uri = _read_image_cache(repository, tag, arch)
        if not cached_uri:
            raise LookupError(f"Image {repository}:{tag} not found in persistent cache")

        logger.debug(f"Image {cached_uri} found in persistent cache")
        return cached_uri
|
|
||
|
|
||
| class LocalDockerCommandImageChecker(ImageChecker): | ||
| command_name: ClassVar[str] = "docker" | ||
|
|
||
|
|
@@ -174,12 +248,17 @@ async def image_exists(image: Image) -> Optional[str]: | |
| image_uri = await checker.image_exists(repository, tag, tuple(image.platform)) | ||
| if image_uri: | ||
| logger.debug(f"Image {image_uri} in registry") | ||
| return image_uri | ||
| # Persist to disk so future process invocations skip network checks | ||
| if checker is not PersistentCacheImageChecker: | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. One of the problems is that the persistent cache is never invalidated. If an image is deleted from the registry, the cache will still say it exists, and the build will be skipped.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. But it makes the UX much better.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can solve this by just having a very short TTL on the cache. This is why I am suggesting using SQLite. Anyway, the data is tiny, and one row is enough?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A short TTL sounds good to me. I'll update it to use SQLite.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. Added a TTL and a cache for the code bundle. |
||
| _write_image_cache(repository, tag, tuple(image.platform), image_uri) | ||
| return image_uri | ||
| # Checker ran successfully and returned None — image not found | ||
| return None | ||
| except Exception as e: | ||
| logger.debug(f"Error checking image existence with {checker.__name__}: {e}") | ||
| continue | ||
|
|
||
| # If all checkers fail, then assume the image exists. This is current flytekit behavior | ||
| # All checkers raised exceptions (e.g. network failures) — assume image exists | ||
| status.info(f"All checkers failed to check existence of {image.uri}, assuming it exists") | ||
| return image.uri | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not crash if the database is not found.