diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 4e845e5ca2d0..4d5c7a1085b8 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -67,6 +67,7 @@ aws = ["cloud", "md-5"] http = ["cloud"] tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"] integration = [] +experimental-azure-list-offset = [] [dev-dependencies] # In alphabetical order futures-test = "0.3" diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index b5e82c2a8585..8a6550a0ee48 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -559,6 +559,61 @@ impl GetClient for AzureClient { } } +#[cfg(feature = "experimental-azure-list-offset")] +fn marker_for_offset(offset: &str, is_emulator: bool) -> String { + if is_emulator { + offset.to_string() + } else { + // Here we reconstruct an Azure marker (continuation token) from a key to be able to seek + // into an arbitrary position in the key space. + // The current format (July 2024) for the marker is as follows: + // + // +-> current token version + // | + // | +-> unpadded length of base64 encoded field + // | | + // | | +-> base64 encoded field with characters (/,+,=) repaced with (_,*,-) + // | | | + // 2!72!MDAwMDA4IWZpbGUudHh0ITAwMDAyOCE5OTk5LTEyLTMxVDIzOjU5OjU5Ljk5OTk5OTlaIQ-- + // | | ^ + // terminators | + // | + // +------------+ + // Decoding the |base64 field| gives: + // +------------+ + // + // +-> length of key field padded to 6 digits + // | + // | +-> key to start listing at + // | | + // | | +-> length of timestamp field padded to 6 digits + // | | | + // | | | +-> constant max timestamp field + // | | | | + // 000008!file.txt!000028!9999-12-31T23:59:59.9999999Z! + // | | | | + // +----> field terminators <-------------------+ + // + // When recostructing we add a space character (ASCII 0x20) to the end of the key to change the + // `start_at` behavior into a `start_after` behavior as the space character is the first valid character + // in the lexicographical order. + // + // It appears that hadoop relies on this, code here: + // https://github.com/apache/hadoop/blob/059e996c02d64716707d8dfb905dc84bab317aef/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java#L1358 + + let encoded_part = BASE64_STANDARD + .encode(&format!( + "{:06}!{} !000028!9999-12-31T23:59:59.9999999Z!", + offset.len() + 1, + offset, + )) + .replace("/", "_") + .replace("+", "*") + .replace("=", "-"); + format!("2!{}!{}", encoded_part.len(), encoded_part) + } +} + #[async_trait] impl ListClient for AzureClient { /// Make an Azure List request @@ -569,6 +624,7 @@ impl ListClient for AzureClient { token: Option<&str>, offset: Option<&str>, ) -> Result<(ListResult, Option)> { + #[cfg(not(feature = "experimental-azure-list-offset"))] assert!(offset.is_none()); // Not yet supported let credential = self.get_credential().await?; @@ -586,6 +642,16 @@ impl ListClient for AzureClient { query.push(("delimiter", DELIMITER)) } + #[cfg(feature = "experimental-azure-list-offset")] + let token_string = match (token, offset) { + (Some(token), _) => Some(token.to_string()), + (None, Some(offset)) => Some(marker_for_offset(offset, self.config.is_emulator)), + (None, None) => None, + }; + + #[cfg(feature = "experimental-azure-list-offset")] + let token = token_string.as_deref(); + if let Some(token) = token { query.push(("marker", token)) } @@ -967,4 +1033,19 @@ mod tests { let _delegated_key_response_internal: UserDelegationKey = quick_xml::de::from_str(S).unwrap(); } + + #[cfg(feature = "experimental-azure-list-offset")] + #[test] + fn test_marker_for_offset() { + // BlobStorage + let marker = marker_for_offset("file.txt", false); + assert_eq!( + marker, + "2!72!MDAwMDA5IWZpbGUudHh0ICEwMDAwMjghOTk5OS0xMi0zMVQyMzo1OTo1OS45OTk5OTk5WiE-" + ); + + // Azurite + let marker = marker_for_offset("file.txt", true); + assert_eq!(marker, "file.txt"); + } } diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index f89a184f9523..74b86cea2842 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -123,6 +123,15 @@ impl ObjectStore for MicrosoftAzure { self.client.list(prefix) } + #[cfg(feature = "experimental-azure-list-offset")] + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'_, Result> { + self.client.list_with_offset(prefix, offset) + } + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { self.client.list_with_delimiter(prefix).await }