Draft: Fix chunk cache key and metadata retrieval #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into base: main
1 change: 1 addition & 0 deletions deployment/group_vars/all
@@ -31,6 +31,7 @@ reductionist_env:
REDUCTIONIST_CHUNK_CACHE_PRUNE_INTERVAL: "3600"
REDUCTIONIST_CHUNK_CACHE_SIZE_LIMIT: "10GB"
REDUCTIONIST_CHUNK_CACHE_QUEUE_SIZE: "32"
REDUCTIONIST_CHUNK_CACHE_KEY: "%source-%bucket-%object-%offset-%size"
REDUCTIONIST_CHUNK_CACHE_BYPASS_AUTH: "false"
# Path to certificates directory on remote host.
reductionist_remote_certs_path: "{{ ansible_facts.env.HOME }}/certs"
36 changes: 36 additions & 0 deletions docs/deployment.md
@@ -245,12 +245,48 @@ The ``reductionist_env`` parameter allows configuration of the environment varia
| REDUCTIONIST_CHUNK_CACHE_PRUNE_INTERVAL | Time in seconds between periodic pruning of the cache |
| REDUCTIONIST_CHUNK_CACHE_SIZE_LIMIT | Maximum cache size, i.e. "100GB" |
| REDUCTIONIST_CHUNK_CACHE_QUEUE_SIZE | Tokio MPSC buffer size used to queue downloaded objects between the asynchronous web engine and the synchronous cache |
| REDUCTIONIST_CHUNK_CACHE_KEY | Overrides the key format used to uniquely identify a cached chunk, see section below |
| REDUCTIONIST_CHUNK_CACHE_BYPASS_AUTH | Allow bypassing of S3 authentication when accessing cached data |


Note: after changing any of the above parameters, the Reductionist must be deployed, or redeployed, using the Ansible playbook for the change to take effect.
Because Ansible is idempotent, any running Reductionist container must be removed before redeploying.

### Chunk Cache Key

This defines the format of the key that uniquely identifies a downloaded chunk.
The default value is "%source-%bucket-%object-%offset-%size". All of the parameters used here are also supplied in the API call to download an S3 object, so together they should uniquely identify an object; the sketch after the list below shows how the tokens are substituted.
The assumption is that the object on the S3 data store doesn't change, e.g. isn't replaced with a version using different compression.

Choosing the key format is a trade-off:

* Use too few parameters to uniquely identify a chunk and a request may be served a cached chunk containing the wrong data.
* Use too many, unnecessary parameters and cache hits that should have occurred are missed.
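
A minimal sketch of the token substitution follows; the source URL, bucket, object and byte range are hypothetical, and the `Some(...)` rendering matches the `Debug` formatting applied to optional request parameters in `src/app.rs`.

```rust
fn main() {
    // Hypothetical request values substituted into the default key template.
    let template = "%source-%bucket-%object-%offset-%size";
    let key = template
        .replace("%source", "https://s3.example.org")
        .replace("%bucket", "my-bucket")
        .replace("%object", "data/chunk-0001")
        .replace("%offset", &format!("{:?}", Some(4096))) // optional parameters use Debug formatting
        .replace("%size", &format!("{:?}", Some(1024)));
    assert_eq!(
        key,
        "https://s3.example.org-my-bucket-data/chunk-0001-Some(4096)-Some(1024)"
    );
}
```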

#### Authenticating Cached Chunks

The original request to download data from S3 is authenticated. Data cached from that request is likely subject to authentication as well, to ensure a different Reductionist client can't read private data via the cache.
By default, the Reductionist authenticates client requests against the S3 object store before serving cached chunks. This can have a performance impact due to the latency of the S3 authentication API call.

One option is to bypass the cache authentication in the Reductionist configuration, leaving the cache unauthenticated but potentially yielding a performance boost.

Another option is to incorporate the credentials of the original requestor into the cache key, so that only they can retrieve the cached chunk.
The key, once constructed from its parameters, is hashed with [MD5](https://en.wikipedia.org/wiki/MD5) so credentials aren't exposed via the chunk cache filesystem.
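
As a minimal sketch of that hashing step (the key below, including the trailing credential, is hypothetical; the `md5::compute` call is the same one `src/chunk_cache.rs` uses to derive cache filenames):

```rust
fn main() {
    // Hypothetical key that embeds the client credential via the %auth token.
    let key = "https://s3.example.org-my-bucket-data/chunk-0001-Some(4096)-Some(1024)-my-access-key";
    // The Debug formatting of an md5::Digest is a 32-character hex string,
    // so the raw key (and credential) is never written to the filesystem.
    let filename = format!("{:?}", md5::compute(key));
    assert_eq!(filename.len(), 32);
}
```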

#### Chunk Cache Key Tokens Available

| Token | Description |
| - | - |
| `%source` | Source URL for S3 data store |
| `%bucket` | S3 bucket |
| `%object` | Object key |
| `%offset` | Offset of data byte range |
| `%size` | Size of data byte range |
| `%dtype` | Data type |
| `%byte_order` | Byte order of data |
| `%compression` | Type of compression used on data |
| `%auth` | Client credentials |

Where request parameters are optional, and so may not be present in every request, their tokens can still be used; a null value is substituted when constructing the cache key, as sketched below.
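
A minimal sketch of that behaviour, using a hypothetical custom template and hypothetical credentials; the absent optional parameter contributes its null (`None`) rendering to the key:

```rust
fn main() {
    // Hypothetical custom template that also keys on the client credentials.
    let template = "%bucket-%object-%offset-%size-%auth";
    let offset: Option<usize> = None; // optional parameter not supplied in this request
    let size: Option<usize> = Some(1024);
    let key = template
        .replace("%bucket", "my-bucket")
        .replace("%object", "data/chunk-0001")
        .replace("%offset", &format!("{:?}", offset)) // substitutes "None"
        .replace("%size", &format!("{:?}", size)) // substitutes "Some(1024)"
        .replace("%auth", "my-access-key");
    assert_eq!(key, "my-bucket-data/chunk-0001-None-Some(1024)-my-access-key");
}
```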

## Usage

Once deployed, the Reductionist API is accessible via HAProxy on port 8080. The Prometheus UI is accessible on port 9090 on the host running Prometheus. The Jaeger UI is accessible on port 16686 on the host running Jaeger.
41 changes: 26 additions & 15 deletions src/app.rs
@@ -189,6 +189,7 @@ async fn schema() -> &'static str {
/// * `client`: S3 client object
/// * `request_data`: RequestData object for the request
/// * `resource_manager`: ResourceManager object
/// * `mem_permits`: Memory permits for the request
#[tracing::instrument(level = "DEBUG", skip(client, request_data, resource_manager))]
async fn download_s3_object<'a>(
client: &s3_client::S3Client,
@@ -221,7 +222,10 @@ async fn download_s3_object<'a>(
/// * `client`: S3 client object
/// * `request_data`: RequestData object for the request
/// * `resource_manager`: ResourceManager object
/// * `mem_permits`: Memory permits for the request
/// * `chunk_cache`: ChunkCache object
/// * `chunk_cache_key`: Key template used for naming cache file entries
/// * `allow_cache_auth_bypass`: Whether to allow bypassing S3 auth checks
#[tracing::instrument(
level = "DEBUG",
skip(client, request_data, resource_manager, mem_permits, chunk_cache)
@@ -232,22 +236,28 @@ async fn download_and_cache_s3_object<'a>(
resource_manager: &'a ResourceManager,
mut mem_permits: Option<SemaphorePermit<'a>>,
chunk_cache: &ChunkCache,
chunk_cache_key: &str,
allow_cache_auth_bypass: bool,
) -> Result<Bytes, ActiveStorageError> {
// We chose a cache key such that any changes to request data
// which may feasibly indicate a change to the upstream object
// lead to a new cache key.
let key = format!(
"{}-{}-{}-{}-{:?}-{:?}",
request_data.source.as_str(),
request_data.bucket,
request_data.object,
request_data.dtype,
request_data.byte_order,
request_data.compression,
);

if let Some(metadata) = chunk_cache.get_metadata(&key).await {
// The default chunk key is "%source-%bucket-%object-%offset-%size",
// which uses the same parameters provided to an S3 object download.
// It assumes the data of the underlying object store remains unchanged.
let key = chunk_cache_key
.replace("%source", request_data.source.as_str())
.replace("%bucket", &request_data.bucket)
.replace("%object", &request_data.object)
.replace("%offset", &format!("{:?}", request_data.offset))
.replace("%size", &format!("{:?}", request_data.size))
.replace("%dtype", &format!("{}", request_data.dtype))
.replace("%byte_order", &format!("{:?}", request_data.byte_order))
.replace("%compression", &format!("{:?}", request_data.compression))
.replace("%auth", &format!("{}", client));
// Any token left unexpanded indicates a misconfigured key template, so
// fail the request rather than caching under a malformed key.
if key.contains('%') {
return Err(ActiveStorageError::ChunkCacheError {
error: format!("invalid chunk cache key: {}", key),
});
}

if let Some(metadata) = chunk_cache.get_metadata(&key).await? {
if !allow_cache_auth_bypass {
// To avoid having to include the S3 client ID as part of the cache key
// (which means we'd have a separate cache for each authorised user and
@@ -292,7 +302,7 @@ async fn download_and_cache_s3_object<'a>(
let data = download_s3_object(client, request_data, resource_manager, mem_permits).await?;

// Write data to cache
chunk_cache.set(&key, data.clone()).await?;
chunk_cache.set(&key, &data).await?;

// Increment the prometheus metric for cache misses
LOCAL_CACHE_MISSES.with_label_values(&["disk"]).inc();
@@ -352,6 +362,7 @@ async fn operation_handler<T: operation::Operation>(
&state.resource_manager,
_mem_permits,
cache,
state.args.chunk_cache_key.as_str(),
state.args.chunk_cache_bypass_auth
).await?
}
117 changes: 89 additions & 28 deletions src/chunk_cache.rs
@@ -22,11 +22,11 @@ struct ChunkCacheEntry {

impl ChunkCacheEntry {
/// Return a ChunkCacheEntry object
fn new(key: &str, value: Bytes) -> Self {
fn new(key: &str, value: &Bytes) -> Self {
let key = key.to_owned();
// Make sure we own the `Bytes` so we don't see unexpected, but not incorrect,
// behaviour caused by the zero copy of `Bytes`. i.e. let us choose when to copy.
let value = Bytes::copy_from_slice(&value);
let value = Bytes::copy_from_slice(value);
Self { key, value }
}
}
@@ -97,13 +97,13 @@ impl ChunkCache {
Self { cache, sender }
}

/// Stores chunk `Bytes` in the cache against an unique key.
/// Stores chunk `Bytes` in the cache against a unique key.
///
/// # Arguments
///
/// * `key`: Unique key identifying the chunk
/// * `value`: Chunk `Bytes` to be cached
pub async fn set(&self, key: &str, value: Bytes) -> Result<(), ActiveStorageError> {
pub async fn set(&self, key: &str, value: &Bytes) -> Result<(), ActiveStorageError> {
match self.sender.send(ChunkCacheEntry::new(key, value)).await {
Ok(_) => Ok(()),
Err(e) => Err(ActiveStorageError::ChunkCacheError {
@@ -112,17 +112,21 @@ impl ChunkCache {
}
}

/// Retrieves chunk metadata from the cache for an unique key.
/// Retrieves chunk metadata from the cache for a unique key.
///
/// # Arguments
///
/// * `key`: Unique key identifying the chunk
pub async fn get_metadata(&self, key: &str) -> Option<Metadata> {
let state = self.cache.load_state().await;
state.metadata.get(key).cloned()
pub async fn get_metadata(&self, key: &str) -> Result<Option<Metadata>, ActiveStorageError> {
match self.cache.get_metadata(key).await {
Ok(value) => Ok(value),
Err(e) => Err(ActiveStorageError::ChunkCacheError {
error: format!("{:?}", e),
}),
}
}

/// Retrieves chunk `Bytes` from the cache for an unique key.
/// Retrieves chunk `Bytes` from the cache for a unique key.
///
/// # Arguments
///
@@ -195,7 +199,7 @@ impl State {
}
}

/// The SimpleDiskCache takes chunks of `Bytes` data, identified by an unique key,
/// The SimpleDiskCache takes chunks of `Bytes` data, identified by a unique key,
/// storing each chunk as a separate file on disk. Keys are stored in a hashmap
/// serialised to a JSON state file on disk.
/// Each chunk stored has a 'time to live' (TTL) stored as a number of seconds from
@@ -262,7 +266,13 @@ impl SimpleDiskCache {
async fn load_state(&self) -> State {
let file = self.dir.join(&self.name).join(SimpleDiskCache::STATE_FILE);
if file.exists() {
serde_json::from_str(fs::read_to_string(file).await.unwrap().as_str()).unwrap()
serde_json::from_str(
fs::read_to_string(file)
.await
.expect("Failed to read cache state file")
.as_str(),
)
.expect("Failed to deserialise cache state")
} else {
State::new(self.prune_interval_seconds)
}
@@ -277,7 +287,7 @@ impl SimpleDiskCache {
let file = self.dir.join(&self.name).join(SimpleDiskCache::STATE_FILE);
fs::write(file, serde_json::to_string(&data).unwrap())
.await
.unwrap();
.expect("Failed to write cache state file");
}

/// Converts a chunk key into a string that can be used for a filename.
@@ -294,7 +304,7 @@ impl SimpleDiskCache {
format!("{:?}", md5::compute(key))
}

/// Retrieves chunk `Bytes` from the cache for an unique key.
/// Retrieves chunk `Bytes` from the cache for a unique key.
/// The chunk simply needs to exist on disk to be returned.
/// For performance, metadata, including TTL, isn't checked and it's possible
/// to retrieve an expired chunk within the time window between the chunk expiring
@@ -320,7 +330,30 @@ impl SimpleDiskCache {
}
}

/// Stores chunk `Bytes` in the cache against an unique key.
/// Retrieves chunk metadata from the cache for a unique key.
/// The metadata simply needs to exist on disk to be returned.
/// This function does not modify the state of the cache and is thread safe.
///
/// # Arguments
///
/// * `key`: Unique key identifying the chunk
async fn get_metadata(&self, key: &str) -> Result<Option<Metadata>, String> {
match fs::read_to_string(
self.dir
.join(&self.name)
.join(self.filename_for_key(key).await + ".meta"),
)
.await
{
Ok(content) => serde_json::from_str(content.as_str())
.map(Some)
.map_err(|e| format!("failed to deserialise chunk metadata: {}", e)),
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => Ok(None),
_ => Err(format!("{}", err)),
},
}
}

/// Stores chunk `Bytes` in the cache against a unique key.
/// The cache is checked and if necessary pruned before storing the chunk.
/// Where a maximum size limit has been set the check will take into account the size
/// of the chunk being stored and ensure sufficient storage space is available.
@@ -334,18 +367,24 @@ impl SimpleDiskCache {
let size = value.len();
// Run the prune before storing to ensure we have sufficient space
self.prune(/* headroom */ size).await?;
// Write the cache value and then update the metadata
let path = self
.dir
.join(&self.name)
.join(self.filename_for_key(key).await);
if let Err(e) = fs::write(path, value).await {
// Write the cache value to a file
let path = self.dir.join(&self.name);
if let Err(e) = fs::write(path.join(self.filename_for_key(key).await), value).await {
return Err(format!("{:?}", e));
}
// Write the metadata to a separate file
let metadata = Metadata::new(size, self.ttl_seconds);
if let Err(e) = fs::write(
path.join(self.filename_for_key(key).await + ".meta"),
serde_json::to_string(&metadata).unwrap(),
)
.await
{
return Err(format!("{:?}", e));
}
// Update the global state
let mut state = self.load_state().await;
state
.metadata
.insert(key.to_owned(), Metadata::new(size, self.ttl_seconds));
state.metadata.insert(key.to_owned(), metadata);
state.current_size_bytes += size;
self.save_state(state).await;
Ok(())
@@ -359,11 +398,16 @@ impl SimpleDiskCache {
async fn remove(&self, key: &str) {
let mut state = self.load_state().await;
if let Some(data) = state.metadata.remove(key) {
let path = self
.dir
.join(&self.name)
.join(self.filename_for_key(key).await);
fs::remove_file(path).await.unwrap();
let path = self.dir.join(&self.name);
// Remove the chunk file
fs::remove_file(path.join(self.filename_for_key(key).await))
.await
.expect("Failed to remove chunk file");
// Remove the metadata file
fs::remove_file(path.join(self.filename_for_key(key).await + ".meta"))
.await
.expect("Failed to remove chunk metadata file");
// Update the global state
state.current_size_bytes -= data.size_bytes;
self.save_state(state).await;
}
@@ -491,6 +535,14 @@ mod tests {
assert_eq!(metadata.len(), 1);
assert_eq!(metadata.get(key_1).unwrap().size_bytes, value_1.len());
assert_eq!(cache_item_1.unwrap(), Some(value_1));
assert_eq!(
cache.get_metadata(key_1).await.unwrap().unwrap().expires,
metadata.get(key_1).unwrap().expires
);
assert_eq!(
cache.get_metadata(key_1).await.unwrap().unwrap().size_bytes,
metadata.get(key_1).unwrap().size_bytes
);

// Act
let key_2 = "item-2";
@@ -503,6 +555,14 @@ mod tests {
assert_eq!(metadata.len(), 2);
assert_eq!(metadata.get(key_2).unwrap().size_bytes, value_2.len());
assert_eq!(cache_item_2.unwrap(), Some(value_2));
assert_eq!(
cache.get_metadata(key_2).await.unwrap().unwrap().expires,
metadata.get(key_2).unwrap().expires
);
assert_eq!(
cache.get_metadata(key_2).await.unwrap().unwrap().size_bytes,
metadata.get(key_2).unwrap().size_bytes
);

// Act
cache.remove(key_1).await;
@@ -514,6 +574,7 @@ mod tests {
assert!(!metadata.contains_key(key_1));
assert!(metadata.contains_key(key_2));
assert_eq!(cache_item_1.unwrap(), None);
assert!(cache.get_metadata(key_1).await.unwrap().is_none());
}

#[tokio::test]
7 changes: 7 additions & 0 deletions src/cli.rs
@@ -77,6 +77,13 @@ pub struct CommandLineArgs {
/// Defaults to the number of CPUs detected.
#[arg(long, env = "REDUCTIONIST_CHUNK_CACHE_QUEUE_SIZE")]
pub chunk_cache_buffer_size: Option<usize>,
/// Override the default key format used to uniquely identify cached chunks.
#[arg(
long,
default_value = "%source-%bucket-%object-%offset-%size",
env = "REDUCTIONIST_CHUNK_CACHE_KEY"
)]
pub chunk_cache_key: String,
/// Whether to bypass the upstream S3 auth checks to improve performance
/// when operating on cached chunks. Auth bypass should only be enabled
/// if the server is running on a private network with sufficient access