Add local disk cache for remote data chunks #99


Merged

52 commits merged on Apr 1, 2025
Changes from 12 commits
Commits
9c95a15
Update Ansible deployment to allow for unprivileged deployment using …
maxstack Jan 17, 2025
c98d16a
WIP: cached crate experiments
sd109 Jan 24, 2025
99e61ba
Add Bytes serde feature
sd109 Jan 24, 2025
df2ed74
Functional disk cache implementation
sd109 Jan 24, 2025
0d70386
Add prometheus counter for cache misses
sd109 Jan 24, 2025
f809321
Ensure client ID is included in chunk cache lookup key
sd109 Jan 24, 2025
d9dfd2d
Refactor to allow reinstating resource manager
sd109 Feb 7, 2025
528d532
Update the podman installation play to enable linger on the unprivile…
maxstack Feb 13, 2025
f3b994d
Allow Prometheus and HAProxy to be installed as the non-privileged user.
maxstack Feb 14, 2025
8fb4a95
Implement an object chunk cache on disk where we can configure:
maxstack Feb 27, 2025
2882c47
Refactor the chunk cache so it stores each chunk in a separate file.
Mar 4, 2025
a414f0c
Add example of simplified cache implementation
sd109 Mar 4, 2025
3e6ed6d
Add SimpleChunkCache pruning, with tests, and integrate with the Redu…
maxstack Mar 10, 2025
37f0c95
Remove unneeded dependencies, added during development of the caching…
maxstack Mar 10, 2025
4715e3e
Provide a S3 download handler and a cached S3 download handler with t…
maxstack Mar 11, 2025
5d408c8
Panic if the cache directory already exists.
maxstack Mar 11, 2025
c549b29
Remove test portion relating to cache wipe on init.
maxstack Mar 11, 2025
979d8a3
Channel all cache commits through a mpsc channel to serialise commits…
maxstack Mar 13, 2025
04e1f55
Allow configuration of the pruning interval, in seconds, of cache chu…
maxstack Mar 13, 2025
91eb805
Make "cargo clippy" happy.
maxstack Mar 13, 2025
c405c84
Merge remote-tracking branch 'origin/feat/disk-cache' into deployment…
maxstack Mar 13, 2025
4cb4ae8
Configure chunk cache test deployment.
maxstack Mar 13, 2025
093e9e4
Deploy a local build of the Reductionist.
maxstack Mar 13, 2025
ec5464a
Update Reductionist tag.
maxstack Mar 13, 2025
416cce3
Tweak what I think is needed to deploy our local build.
maxstack Mar 13, 2025
6fa2203
Correct the volume setup for the chunk cache.
maxstack Mar 13, 2025
46a3162
Tweak Reductionist deployment in group vars.
maxstack Mar 14, 2025
4b34f33
Tweak the Reductionist's ansible build so we can:
maxstack Mar 14, 2025
cfcbaf3
Allow the tokio mpsc buffer size to be configured, the number of chun…
maxstack Mar 14, 2025
0486393
Merge branch 'feat/disk-cache' into deployment/test
maxstack Mar 14, 2025
07e5e60
Add the chunk cache queue size to the group_vars.
maxstack Mar 14, 2025
15e3e60
Merge branch 'deployment/test' of https://github.com/stackhpc/reducti…
maxstack Mar 14, 2025
53d2a4e
Print out our command line args.
maxstack Mar 14, 2025
988e196
Add "headroom_bytes" to the pruning so we can ensure this many bytes …
maxstack Mar 17, 2025
a00c942
Tidy up adding code documentation.
maxstack Mar 18, 2025
72fc70a
Change over to tempfile::TempDir for temporary directory creation due…
maxstack Mar 19, 2025
9e4d08a
Change boolean assert_eq to assert to keep clippy happy.
maxstack Mar 19, 2025
4902ae1
Remove duplicated cache error.
maxstack Mar 19, 2025
b20e193
Debug trait no longer needed on ResourceManager.
maxstack Mar 19, 2025
36a655e
Merge remote-tracking branch 'origin/main' into feat/disk-cache
maxstack Mar 19, 2025
cd6876b
Add chunk cache configuration to group_vars/all.
maxstack Mar 19, 2025
f1659c8
Fix for group_vars/all
maxstack Mar 19, 2025
9a7023e
More sensible default for the cache path that works with or without …
maxstack Mar 19, 2025
063a084
Misc fixes
sd109 Mar 24, 2025
e0c911b
Re-adopt an existing cache.
maxstack Mar 25, 2025
71b18de
Fix broken resource manager memory permits implementation
sd109 Mar 26, 2025
1bad961
Add S3 client auth check to cached object
sd109 Mar 27, 2025
195e663
Run compliance suite with cache enabled as part of CI test suite
sd109 Mar 27, 2025
b7f7201
Add chunk_cache_bypass_auth flag
sd109 Mar 27, 2025
d394384
Merge branch 'main' into feat/disk-cache
sd109 Mar 28, 2025
06286b2
Update documentation inline with the chunk cache additions
maxstack Mar 31, 2025
43fa08f
Fix minor typos
sd109 Apr 1, 2025
278 changes: 271 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -34,6 +34,8 @@ aws-smithy-types = "1.2"
aws-types = "1.3"
axum = { version = "0.6", features = ["headers"] }
axum-server = { version = "0.4.7", features = ["tls-rustls"] }
byte-unit = "5.1.6"
bytes = { version = "1.9.0", features = ["serde"] }
clap = { version = "~4.5", features = ["derive", "env"] }
expanduser = "1.2.2"
flate2 = "1.0"
@@ -42,6 +44,7 @@ http = "1.1"
hyper = { version = "0.14", features = ["full"] }
lazy_static = "1.5"
maligned = "0.2.1"
md5 = "0.7.0"
mime = "0.3"
ndarray = "0.15"
ndarray-stats = "0.5"
@@ -65,6 +68,7 @@ tracing = "0.1"
tracing-opentelemetry = "0.21"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = { version = "2", features = ["serde"] }
uuid = { version = "1.12.1", features = ["v4"] }
validator = { version = "0.16", features = ["derive"] }
zerocopy = { version = "0.6.1", features = ["alloc", "simd"] }
zune-inflate = "0.2.54"
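The new dependencies line up with the cache feature: bytes (with the serde feature, from commit 99e61ba) for chunk payloads, md5 for deriving cache file names, uuid v4 for unique temporary names, and byte-unit for human-readable size limits. The helpers below are illustrative guesses at how they could be used, not code from the PR.

```rust
use byte_unit::Byte;

/// Derive a fixed-length, filesystem-safe file name from a cache key.
fn chunk_file_name(key: &str) -> String {
    format!("{:x}", md5::compute(key))
}

/// Parse a human-friendly size such as "10 GiB" into a byte count,
/// e.g. for a cache size or headroom limit.
fn parse_size_limit(s: &str) -> Option<u64> {
    Byte::parse_str(s, true).ok().map(|b| b.as_u64())
}

/// A collision-free name for an in-flight write, renamed into place once
/// the chunk is fully written.
fn temp_file_name() -> String {
    format!("{}.part", uuid::Uuid::new_v4())
}
```
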
135 changes: 112 additions & 23 deletions src/app.rs
@@ -1,9 +1,10 @@
//! Active Storage server API

use crate::chunk_cache::ChunkCache;
use crate::cli::CommandLineArgs;
use crate::error::ActiveStorageError;
use crate::filter_pipeline;
use crate::metrics::{metrics_handler, track_metrics};
use crate::metrics::{metrics_handler, track_metrics, LOCAL_CACHE_MISSES};
use crate::models;
use crate::operation;
use crate::operations;
@@ -14,17 +15,16 @@ use crate::validated_json::ValidatedJson;

use axum::middleware;
use axum::{
body::Bytes,
extract::{Path, State},
headers::authorization::{Authorization, Basic},
http::header,
response::{IntoResponse, Response},
routing::{get, post},
Router, TypedHeader,
};
use bytes::Bytes;

use std::sync::Arc;
use tokio::sync::SemaphorePermit;
use tower::Layer;
use tower::ServiceBuilder;
use tower_http::normalize_path::NormalizePathLayer;
@@ -56,6 +56,9 @@ struct AppState {

/// Resource manager.
resource_manager: ResourceManager,

/// Object chunk cache
chunk_cache: Option<ChunkCache>,
}

impl AppState {
@@ -64,10 +67,17 @@ impl AppState {
let task_limit = args.thread_limit.or_else(|| Some(num_cpus::get() - 1));
let resource_manager =
ResourceManager::new(args.s3_connection_limit, args.memory_limit, task_limit);
let chunk_cache = if args.use_chunk_cache {
Some(ChunkCache::new(args))
} else {
None
};

Self {
args: args.clone(),
s3_client_map: s3_client::S3ClientMap::new(),
resource_manager,
chunk_cache,
}
}
}
@@ -167,27 +177,96 @@ async fn schema() -> &'static str {
///
/// * `client`: S3 client object
/// * `request_data`: RequestData object for the request
#[tracing::instrument(
level = "DEBUG",
skip(client, request_data, resource_manager, mem_permits)
)]
async fn download_object<'a>(
/// * `resource_manager`: ResourceManager object
async fn download_s3_object<'a>(
client: &s3_client::S3Client,
request_data: &models::RequestData,
resource_manager: &'a ResourceManager,
) -> Result<Bytes, ActiveStorageError> {

// If we're given a size in the request data then use this to
// get an initial guess at the required memory resources.
let memory = request_data.size.unwrap_or(0);
let mut mem_permits = resource_manager.memory(memory).await?;

let range = s3_client::get_range(request_data.offset, request_data.size);
let _conn_permits = resource_manager.s3_connection().await?;

let data = client
.download_object(
&request_data.bucket,
&request_data.object,
range,
resource_manager,
&mut mem_permits,
)
.await;

data
}

/// Download and cache an object from S3
///
/// Requests a byte range if `offset` or `size` is specified in the request.
///
/// # Arguments
///
/// * `client`: S3 client object
/// * `request_data`: RequestData object for the request
/// * `resource_manager`: ResourceManager object
/// * `chunk_cache`: ChunkCache object
async fn download_and_cache_s3_object<'a>(
client: &s3_client::S3Client,
request_data: &models::RequestData,
resource_manager: &'a ResourceManager,
mem_permits: &mut Option<SemaphorePermit<'a>>,
chunk_cache: &ChunkCache,
) -> Result<Bytes, ActiveStorageError> {

let key = format!("{},{:?}", client, request_data);

match chunk_cache.get(&key).await {
Ok(value) => {
if let Some(bytes) = value {
return Ok(bytes);
}
},
Err(e) => {
return Err(e);
}
}

// If we're given a size in the request data then use this to
// get an initial guess at the required memory resources.
let memory = request_data.size.unwrap_or(0);
let mut mem_permits = resource_manager.memory(memory).await?;

let range = s3_client::get_range(request_data.offset, request_data.size);
let _conn_permits = resource_manager.s3_connection().await?;
client

let data = client
.download_object(
&request_data.bucket,
&request_data.object,
range,
resource_manager,
mem_permits,
&mut mem_permits,
)
.await
.await;

if let Ok(data_bytes) = &data {
// Store the data against this key if the chunk cache is enabled.
match chunk_cache.set(&key, data_bytes.clone()).await {
Ok(_) => {},
Err(e) => {
return Err(e);
}
}
}

// Increment the prometheus metric for cache misses
LOCAL_CACHE_MISSES.with_label_values(&["disk"]).inc();

data
}

/// Handler for Active Storage operations
@@ -209,8 +288,6 @@ async fn operation_handler<T: operation::Operation>(
auth: Option<TypedHeader<Authorization<Basic>>>,
ValidatedJson(request_data): ValidatedJson<models::RequestData>,
) -> Result<models::Response, ActiveStorageError> {
let memory = request_data.size.unwrap_or(0);
let mut _mem_permits = state.resource_manager.memory(memory).await?;
let credentials = if let Some(TypedHeader(auth)) = auth {
s3_client::S3Credentials::access_key(auth.username(), auth.password())
} else {
Expand All @@ -221,15 +298,27 @@ async fn operation_handler<T: operation::Operation>(
.get(&request_data.source, credentials)
.instrument(tracing::Span::current())
.await;
let data = download_object(
&s3_client,
&request_data,
&state.resource_manager,
&mut _mem_permits,
)
.instrument(tracing::Span::current())
.await?;
// All remaining work is synchronous. If the use_rayon argument was specified, delegate to the

let data = if state.args.use_chunk_cache {
download_and_cache_s3_object(
&s3_client,
&request_data,
&state.resource_manager,
&state.chunk_cache.as_ref().unwrap(),
)
.instrument(tracing::Span::current())
.await?
} else {
download_s3_object(
&s3_client,
&request_data,
&state.resource_manager,
)
.instrument(tracing::Span::current())
.await?
};

// All remaining work is synchronous. If the use_rayon argument was specified, delegate to the
// Rayon thread pool. Otherwise, execute as normal using Tokio.
if state.args.use_rayon {
tokio_rayon::spawn(move || operation::<T>(request_data, data)).await
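
The handler changes above show only the cache's call sites: `ChunkCache::new(args)` at startup, `get(&key)` returning `Ok(None)` on a miss, and `set(&key, bytes)` after a successful download, with LOCAL_CACHE_MISSES incremented only on the miss path. The cache type itself lives in src/chunk_cache.rs, outside this 12-commit view; as a stand-in, here is a minimal cache with the same get/set shape (md5-keyed, one file per chunk, and none of the PR's pruning, commit serialisation, or metrics).

```rust
use std::io::ErrorKind;
use std::path::PathBuf;

use bytes::Bytes;

struct MiniChunkCache {
    dir: PathBuf,
}

impl MiniChunkCache {
    /// Return the cached bytes for `key`, or `None` on a miss so the
    /// caller can fall through to the S3 download path.
    async fn get(&self, key: &str) -> std::io::Result<Option<Bytes>> {
        let path = self.dir.join(format!("{:x}", md5::compute(key)));
        match tokio::fs::read(&path).await {
            Ok(data) => Ok(Some(Bytes::from(data))),
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Store `data` under `key`, one file per chunk.
    async fn set(&self, key: &str, data: Bytes) -> std::io::Result<()> {
        let path = self.dir.join(format!("{:x}", md5::compute(key)));
        tokio::fs::write(&path, &data).await
    }
}
```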