diff --git a/deployment/group_vars/all b/deployment/group_vars/all
index 50c185a..3f11c00 100644
--- a/deployment/group_vars/all
+++ b/deployment/group_vars/all
@@ -31,6 +31,7 @@ reductionist_env:
   REDUCTIONIST_CHUNK_CACHE_PRUNE_INTERVAL: "3600"
   REDUCTIONIST_CHUNK_CACHE_SIZE_LIMIT: "10GB"
   REDUCTIONIST_CHUNK_CACHE_QUEUE_SIZE: "32"
+  REDUCTIONIST_CHUNK_CACHE_KEY: "%source-%bucket-%object-%offset-%size"
   REDUCTIONIST_CHUNK_CACHE_BYPASS_AUTH: "false"
 # Path to certificates directory on remote host.
 reductionist_remote_certs_path: "{{ ansible_facts.env.HOME }}/certs"
diff --git a/docs/deployment.md b/docs/deployment.md
index 0a4fe00..d4e09cb 100644
--- a/docs/deployment.md
+++ b/docs/deployment.md
@@ -245,12 +245,48 @@ The ``reductionist_env`` parameter allows configuration of the environment varia
 | REDUCTIONIST_CHUNK_CACHE_PRUNE_INTERVAL | Time in seconds between periodic pruning of the cache |
 | REDUCTIONIST_CHUNK_CACHE_SIZE_LIMIT | Maximum cache size, i.e. "100GB" |
 | REDUCTIONIST_CHUNK_CACHE_QUEUE_SIZE | Tokio MPSC buffer size used to queue downloaded objects between the asynchronous web engine and the synchronous cache |
+| REDUCTIONIST_CHUNK_CACHE_KEY | Overrides the key format used to uniquely identify a cached chunk; see the section below |
 | REDUCTIONIST_CHUNK_CACHE_BYPASS_AUTH | Allow bypassing of S3 authentication when accessing cached data |
 
 Note, after changing any of the above parameters the Reductionist must be deployed, or redeployed, using the ansible playbook for the change to take effect.
 The idempotent nature of ansible necessitates that if redeploying then a running Reductionist container must be removed first.
 
+### Chunk Cache Key
+
+This defines the format of the key used to uniquely identify a downloaded chunk.
+The default value is "%source-%bucket-%object-%offset-%size". These parameters are all used in the API call that downloads an S3 object, so together they should uniquely identify an object.
+The assumption is that the object on the S3 data store does not change, e.g. is not replaced with a version using different compression.
+
+* If insufficient parameters are used to uniquely identify a chunk, a request may be served a cached chunk containing the wrong data
+* If unnecessary parameters are included, cache hits that should have occurred will be missed
+
+#### Authenticating Cached Chunks
+
+The original request to download data from S3 will be authenticated. Data cached from this request is likely subject to authentication as well, to ensure that a different Reductionist client can't read private data via the cache.
+The Reductionist, by default, authenticates client requests against the S3 object store before serving cached chunks. This could have a performance impact due to the latency of the S3 authentication API call.
+
+One option is to bypass the cache authentication in the Reductionist configuration, leaving the cache unauthenticated but potentially yielding a performance boost.
+
+Another option is to incorporate the credentials of the original requestor into the cache key, so that only they can retrieve the cached chunk; see the example below.
+The key name, once constructed from parameters, is hashed using [MD5](https://en.wikipedia.org/wiki/MD5) so credentials aren't exposed via the chunk cache filesystem.
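+
+For example, to adopt this approach the default key format could be extended with the `%auth` token (see the token table below), e.g. in ``reductionist_env``:
+
+```
+reductionist_env:
+  REDUCTIONIST_CHUNK_CACHE_KEY: "%auth-%source-%bucket-%object-%offset-%size"
+```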
+
+#### Chunk Cache Key Tokens Available
+
+| Token | Description |
+| - | - |
+| `%source` | Source URL for S3 data store |
+| `%bucket` | S3 bucket |
+| `%object` | Object key |
+| `%offset` | Offset of data byte range |
+| `%size` | Size of data byte range |
+| `%dtype` | Data type |
+| `%byte_order` | Byte order of data |
+| `%compression` | Type of compression used on data |
+| `%auth` | Client credentials |
+
+Where request parameters are optional, and so may not be present in all requests, their tokens can still be used; missing values are substituted as null when constructing the cache key.
+
 ## Usage
 
 Once deployed, the Reductionist API is accessible on port 8080 by HAProxy.
 The Prometheus UI is accessible on port 9090 on the host running Prometheus.
 The Jaeger UI is accessible on port 16686 on the host running Jaeger.
diff --git a/src/app.rs b/src/app.rs
index 6edbd5b..fa783c5 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -189,6 +189,7 @@ async fn schema() -> &'static str {
 /// * `client`: S3 client object
 /// * `request_data`: RequestData object for the request
 /// * `resource_manager`: ResourceManager object
+/// * `mem_permits`: Memory permits for the request
 #[tracing::instrument(level = "DEBUG", skip(client, request_data, resource_manager))]
 async fn download_s3_object<'a>(
     client: &s3_client::S3Client,
     request_data: &RequestData,
     resource_manager: &'a ResourceManager,
@@ -221,7 +222,10 @@ async fn download_s3_object<'a>(
 /// * `client`: S3 client object
 /// * `request_data`: RequestData object for the request
 /// * `resource_manager`: ResourceManager object
+/// * `mem_permits`: Memory permits for the request
 /// * `chunk_cache`: ChunkCache object
+/// * `chunk_cache_key`: Key template used for naming cache file entries
+/// * `allow_cache_auth_bypass`: Whether to allow bypassing S3 auth checks
 #[tracing::instrument(
     level = "DEBUG",
     skip(client, request_data, resource_manager, mem_permits, chunk_cache)
 )]
@@ -232,22 +236,28 @@ async fn download_and_cache_s3_object<'a>(
     client: &s3_client::S3Client,
     request_data: &'a RequestData,
     resource_manager: &'a ResourceManager,
     mut mem_permits: Option>,
     chunk_cache: &ChunkCache,
+    chunk_cache_key: &str,
     allow_cache_auth_bypass: bool,
 ) -> Result {
-    // We chose a cache key such that any changes to request data
-    // which may feasibly indicate a change to the upstream object
-    // lead to a new cache key.
-    let key = format!(
-        "{}-{}-{}-{}-{:?}-{:?}",
-        request_data.source.as_str(),
-        request_data.bucket,
-        request_data.object,
-        request_data.dtype,
-        request_data.byte_order,
-        request_data.compression,
-    );
-
-    if let Some(metadata) = chunk_cache.get_metadata(&key).await {
+    // The default chunk key is "%source-%bucket-%object-%offset-%size",
+    // which uses the same parameters provided to an S3 object download.
+    // It assumes the data of the underlying object store remains unchanged.
+    let key = chunk_cache_key
+        .replace("%source", request_data.source.as_str())
+        .replace("%bucket", &request_data.bucket)
+        .replace("%object", &request_data.object)
+        .replace("%offset", &format!("{:?}", request_data.offset))
+        .replace("%size", &format!("{:?}", request_data.size))
+        .replace("%dtype", &format!("{}", request_data.dtype))
+        .replace("%byte_order", &format!("{:?}", request_data.byte_order))
+        .replace("%compression", &format!("{:?}", request_data.compression))
+        // %auth incorporates the requesting client's credentials into the key.
+        .replace("%auth", &format!("{}", client));
+    // Fail fast if any '%' remains, e.g. from an unrecognised token.
+    if key.contains('%') {
+        panic!("Invalid cache key: {}", key);
+    }
+
+    if let Some(metadata) = chunk_cache.get_metadata(&key).await? {
         if !allow_cache_auth_bypass {
             // To avoid having to include the S3 client ID as part of the cache key
             // (which means we'd have a separate cache for each authorised user and
@@ -292,7 +302,7 @@ async fn download_and_cache_s3_object<'a>(
     let data = download_s3_object(client, request_data, resource_manager, mem_permits).await?;
 
     // Write data to cache
-    chunk_cache.set(&key, data.clone()).await?;
+    chunk_cache.set(&key, &data).await?;
 
     // Increment the prometheus metric for cache misses
     LOCAL_CACHE_MISSES.with_label_values(&["disk"]).inc();
@@ -352,6 +362,7 @@ async fn operation_handler(
             &state.resource_manager,
             _mem_permits,
             cache,
+            state.args.chunk_cache_key.as_str(),
             state.args.chunk_cache_bypass_auth
         ).await?
     }
diff --git a/src/chunk_cache.rs b/src/chunk_cache.rs
index 6c9710a..8c70a83 100644
--- a/src/chunk_cache.rs
+++ b/src/chunk_cache.rs
@@ -22,11 +22,11 @@ struct ChunkCacheEntry {
 impl ChunkCacheEntry {
     /// Return a ChunkCacheEntry object
-    fn new(key: &str, value: Bytes) -> Self {
+    fn new(key: &str, value: &Bytes) -> Self {
         let key = key.to_owned();
         // Make sure we own the `Bytes` so we don't see unexpected, but not incorrect,
         // behaviour caused by the zero copy of `Bytes`. i.e. let us choose when to copy.
-        let value = Bytes::copy_from_slice(&value);
+        let value = Bytes::copy_from_slice(value);
         Self { key, value }
     }
 }
@@ -97,13 +97,13 @@ impl ChunkCache {
         Self { cache, sender }
     }
 
-    /// Stores chunk `Bytes` in the cache against an unique key.
+    /// Stores chunk `Bytes` in the cache against a unique key.
     ///
     /// # Arguments
     ///
     /// * `key`: Unique key identifying the chunk
     /// * `value`: Chunk `Bytes` to be cached
-    pub async fn set(&self, key: &str, value: Bytes) -> Result<(), ActiveStorageError> {
+    pub async fn set(&self, key: &str, value: &Bytes) -> Result<(), ActiveStorageError> {
         match self.sender.send(ChunkCacheEntry::new(key, value)).await {
             Ok(_) => Ok(()),
             Err(e) => Err(ActiveStorageError::ChunkCacheError {
@@ -112,17 +112,21 @@ impl ChunkCache {
         }
     }
 
-    /// Retrieves chunk metadata from the cache for an unique key.
+    /// Retrieves chunk metadata from the cache for a unique key.
     ///
     /// # Arguments
     ///
     /// * `key`: Unique key identifying the chunk
-    pub async fn get_metadata(&self, key: &str) -> Option<Metadata> {
-        let state = self.cache.load_state().await;
-        state.metadata.get(key).cloned()
+    pub async fn get_metadata(&self, key: &str) -> Result<Option<Metadata>, ActiveStorageError> {
+        match self.cache.get_metadata(key).await {
+            Ok(value) => Ok(value),
+            Err(e) => Err(ActiveStorageError::ChunkCacheError {
+                error: format!("{:?}", e),
+            }),
+        }
     }
 
-    /// Retrieves chunk `Bytes` from the cache for an unique key.
+    /// Retrieves chunk `Bytes` from the cache for a unique key.
     ///
     /// # Arguments
     ///
@@ -195,7 +199,7 @@ impl State {
     }
 }
 
-/// The SimpleDiskCache takes chunks of `Bytes` data, identified by an unique key,
+/// The SimpleDiskCache takes chunks of `Bytes` data, identified by a unique key,
 /// storing each chunk as a separate file on disk. Keys are stored in a hashmap
 /// serialised to a JSON state file on disk.
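+/// Alongside each chunk file a companion `.meta` file is written, holding that
+/// chunk's `Metadata` so it can be read back without loading the full cache state
+/// (see `get_metadata` and `set` below).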
 /// Each chunk stored has a 'time to live' (TTL) stored as a number seconds from
@@ -262,7 +266,13 @@ impl SimpleDiskCache {
     async fn load_state(&self) -> State {
         let file = self.dir.join(&self.name).join(SimpleDiskCache::STATE_FILE);
         if file.exists() {
-            serde_json::from_str(fs::read_to_string(file).await.unwrap().as_str()).unwrap()
+            serde_json::from_str(
+                fs::read_to_string(file)
+                    .await
+                    .expect("Failed to read cache state file")
+                    .as_str(),
+            )
+            .expect("Failed to deserialise cache state")
         } else {
             State::new(self.prune_interval_seconds)
         }
@@ -277,7 +287,7 @@ impl SimpleDiskCache {
         let file = self.dir.join(&self.name).join(SimpleDiskCache::STATE_FILE);
         fs::write(file, serde_json::to_string(&data).unwrap())
             .await
-            .unwrap();
+            .expect("Failed to write cache state file");
     }
 
     /// Converts a chunk key into a string that can be used for a filename.
@@ -294,7 +304,7 @@ impl SimpleDiskCache {
         format!("{:?}", md5::compute(key))
     }
 
-    /// Retrieves chunk `Bytes` from the cache for an unique key.
+    /// Retrieves chunk `Bytes` from the cache for a unique key.
     /// The chunk simply needs to exist on disk to be returned.
     /// For performance, metadata, including TTL, isn't checked and it's possible
     /// to retrieve an expired chunk within the time window between the chunk expiring
@@ -320,7 +330,30 @@ impl SimpleDiskCache {
         }
     }
 
-    /// Stores chunk `Bytes` in the cache against an unique key.
+    /// Retrieves chunk metadata from the cache for a unique key.
+    /// The metadata simply needs to exist on disk to be returned.
+    /// This function does not modify the state of the cache and is thread safe.
+    ///
+    /// # Arguments
+    ///
+    /// * `key`: Unique key identifying the chunk
+    async fn get_metadata(&self, key: &str) -> Result<Option<Metadata>, String> {
+        match fs::read_to_string(
+            self.dir
+                .join(&self.name)
+                .join(self.filename_for_key(key).await + ".meta"),
+        )
+        .await
+        {
+            Ok(content) => Ok(Some(serde_json::from_str(content.as_str()).unwrap())),
+            Err(err) => match err.kind() {
+                std::io::ErrorKind::NotFound => Ok(None),
+                _ => Err(format!("{}", err)),
+            },
+        }
+    }
+
+    /// Stores chunk `Bytes` in the cache against a unique key.
     /// The cache is checked and if necessary pruned before storing the chunk.
     /// Where a maximum size limit has been set the check will take into account the size
     /// of the chunk being stored and ensure sufficient storage space is available.
@@ -334,18 +367,24 @@ impl SimpleDiskCache {
     async fn set(&self, key: &str, value: Bytes) -> Result<(), String> {
         let size = value.len();
         // Run the prune before storing to ensure we have sufficient space
         self.prune(/* headroom */ size).await?;
-        // Write the cache value and then update the metadata
-        let path = self
-            .dir
-            .join(&self.name)
-            .join(self.filename_for_key(key).await);
-        if let Err(e) = fs::write(path, value).await {
+        // Write the cache value to a file
+        let path = self.dir.join(&self.name);
+        if let Err(e) = fs::write(path.join(self.filename_for_key(key).await), value).await {
+            return Err(format!("{:?}", e));
+        }
+        // Write the metadata to a separate file
+        let metadata = Metadata::new(size, self.ttl_seconds);
+        if let Err(e) = fs::write(
+            path.join(self.filename_for_key(key).await + ".meta"),
+            serde_json::to_string(&metadata).unwrap(),
+        )
+        .await
+        {
             return Err(format!("{:?}", e));
         }
+        // Update the global state
         let mut state = self.load_state().await;
-        state
-            .metadata
-            .insert(key.to_owned(), Metadata::new(size, self.ttl_seconds));
+        state.metadata.insert(key.to_owned(), metadata);
         state.current_size_bytes += size;
         self.save_state(state).await;
         Ok(())
@@ -359,11 +398,16 @@ impl SimpleDiskCache {
     async fn remove(&self, key: &str) {
         let mut state = self.load_state().await;
         if let Some(data) = state.metadata.remove(key) {
-            let path = self
-                .dir
-                .join(&self.name)
-                .join(self.filename_for_key(key).await);
-            fs::remove_file(path).await.unwrap();
+            let path = self.dir.join(&self.name);
+            // Remove the chunk file
+            fs::remove_file(path.join(self.filename_for_key(key).await))
+                .await
+                .expect("Failed to remove chunk file");
+            // Remove the metadata file
+            fs::remove_file(path.join(self.filename_for_key(key).await + ".meta"))
+                .await
+                .expect("Failed to remove chunk metadata file");
+            // Update the global state
             state.current_size_bytes -= data.size_bytes;
             self.save_state(state).await;
         }
@@ -491,6 +535,14 @@ mod tests {
         assert_eq!(metadata.len(), 1);
         assert_eq!(metadata.get(key_1).unwrap().size_bytes, value_1.len());
         assert_eq!(cache_item_1.unwrap(), Some(value_1));
+        assert_eq!(
+            cache.get_metadata(key_1).await.unwrap().unwrap().expires,
+            metadata.get(key_1).unwrap().expires
+        );
+        assert_eq!(
+            cache.get_metadata(key_1).await.unwrap().unwrap().size_bytes,
+            metadata.get(key_1).unwrap().size_bytes
+        );
 
         // Act
         let key_2 = "item-2";
@@ -503,6 +555,14 @@ mod tests {
         assert_eq!(metadata.len(), 2);
         assert_eq!(metadata.get(key_2).unwrap().size_bytes, value_2.len());
         assert_eq!(cache_item_2.unwrap(), Some(value_2));
+        assert_eq!(
+            cache.get_metadata(key_2).await.unwrap().unwrap().expires,
+            metadata.get(key_2).unwrap().expires
+        );
+        assert_eq!(
+            cache.get_metadata(key_2).await.unwrap().unwrap().size_bytes,
+            metadata.get(key_2).unwrap().size_bytes
+        );
 
         // Act
         cache.remove(key_1).await;
@@ -514,6 +574,7 @@ mod tests {
         assert!(!metadata.contains_key(key_1));
         assert!(metadata.contains_key(key_2));
         assert_eq!(cache_item_1.unwrap(), None);
+        assert!(cache.get_metadata(key_1).await.unwrap().is_none());
     }
 
     #[tokio::test]
diff --git a/src/cli.rs b/src/cli.rs
index dea982a..9abfb05 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -77,6 +77,13 @@ pub struct CommandLineArgs {
     /// Defaults to the number of CPUs detected.
     #[arg(long, env = "REDUCTIONIST_CHUNK_CACHE_QUEUE_SIZE")]
     pub chunk_cache_buffer_size: Option,
+    /// Override the default key format used to uniquely identify cached chunks.
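+    /// The template may contain the tokens %source, %bucket, %object, %offset,
+    /// %size, %dtype, %byte_order, %compression and %auth, which are substituted
+    /// with the corresponding request values when constructing the key.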
+    #[arg(
+        long,
+        default_value = "%source-%bucket-%object-%offset-%size",
+        env = "REDUCTIONIST_CHUNK_CACHE_KEY"
+    )]
+    pub chunk_cache_key: String,
     /// Whether to bypass the upstream S3 auth checks to improve performance
     /// when operating on cached chunks. Auth bypass should only be enabled
     /// if the server is running on a private network with sufficient access