1 change: 1 addition & 0 deletions icechunk-python/src/config.rs
@@ -715,6 +715,7 @@ impl From<&PyCachingConfig> for CachingConfig {
num_transaction_changes: value.num_transaction_changes,
num_bytes_attributes: value.num_bytes_attributes,
num_bytes_chunks: value.num_bytes_chunks,
backend: Default::default(),
}
}
}
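Note on the change above: the Python-side conversion fills the new `backend` field with `Default::default()`, so the cache backend is not yet configurable from `PyCachingConfig`. The `cache` module introduced by this PR is not shown in the diff; judging by the variants used in `asset_manager.rs` (`Ephemeral`, `NoOp`) and the `#[serde(default)]` attribute on the serializer below, the enum plausibly looks like this sketch (everything beyond those two variant names is an assumption):

```rust
use serde::{Deserialize, Serialize};

/// Selects the cache implementation used by AssetManager.
/// Sketch only: inferred from this diff, not the real cache.rs.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum CacheBackend {
    /// In-memory, size-bounded caching (the pre-PR behavior).
    #[default]
    Ephemeral,
    /// Caching disabled: every read hits storage.
    NoOp,
}
```

Defaulting to `Ephemeral` would preserve the existing in-memory caching behavior for configs that never mention a backend.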
226 changes: 144 additions & 82 deletions icechunk/src/asset_manager.rs
@@ -2,7 +2,6 @@ use async_stream::try_stream;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{Stream, TryStreamExt};
use quick_cache::{Weighter, sync::Cache};
use serde::{Deserialize, Serialize};
use std::{
io::{BufReader, Read},
@@ -14,6 +13,7 @@ use tracing::{Span, debug, instrument, trace, warn};

use crate::{
Storage,
cache::{CacheBackend, EphemeralCache, NoOpCache, StorageCache},
config::CachingConfig,
format::{
ChunkId, ChunkOffset, IcechunkFormatErrorKind, ManifestId, SnapshotId,
@@ -42,6 +42,7 @@ pub struct AssetManager {
num_bytes_attributes: u64,
num_bytes_chunks: u64,
compression_level: u8,
cache_backend: CacheBackend,

max_concurrent_requests: u16,

@@ -51,13 +52,13 @@
snapshot_cache_size_warned: AtomicBool,

#[serde(skip)]
snapshot_cache: Cache<SnapshotId, Arc<Snapshot>, FileWeighter>,
snapshot_cache: Arc<dyn StorageCache<SnapshotId, Arc<Snapshot>>>,
#[serde(skip)]
manifest_cache: Cache<ManifestId, Arc<Manifest>, FileWeighter>,
manifest_cache: Arc<dyn StorageCache<ManifestId, Arc<Manifest>>>,
#[serde(skip)]
transactions_cache: Cache<SnapshotId, Arc<TransactionLog>, FileWeighter>,
transactions_cache: Arc<dyn StorageCache<SnapshotId, Arc<TransactionLog>>>,
#[serde(skip)]
chunk_cache: Cache<(ChunkId, Range<ChunkOffset>), Bytes, FileWeighter>,
chunk_cache: Arc<dyn StorageCache<(ChunkId, Range<ChunkOffset>), Bytes>>,

#[serde(skip)]
request_semaphore: Semaphore,
@@ -75,6 +76,8 @@ struct AssetManagerSerializer {
num_bytes_attributes: u64,
num_bytes_chunks: u64,
compression_level: u8,
#[serde(default)]
cache_backend: CacheBackend,
max_concurrent_requests: u16,
}

@@ -90,6 +93,7 @@ impl From<AssetManagerSerializer> for AssetManager {
value.num_bytes_chunks,
value.compression_level,
value.max_concurrent_requests,
value.cache_backend,
)
}
}
@@ -106,25 +110,39 @@ impl AssetManager {
num_bytes_chunks: u64,
compression_level: u8,
max_concurrent_requests: u16,
cache_backend: CacheBackend,
) -> Self {
// Create caches based on backend type
let (snapshot_cache, manifest_cache, transactions_cache, chunk_cache) = match cache_backend {
CacheBackend::Ephemeral => (
Arc::new(EphemeralCache::new(num_snapshot_nodes)) as Arc<dyn StorageCache<SnapshotId, Arc<Snapshot>>>,
Arc::new(EphemeralCache::new(num_chunk_refs)) as Arc<dyn StorageCache<ManifestId, Arc<Manifest>>>,
Arc::new(EphemeralCache::new(num_transaction_changes)) as Arc<dyn StorageCache<SnapshotId, Arc<TransactionLog>>>,
Arc::new(EphemeralCache::new(num_bytes_chunks)) as Arc<dyn StorageCache<(ChunkId, Range<ChunkOffset>), Bytes>>,
),
CacheBackend::NoOp => (
Arc::new(NoOpCache::new()) as Arc<dyn StorageCache<SnapshotId, Arc<Snapshot>>>,
Arc::new(NoOpCache::new()) as Arc<dyn StorageCache<ManifestId, Arc<Manifest>>>,
Arc::new(NoOpCache::new()) as Arc<dyn StorageCache<SnapshotId, Arc<TransactionLog>>>,
Arc::new(NoOpCache::new()) as Arc<dyn StorageCache<(ChunkId, Range<ChunkOffset>), Bytes>>,
),
};

Self {
num_snapshot_nodes,
num_chunk_refs,
num_transaction_changes,
num_bytes_attributes,
num_bytes_chunks,
compression_level,
cache_backend,
max_concurrent_requests,
storage,
storage_settings,
snapshot_cache: Cache::with_weighter(1, num_snapshot_nodes, FileWeighter),
manifest_cache: Cache::with_weighter(1, num_chunk_refs, FileWeighter),
transactions_cache: Cache::with_weighter(
0,
num_transaction_changes,
FileWeighter,
),
chunk_cache: Cache::with_weighter(0, num_bytes_chunks, FileWeighter),
snapshot_cache,
manifest_cache,
transactions_cache,
chunk_cache,
snapshot_cache_size_warned: AtomicBool::new(false),
manifest_cache_size_warned: AtomicBool::new(false),
request_semaphore: Semaphore::new(max_concurrent_requests as usize),
@@ -147,6 +165,7 @@ impl AssetManager {
0,
compression_level,
max_concurrent_requests,
CacheBackend::NoOp,
)
}

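The constructor above now selects a cache implementation at runtime; the explicit `as Arc<dyn StorageCache<...>>` casts are needed so both match arms unify to the same trait-object type. The `StorageCache` trait itself lives in the new `cache` module, which this diff does not include. From its call sites below (an owned key plus a `Box::pin`ned fetch future, returning a boxed error), a minimal sketch of the trait might be:

```rust
use std::{future::Future, pin::Pin};

use async_trait::async_trait;

type BoxError = Box<dyn std::error::Error + Send + Sync>;
type FetchFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, BoxError>> + Send + 'a>>;

/// Sketch of the cache abstraction, inferred from its call sites in
/// this diff; the real trait in cache.rs may differ.
#[async_trait]
pub trait StorageCache<K, V>: Send + Sync {
    /// Return the cached value for `key`, or await `fetch`, store its
    /// result under `key`, and return it.
    async fn get_or_fetch(&self, key: K, fetch: FetchFuture<'_, V>) -> Result<V, BoxError>;
}
```

Under this shape, `NoOpCache` would implement `get_or_fetch` by simply awaiting `fetch` every time, while `EphemeralCache` would consult its in-memory store first.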
@@ -167,6 +186,7 @@ impl AssetManager {
config.num_bytes_chunks(),
compression_level,
max_concurrent_requests,
config.backend(),
)
}

@@ -208,22 +228,40 @@ impl AssetManager {
manifest_id: &ManifestId,
manifest_size: u64,
) -> RepositoryResult<Arc<Manifest>> {
match self.manifest_cache.get_value_or_guard_async(manifest_id).await {
Ok(manifest) => Ok(manifest),
Err(guard) => {
let storage = Arc::clone(&self.storage);
let settings = self.storage_settings.clone();
let semaphore = &self.request_semaphore;
let manifest_id_clone = manifest_id.clone();

let manifest = self.manifest_cache.get_or_fetch(
manifest_id.clone(),
Box::pin(async move {
let manifest = fetch_manifest(
manifest_id,
&manifest_id_clone,
manifest_size,
self.storage.as_ref(),
&self.storage_settings,
&self.request_semaphore,
storage.as_ref(),
&settings,
semaphore,
)
.await?;
self.warn_if_manifest_cache_small(manifest.as_ref());
let _fail_is_ok = guard.insert(Arc::clone(&manifest));
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
Ok(manifest)
})
)
.await
// If we got an error from the cache, it's a RepositoryError already
.map_err(|e| {
// Downcast the error back to RepositoryError if possible
match e.downcast::<RepositoryError>() {
Ok(repo_err) => *repo_err,
Err(other_err) => RepositoryError::new(RepositoryErrorKind::StorageError(
crate::storage::StorageErrorKind::Other(other_err.to_string())
)),
}
}
})?;

self.warn_if_manifest_cache_small(manifest.as_ref());
Ok(manifest)
}

fn warn_if_manifest_cache_small(&self, manifest: &Manifest) {
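The `map_err` above recovers the concrete `RepositoryError` from the boxed error the cache returns; the identical pattern recurs below for snapshots, transaction logs, and chunks. Factored out, the shared logic would be roughly this (hypothetical helper; the diff keeps it inline at each call site):

```rust
// Hypothetical helper; the diff inlines this logic at each call site.
fn unbox_repo_error(e: Box<dyn std::error::Error + Send + Sync>) -> RepositoryError {
    match e.downcast::<RepositoryError>() {
        // The fetch future produced a genuine RepositoryError: unwrap it.
        Ok(repo_err) => *repo_err,
        // Anything else (e.g. a semaphore acquire error) is wrapped as a
        // generic storage error, mirroring the inline code above.
        Err(other) => RepositoryError::new(RepositoryErrorKind::StorageError(
            crate::storage::StorageErrorKind::Other(other.to_string()),
        )),
    }
}
```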
@@ -288,21 +326,37 @@ impl AssetManager {
&self,
snapshot_id: &SnapshotId,
) -> RepositoryResult<Arc<Snapshot>> {
match self.snapshot_cache.get_value_or_guard_async(snapshot_id).await {
Ok(snapshot) => Ok(snapshot),
Err(guard) => {
let storage = Arc::clone(&self.storage);
let settings = self.storage_settings.clone();
let semaphore = &self.request_semaphore;
let snapshot_id_clone = snapshot_id.clone();

let snapshot = self.snapshot_cache.get_or_fetch(
snapshot_id.clone(),
Box::pin(async move {
let snapshot = fetch_snapshot(
snapshot_id,
self.storage.as_ref(),
&self.storage_settings,
&self.request_semaphore,
&snapshot_id_clone,
storage.as_ref(),
&settings,
semaphore,
)
.await?;
self.warn_if_snapshot_cache_small(snapshot.as_ref());
let _fail_is_ok = guard.insert(Arc::clone(&snapshot));
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
Ok(snapshot)
})
)
.await
.map_err(|e| {
match e.downcast::<RepositoryError>() {
Ok(repo_err) => *repo_err,
Err(other_err) => RepositoryError::new(RepositoryErrorKind::StorageError(
crate::storage::StorageErrorKind::Other(other_err.to_string())
)),
}
}
})?;

self.warn_if_snapshot_cache_small(snapshot.as_ref());
Ok(snapshot)
}

#[instrument(skip(self, log))]
@@ -330,20 +384,36 @@ impl AssetManager {
&self,
transaction_id: &SnapshotId,
) -> RepositoryResult<Arc<TransactionLog>> {
match self.transactions_cache.get_value_or_guard_async(transaction_id).await {
Ok(transaction) => Ok(transaction),
Err(guard) => {
let storage = Arc::clone(&self.storage);
let settings = self.storage_settings.clone();
let semaphore = &self.request_semaphore;
let transaction_id_clone = transaction_id.clone();

let transaction = self.transactions_cache.get_or_fetch(
transaction_id.clone(),
Box::pin(async move {
let transaction = fetch_transaction_log(
transaction_id,
self.storage.as_ref(),
&self.storage_settings,
&self.request_semaphore,
&transaction_id_clone,
storage.as_ref(),
&settings,
semaphore,
)
.await?;
let _fail_is_ok = guard.insert(Arc::clone(&transaction));
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
Ok(transaction)
})
)
.await
.map_err(|e| {
match e.downcast::<RepositoryError>() {
Ok(repo_err) => *repo_err,
Err(other_err) => RepositoryError::new(RepositoryErrorKind::StorageError(
crate::storage::StorageErrorKind::Other(other_err.to_string())
)),
}
}
})?;

Ok(transaction)
}

#[instrument(skip(self, bytes))]
@@ -365,20 +435,37 @@ impl AssetManager {
range: &Range<ChunkOffset>,
) -> RepositoryResult<Bytes> {
let key = (chunk_id.clone(), range.clone());
match self.chunk_cache.get_value_or_guard_async(&key).await {
Ok(chunk) => Ok(chunk),
Err(guard) => {
trace!(%chunk_id, ?range, "Downloading chunk");
let permit = self.request_semaphore.acquire().await?;
let chunk = self
.storage
.fetch_chunk(&self.storage_settings, chunk_id, range)
.await?;
let storage = Arc::clone(&self.storage);
let settings = self.storage_settings.clone();
let semaphore = &self.request_semaphore;
let chunk_id_clone = chunk_id.clone();
let range_clone = range.clone();

let chunk = self.chunk_cache.get_or_fetch(
key,
Box::pin(async move {
trace!(%chunk_id_clone, ?range_clone, "Downloading chunk");
let permit = semaphore.acquire().await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
let chunk = storage
.fetch_chunk(&settings, &chunk_id_clone, &range_clone)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
drop(permit);
let _fail_is_ok = guard.insert(chunk.clone());
Ok(chunk)
})
)
.await
.map_err(|e| {
match e.downcast::<RepositoryError>() {
Ok(repo_err) => *repo_err,
Err(other_err) => RepositoryError::new(RepositoryErrorKind::StorageError(
crate::storage::StorageErrorKind::Other(other_err.to_string())
)),
}
}
})?;

Ok(chunk)
}

/// Returns the sequence of parents of the current session, in order of latest first.
@@ -773,32 +860,6 @@ async fn fetch_transaction_log(
.map(Arc::new)
}

#[derive(Debug, Clone)]
struct FileWeighter;

impl Weighter<ManifestId, Arc<Manifest>> for FileWeighter {
fn weight(&self, _: &ManifestId, val: &Arc<Manifest>) -> u64 {
val.len() as u64
}
}

impl Weighter<SnapshotId, Arc<Snapshot>> for FileWeighter {
fn weight(&self, _: &SnapshotId, val: &Arc<Snapshot>) -> u64 {
val.len() as u64
}
}

impl Weighter<(ChunkId, Range<ChunkOffset>), Bytes> for FileWeighter {
fn weight(&self, _: &(ChunkId, Range<ChunkOffset>), val: &Bytes) -> u64 {
val.len() as u64
}
}

impl Weighter<SnapshotId, Arc<TransactionLog>> for FileWeighter {
fn weight(&self, _: &SnapshotId, val: &Arc<TransactionLog>) -> u64 {
val.len() as u64
}
}

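The removed `FileWeighter` impls above weighed every cache entry by its `len()`. That size-based accounting presumably moves inside the new `EphemeralCache`; a sketch of how a `quick_cache`-backed implementation could keep it, assuming a small `Weigh` trait in place of the four hand-written `Weighter` impls (both names are assumptions, not part of the diff):

```rust
use std::hash::Hash;

use quick_cache::{Weighter, sync::Cache};

/// Anything the cache can weigh by size, as the removed FileWeighter
/// impls did via len(). Assumed trait, not shown in this diff.
pub trait Weigh {
    fn weight(&self) -> u64;
}

#[derive(Clone)]
struct ByWeight;

impl<K, V: Weigh> Weighter<K, V> for ByWeight {
    fn weight(&self, _key: &K, val: &V) -> u64 {
        val.weight()
    }
}

/// In-memory cache bounded by total entry weight rather than count.
pub struct EphemeralCache<K: Eq + Hash + Clone, V: Weigh + Clone> {
    inner: Cache<K, V, ByWeight>,
}

impl<K: Eq + Hash + Clone, V: Weigh + Clone> EphemeralCache<K, V> {
    pub fn new(max_weight: u64) -> Self {
        // Mirrors the removed Cache::with_weighter(_, capacity, FileWeighter) calls.
        Self { inner: Cache::with_weighter(1, max_weight, ByWeight) }
    }
}
```

The `NoOp` backend needs no weighting at all, since it stores nothing.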
#[cfg(test)]
#[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)]
@@ -937,6 +998,7 @@ mod test {
num_transaction_changes: Some(0),
num_bytes_attributes: Some(0),
num_bytes_chunks: Some(0),
backend: crate::cache::CacheBackend::Ephemeral,
},
1,
100,