diff --git a/src/local.rs b/src/local.rs index 5b46a7d8..3c8da406 100644 --- a/src/local.rs +++ b/src/local.rs @@ -699,14 +699,57 @@ impl LocalFileSystem { prefix: Option<&Path>, maybe_offset: Option<&Path>, ) -> BoxStream<'static, Result> { - let root_path = match prefix { - Some(prefix) => match config.prefix_to_filesystem(prefix) { - Ok(path) => path, - Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(), - }, - None => config.root.to_file_path().unwrap(), + // To match cloud storage behavior and Path::prefix_matches semantics, + // we need to walk from the parent directory and filter results. + // However, if the prefix points to an exact file (not a directory), + // we should return empty (cloud storage doesn't list individual files). + let (root_path, filter_prefix) = match prefix { + Some(prefix) => { + // First try to convert the prefix to a filesystem path + let prefix_fs_path = match config.prefix_to_filesystem(prefix) { + Ok(path) => path, + Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(), + }; + + // Check if the prefix points to an existing file + // If it does, use the original behavior (walk from that path with min_depth(1), + // which returns empty for files) + if prefix_fs_path.is_file() { + return Self::walk_directory(config, prefix_fs_path, maybe_offset, None); + } + + // Otherwise, walk from parent directory and filter + let parts: Vec<_> = prefix.parts().collect(); + let parent = if parts.is_empty() || parts.len() == 1 { + // Empty or single component - walk from root + Path::from("") + } else { + // Multi-component - walk from parent directory + let parent_parts: Vec = parts[..parts.len() - 1] + .iter() + .map(|p| p.as_ref().to_string()) + .collect(); + Path::from_iter(parent_parts) + }; + + let root_path = match config.prefix_to_filesystem(&parent) { + Ok(path) => path, + Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(), + }; + (root_path, Some(prefix.clone())) + } + None => (config.root.to_file_path().unwrap(), None), }; + Self::walk_directory(config, root_path, maybe_offset, filter_prefix) + } + + fn walk_directory( + config: Arc, + root_path: PathBuf, + maybe_offset: Option<&Path>, + filter_prefix: Option, + ) -> BoxStream<'static, Result> { let walkdir = WalkDir::new(root_path) // Don't include the root directory itself .min_depth(1) @@ -736,10 +779,19 @@ impl LocalFileSystem { } match config.filesystem_to_path(entry.path()) { - Ok(path) => match is_valid_file_path(&path) { - true => convert_entry(entry, path).transpose(), - false => None, - }, + Ok(path) => { + // Apply prefix filter using Path's built-in prefix_matches method + if let Some(ref prefix) = filter_prefix { + if !path.prefix_matches(prefix) { + return None; + } + } + + match is_valid_file_path(&path) { + true => convert_entry(entry, path).transpose(), + false => None, + } + } Err(e) => Some(Err(e)), } }); @@ -1158,7 +1210,7 @@ mod tests { #[cfg(target_family = "unix")] use tempfile::NamedTempFile; - use crate::{ObjectStoreExt, integration::*}; + use crate::{ObjectStore, ObjectStoreExt, integration::*}; use super::*; @@ -1744,16 +1796,327 @@ mod tests { integration.delete(&location).await.unwrap(); assert!(fs::read_dir(root.path()).unwrap().count() == 0); } + + #[tokio::test] + async fn test_prefix_filtering_regression() { + // Regression test for prefix filtering bug where LocalFileSystem + // treated prefix as a directory path instead of using Path::prefix_matches + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + // Create test files with similar prefixes + let files = vec![ + "logs/2024/jan.txt", + "logs/2024/feb.txt", + "logs/archive/old.txt", + "logs_archive/file.txt", // Should NOT match "logs/" prefix + "data/logs/info.txt", // Should NOT match "logs" prefix + ]; + + for file in &files { + let path = Path::parse(file).unwrap(); + integration.put(&path, "test".into()).await.unwrap(); + } + + // Test 1: Prefix "logs" should match "logs/*" but NOT "logs_archive/*" + let prefix = Path::parse("logs").unwrap(); + let mut list = flatten_list_stream(&integration, Some(&prefix)) + .await + .unwrap(); + list.sort(); + + let expected = vec![ + Path::parse("logs/2024/feb.txt").unwrap(), + Path::parse("logs/2024/jan.txt").unwrap(), + Path::parse("logs/archive/old.txt").unwrap(), + ]; + assert_eq!( + list, expected, + "Prefix 'logs' should only match paths starting with 'logs/', not 'logs_archive/'" + ); + + // Test 2: Prefix "logs/2024" should match only files under logs/2024/ + let prefix = Path::parse("logs/2024").unwrap(); + let mut list = flatten_list_stream(&integration, Some(&prefix)) + .await + .unwrap(); + list.sort(); + + let expected = vec![ + Path::parse("logs/2024/feb.txt").unwrap(), + Path::parse("logs/2024/jan.txt").unwrap(), + ]; + assert_eq!( + list, expected, + "Prefix 'logs/2024' should only match files under logs/2024/" + ); + + // Test 3: Prefix "logs_archive" should only match files under logs_archive/ + let prefix = Path::parse("logs_archive").unwrap(); + let list = flatten_list_stream(&integration, Some(&prefix)) + .await + .unwrap(); + + let expected = vec![Path::parse("logs_archive/file.txt").unwrap()]; + assert_eq!( + list, expected, + "Prefix 'logs_archive' should only match its own directory" + ); + + // Test 4: Non-existent prefix should return empty + let prefix = Path::parse("nonexistent/prefix").unwrap(); + let list = flatten_list_stream(&integration, Some(&prefix)) + .await + .unwrap(); + assert_eq!(list, vec![], "Non-existent prefix should return empty list"); + } + + #[tokio::test] + async fn test_prefix_filtering_partial_names() { + // Test that partial name matches work correctly with prefix_matches semantics + // Path::prefix_matches is segment-based, so "test" only matches segment "test" + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let files = vec![ + "test/file1.txt", + "test/file2.txt", + "test_data/file3.txt", + "testing/file4.txt", + ]; + + for file in &files { + let path = Path::parse(file).unwrap(); + integration.put(&path, "test".into()).await.unwrap(); + } + + // Prefix "test" should only match "test/*" (exact segment match) + // It should NOT match "test_data" or "testing" (different segments) + let prefix = Path::parse("test").unwrap(); + let mut list = flatten_list_stream(&integration, Some(&prefix)) + .await + .unwrap(); + list.sort(); + + let expected = vec![ + Path::parse("test/file1.txt").unwrap(), + Path::parse("test/file2.txt").unwrap(), + ]; + assert_eq!( + list, expected, + "Prefix 'test' should only match segment 'test/', not 'test_data' or 'testing'" + ); + + // Prefix "test_data" should only match "test_data/*" + let prefix = Path::parse("test_data").unwrap(); + let list = flatten_list_stream(&integration, Some(&prefix)) + .await + .unwrap(); + + let expected = vec![Path::parse("test_data/file3.txt").unwrap()]; + assert_eq!( + list, expected, + "Prefix 'test_data' should only match segment 'test_data/'" + ); + } + + #[tokio::test] + async fn test_prefix_with_file_path() { + // Test that listing with a prefix that points to an exact file returns empty + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let file_path = Path::parse("dir/file.txt").unwrap(); + integration.put(&file_path, "content".into()).await.unwrap(); + + // Listing with exact file path as prefix should return empty + let prefix = Path::parse("dir/file.txt").unwrap(); + let list = flatten_list_stream(&integration, Some(&prefix)) + .await + .unwrap(); + assert_eq!( + list, + vec![], + "Listing with exact file path as prefix should return empty" + ); + } + + #[tokio::test] + async fn test_list_with_delimiter_prefix_filtering() { + // Test that list_with_delimiter also respects prefix filtering + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let files = vec![ + "logs/2024/jan/day1.txt", + "logs/2024/jan/day2.txt", + "logs/2024/feb/day1.txt", + "logs_archive/old.txt", + ]; + + for file in &files { + let path = Path::parse(file).unwrap(); + integration.put(&path, "test".into()).await.unwrap(); + } + + // Test with prefix "logs/2024" + let prefix = Path::parse("logs/2024").unwrap(); + let result = integration + .list_with_delimiter(Some(&prefix)) + .await + .unwrap(); + + // Should only see common_prefixes and objects under logs/2024/ + let common_prefixes: Vec<_> = result.common_prefixes.iter().map(|p| p.as_ref()).collect(); + + // We should see logs/2024/jan/ and logs/2024/feb/ as common prefixes + assert!( + common_prefixes.contains(&"logs/2024/jan"), + "Should contain logs/2024/jan prefix, got: {:?}", + common_prefixes + ); + assert!( + common_prefixes.contains(&"logs/2024/feb"), + "Should contain logs/2024/feb prefix, got: {:?}", + common_prefixes + ); + + // Should NOT see anything from logs_archive/ + for obj in &result.objects { + assert!( + obj.location.as_ref().starts_with("logs/2024/"), + "All objects should be under logs/2024/, found: {}", + obj.location + ); + } + } + + #[tokio::test] + async fn test_prefix_parent_walking() { + // Regression test: Ensure prefix filtering works by walking parent directory + // This is the core fix - old code would treat prefix as exact directory path + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + // Create files in nested structure + let files = vec![ + "data/subset_a/file1.txt", + "data/subset_a/file2.txt", + "data/subset_b/file3.txt", + ]; + + for file in &files { + let path = Path::parse(file).unwrap(); + integration.put(&path, "test".into()).await.unwrap(); + } + + // List with prefix "data/subset_a" + // Old bug: Would only walk "data/subset_a/" directory + // New fix: Walks "data/" and filters by prefix_matches("data/subset_a") + let prefix = Path::parse("data/subset_a").unwrap(); + let mut list = flatten_list_stream(&integration, Some(&prefix)) + .await + .unwrap(); + list.sort(); + + let expected = vec![ + Path::parse("data/subset_a/file1.txt").unwrap(), + Path::parse("data/subset_a/file2.txt").unwrap(), + ]; + assert_eq!( + list, expected, + "Should correctly filter by prefix using parent directory walking" + ); + + // Verify it doesn't match subset_b + assert!( + !list.iter().any(|p| p.as_ref().contains("subset_b")), + "Should not include paths from subset_b" + ); + } + + #[tokio::test] + async fn test_prefix_filtering_empty_parent() { + // Test prefix filtering when parent would be empty (root level) + // Path::prefix_matches requires exact segment matches + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let files = vec!["myfile/data.txt", "myfile2/data.txt", "data/file3.txt"]; + + for file in &files { + let path = Path::parse(file).unwrap(); + integration.put(&path, "test".into()).await.unwrap(); + } + + // Prefix "myfile" at root level - should only match "myfile/*" + let prefix = Path::parse("myfile").unwrap(); + let list = flatten_list_stream(&integration, Some(&prefix)) + .await + .unwrap(); + + let expected = vec![Path::parse("myfile/data.txt").unwrap()]; + assert_eq!( + list, expected, + "Root-level prefix should only match exact segment 'myfile/', not 'myfile2/'" + ); + } + + #[tokio::test] + async fn test_prefix_filtering_with_offset() { + // Test that prefix filtering works correctly with list_with_offset + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let files = vec![ + "logs/file1.txt", + "logs/file2.txt", + "logs/file3.txt", + "logs_archive/file4.txt", + ]; + + for file in &files { + let path = Path::parse(file).unwrap(); + integration.put(&path, "test".into()).await.unwrap(); + } + + let prefix = Path::parse("logs").unwrap(); + let offset = Path::parse("logs/file1.txt").unwrap(); + + let mut list: Vec<_> = integration + .list_with_offset(Some(&prefix), &offset) + .map_ok(|meta| meta.location) + .try_collect() + .await + .unwrap(); + + // Sort to ensure consistent ordering + list.sort(); + + // Should only see logs/file2.txt and logs/file3.txt (after offset) + // Should NOT see logs_archive/file4.txt + assert_eq!(list.len(), 2, "Should have 2 files after offset"); + + let expected = vec![ + Path::parse("logs/file2.txt").unwrap(), + Path::parse("logs/file3.txt").unwrap(), + ]; + assert_eq!( + list, expected, + "Should only get files under logs/ prefix after offset" + ); + } } #[cfg(not(target_arch = "wasm32"))] #[cfg(test)] mod not_wasm_tests { + use futures::TryStreamExt; use std::time::Duration; use tempfile::TempDir; use crate::local::LocalFileSystem; - use crate::{ObjectStoreExt, Path, PutPayload}; + use crate::{ObjectStore, ObjectStoreExt, Path, PutPayload}; #[tokio::test] async fn test_cleanup_intermediate_files() {