diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 45754467..2978ad86 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,7 +53,6 @@ jobs: strategy: fail-fast: false matrix: - # TODO: add windows which does not support container os: [ ubuntu-24.04 ] runs-on: ${{ matrix.os }} container: @@ -77,6 +76,13 @@ jobs: path: ./cov-reports if-no-files-found: 'error' + rust-tests-windows: + runs-on: windows-2025 + steps: + - uses: actions/checkout@v5 + - name: Run unit tests on Windows with no coverage report + run: cargo test --no-fail-fast --all-targets --all-features --workspace + python-tests: strategy: fail-fast: false diff --git a/crates/core/src/file_group/file_slice.rs b/crates/core/src/file_group/file_slice.rs index 7218188d..899f2bb4 100644 --- a/crates/core/src/file_group/file_slice.rs +++ b/crates/core/src/file_group/file_slice.rs @@ -19,11 +19,11 @@ use crate::error::CoreError; use crate::file_group::base_file::BaseFile; use crate::file_group::log_file::LogFile; +use crate::storage::util::join_path_segments; use crate::storage::Storage; use crate::Result; use std::collections::BTreeSet; use std::fmt::Display; -use std::path::PathBuf; /// Within a [crate::file_group::FileGroup], /// a [FileSlice] is a logical group of [BaseFile] and [LogFile]s. @@ -78,10 +78,10 @@ impl FileSlice { } fn relative_path_for_file(&self, file_name: &str) -> Result { - let path = PathBuf::from(self.partition_path.as_str()).join(file_name); - path.to_str().map(|s| s.to_string()).ok_or_else(|| { - CoreError::FileGroup(format!("Failed to get relative path for file: {file_name}",)) - }) + Ok(join_path_segments(&[ + self.partition_path.as_str(), + file_name, + ])?) } /// Returns the relative path of the [BaseFile] in the [FileSlice]. diff --git a/crates/core/src/file_group/log_file/reader.rs b/crates/core/src/file_group/log_file/reader.rs index 424994aa..425992bb 100644 --- a/crates/core/src/file_group/log_file/reader.rs +++ b/crates/core/src/file_group/log_file/reader.rs @@ -289,36 +289,47 @@ mod tests { use crate::storage::util::parse_uri; use apache_avro::schema::Schema as AvroSchema; use std::fs::canonicalize; - use std::path::PathBuf; fn get_valid_log_avro_data() -> (String, String) { - let dir = PathBuf::from("tests/data/log_files/valid_log_avro_data"); ( - canonicalize(dir).unwrap().to_str().unwrap().to_string(), + canonicalize("tests/data/log_files/valid_log_avro_data") + .unwrap() + .to_str() + .unwrap() + .to_string(), ".ff32ab89-5ad0-4968-83b4-89a34c95d32f-0_20250316025816068.log.1_0-54-122".to_string(), ) } fn get_valid_log_parquet_data() -> (String, String) { - let dir = PathBuf::from("tests/data/log_files/valid_log_parquet_data"); ( - canonicalize(dir).unwrap().to_str().unwrap().to_string(), + canonicalize("tests/data/log_files/valid_log_parquet_data") + .unwrap() + .to_str() + .unwrap() + .to_string(), ".ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387".to_string(), ) } fn get_valid_log_delete() -> (String, String) { - let dir = PathBuf::from("tests/data/log_files/valid_log_delete"); ( - canonicalize(dir).unwrap().to_str().unwrap().to_string(), + canonicalize("tests/data/log_files/valid_log_delete") + .unwrap() + .to_str() + .unwrap() + .to_string(), ".6d3d1d6e-2298-4080-a0c1-494877d6f40a-0_20250618054711154.log.1_0-26-85".to_string(), ) } fn get_valid_log_rollback() -> (String, String) { - let dir = PathBuf::from("tests/data/log_files/valid_log_rollback"); ( - canonicalize(dir).unwrap().to_str().unwrap().to_string(), + canonicalize("tests/data/log_files/valid_log_rollback") + .unwrap() + .to_str() + .unwrap() + .to_string(), ".0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1".to_string(), ) } diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs index 53abbfb8..3b82fc57 100644 --- a/crates/core/src/file_group/reader.rs +++ b/crates/core/src/file_group/reader.rs @@ -275,51 +275,34 @@ mod tests { use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Field, Schema}; use std::fs::canonicalize; - use std::path::PathBuf; use std::sync::Arc; - use url::Url; fn get_non_existent_base_uri() -> String { "file:///non-existent-path/table".to_string() } fn get_base_uri_with_valid_props() -> String { - let url = Url::from_file_path( - canonicalize( - PathBuf::from("tests") - .join("data") - .join("table_props_valid"), - ) - .unwrap(), - ) - .unwrap(); - url.as_ref().to_string() + canonicalize("tests/data/table_props_valid") + .unwrap() + .to_str() + .unwrap() + .to_string() } fn get_base_uri_with_valid_props_minimum() -> String { - let url = Url::from_file_path( - canonicalize( - PathBuf::from("tests") - .join("data") - .join("table_props_valid_minimum"), - ) - .unwrap(), - ) - .unwrap(); - url.as_ref().to_string() + canonicalize("tests/data/table_props_valid_minimum") + .unwrap() + .to_str() + .unwrap() + .to_string() } fn get_base_uri_with_invalid_props() -> String { - let url = Url::from_file_path( - canonicalize( - PathBuf::from("tests") - .join("data") - .join("table_props_invalid"), - ) - .unwrap(), - ) - .unwrap(); - url.as_ref().to_string() + canonicalize("tests/data/table_props_invalid") + .unwrap() + .to_str() + .unwrap() + .to_string() } #[test] diff --git a/crates/core/src/schema/resolver.rs b/crates/core/src/schema/resolver.rs index 803f8c6c..dcf3e50d 100644 --- a/crates/core/src/schema/resolver.rs +++ b/crates/core/src/schema/resolver.rs @@ -20,13 +20,12 @@ use crate::avro_to_arrow::to_arrow_schema; use crate::config::table::HudiTableConfig; use crate::error::{CoreError, Result}; use crate::schema::prepend_meta_fields; +use crate::storage::util::join_path_segments; use crate::storage::Storage; use crate::table::Table; use apache_avro::schema::Schema as AvroSchema; use arrow_schema::{Schema, SchemaRef}; use serde_json::{Map, Value}; -use std::path::PathBuf; -use std::str::FromStr; use std::sync::Arc; /// Resolves the [`arrow_schema::Schema`] for a given Hudi table. @@ -154,17 +153,8 @@ async fn resolve_schema_from_base_file( "Failed to resolve the latest schema: no file path found".to_string(), ) })?; - let parquet_file_path_buf = PathBuf::from_str(partition_path) - .map_err(|e| { - CoreError::CommitMetadata(format!("Failed to resolve the latest schema: {}", e)) - })? - .join(base_file); - let path = parquet_file_path_buf.to_str().ok_or_else(|| { - CoreError::CommitMetadata( - "Failed to resolve the latest schema: invalid file path".to_string(), - ) - })?; - Ok(storage.get_parquet_file_schema(path).await?) + let parquet_file_path = join_path_segments(&[partition_path, base_file])?; + Ok(storage.get_parquet_file_schema(&parquet_file_path).await?) } None => Err(CoreError::CommitMetadata( "Failed to resolve the latest schema: no file path found".to_string(), diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index deb5963f..6e1308f9 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -173,8 +173,8 @@ impl Storage { Ok(bytes) } - pub async fn get_file_data_from_absolute_path(&self, absolute_path: &str) -> Result { - let obj_path = ObjPath::from_absolute_path(PathBuf::from(absolute_path))?; + pub async fn get_file_data_from_url_path(&self, path: impl AsRef) -> Result { + let obj_path = ObjPath::from_url_path(path)?; let result = self.object_store.get(&obj_path).await?; let bytes = result.bytes().await?; Ok(bytes) @@ -283,10 +283,12 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result Result { - let mut url = match Url::parse(uri) { - Ok(url) => url, - Err(e) => Url::from_directory_path(uri).map_err(|_| UrlParseError(e))?, - }; - - if url.path().ends_with('/') { - let err = InvalidPath(format!("Url {:?} cannot be a base", url)); - url.path_segments_mut().map_err(|_| err)?.pop(); - } + let url = Url::parse(uri).or_else(|e| { + let assumed_path = PathBuf::from(uri); + Url::from_file_path(assumed_path).map_err(|_| UrlParseError(e)) + })?; Ok(url) } @@ -44,85 +41,273 @@ pub fn get_scheme_authority(url: &Url) -> String { } /// Joins a base URL with a list of segments. +/// +/// # Arguments +/// * `base_url` - Base URL to join segments with +/// * `segments` - Path segments to append +/// +/// # Returns +/// The joined URL, or an error if the URL cannot be a base pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result { let mut url = base_url.clone(); - if url.path().ends_with('/') { - url.path_segments_mut().unwrap().pop(); - } + // Verify URL can be used as a base and get mutable path segments + let mut path_segments = url + .path_segments_mut() + .map_err(|_| InvalidPath(format!("URL '{}' cannot be a base", base_url)))?; + + // Remove trailing empty segment if path ends with '/' + path_segments.pop_if_empty(); - for &seg in segments { - let segs: Vec<_> = seg.split('/').filter(|&s| !s.is_empty()).collect(); - let err = InvalidPath(format!("Url {:?} cannot be a base", url)); - url.path_segments_mut().map_err(|_| err)?.extend(segs); + // Add new segments, normalizing backslashes to forward slashes + for segment in segments { + // Normalize backslashes to forward slashes for URLs + let normalized = segment.replace('\\', "/"); + for part in normalized.split('/') { + if !part.is_empty() { + path_segments.push(part); + } + } } + drop(path_segments); + Ok(url) } +/// Joins path segments into a single path string. +/// +/// # Arguments +/// * `segments` - Path segments to join +/// +/// # Returns +/// The joined path as a string, or an error if the path contains invalid UTF-8 +pub fn join_path_segments(segments: &[&str]) -> Result { + let path = PathBuf::from_iter(Vec::from(segments)); + path.to_str() + .map(|s| s.to_string()) + .ok_or_else(|| InvalidPath(format!("Path contains invalid UTF-8: {:?}", path))) +} + #[cfg(test)] mod tests { use super::*; - use std::str::FromStr; - - #[test] - fn parse_valid_uri_in_various_forms() { - let urls = vec![ - parse_uri("/foo/").unwrap(), - parse_uri("file:/foo/").unwrap(), - parse_uri("file:///foo/").unwrap(), - parse_uri("hdfs://foo/").unwrap(), - parse_uri("s3://foo").unwrap(), - parse_uri("s3://foo/").unwrap(), - parse_uri("s3a://foo/bar/").unwrap(), - parse_uri("gs://foo/").unwrap(), - parse_uri("wasb://foo/bar").unwrap(), - parse_uri("wasbs://foo/").unwrap(), - ]; - let schemes = vec![ - "file", "file", "file", "hdfs", "s3", "s3", "s3a", "gs", "wasb", "wasbs", - ]; - let paths = vec![ - "/foo", "/foo", "/foo", "/", "", "/", "/bar", "/", "/bar", "/", - ]; - assert_eq!(urls.iter().map(|u| u.scheme()).collect::>(), schemes); - assert_eq!(urls.iter().map(|u| u.path()).collect::>(), paths); - } - - #[test] - fn join_base_url_with_segments() { - let base_url = Url::from_str("file:///base").unwrap(); - assert_eq!( - join_url_segments(&base_url, &["foo"]).unwrap(), - Url::from_str("file:///base/foo").unwrap() - ); + #[test] + fn test_parse_uri_with_valid_url() { + let uri = "s3://bucket/path/to/file"; + let result = parse_uri(uri); + assert!(result.is_ok()); + let url = result.unwrap(); + assert_eq!(url.scheme(), "s3"); + assert_eq!(url.host_str(), Some("bucket")); + assert_eq!(url.path(), "/path/to/file"); + } - assert_eq!( - join_url_segments(&base_url, &["/foo"]).unwrap(), - Url::from_str("file:///base/foo").unwrap() - ); + #[test] + fn test_parse_uri_with_http_url() { + let uri = "http://example.com:8080/path?query=value"; + let result = parse_uri(uri); + assert!(result.is_ok()); + let url = result.unwrap(); + assert_eq!(url.scheme(), "http"); + assert_eq!(url.host_str(), Some("example.com")); + assert_eq!(url.port(), Some(8080)); + assert_eq!(url.path(), "/path"); + assert_eq!(url.query(), Some("query=value")); + } - assert_eq!( - join_url_segments(&base_url, &["/foo", "bar/", "/baz/"]).unwrap(), - Url::from_str("file:///base/foo/bar/baz").unwrap() - ); + #[test] + fn test_parse_uri_with_file_path() { + // Test parsing a file path - should convert to file:// URL + let uri = "/tmp/test/file.txt"; + let result = parse_uri(uri); + assert!(result.is_ok()); + let url = result.unwrap(); + assert_eq!(url.scheme(), "file"); + } - assert_eq!( - join_url_segments(&base_url, &["foo/", "", "bar/baz"]).unwrap(), - Url::from_str("file:///base/foo/bar/baz").unwrap() - ); + #[test] + fn test_parse_uri_with_relative_path() { + let uri = "relative/path/to/file"; + let result = parse_uri(uri); + // Relative paths should fail with Url::parse but might succeed with from_file_path + // depending on the current working directory + assert!(result.is_ok() || result.is_err()); + } + + #[test] + fn test_parse_uri_with_windows_path() { + let uri = "C:\\Users\\test\\file.txt"; + let result = parse_uri(uri); + // Windows paths should be converted to file:// URLs + assert!(result.is_ok()); + } + + #[test] + fn test_parse_uri_with_hdfs_url() { + let uri = "hdfs://namenode:9000/user/hive/warehouse"; + let result = parse_uri(uri); + assert!(result.is_ok()); + let url = result.unwrap(); + assert_eq!(url.scheme(), "hdfs"); + assert_eq!(url.host_str(), Some("namenode")); + assert_eq!(url.port(), Some(9000)); + } + + #[test] + fn test_get_scheme_authority_with_s3() { + let url = Url::parse("s3://my-bucket/path/to/file").unwrap(); + let result = get_scheme_authority(&url); + assert_eq!(result, "s3://my-bucket"); + } + + #[test] + fn test_get_scheme_authority_with_http() { + let url = Url::parse("http://example.com:8080/path").unwrap(); + let result = get_scheme_authority(&url); + assert_eq!(result, "http://example.com:8080"); + } + + #[test] + fn test_get_scheme_authority_with_file() { + let url = Url::parse("file:///tmp/test").unwrap(); + let result = get_scheme_authority(&url); + assert_eq!(result, "file://"); + } + #[test] + fn test_join_url_segments_basic() { + let base_url = Url::parse("s3://bucket/base").unwrap(); + let segments = vec!["path", "to", "file"]; + let result = join_url_segments(&base_url, &segments); + assert!(result.is_ok()); + assert_eq!(result.unwrap().path(), "/base/path/to/file"); + } + + #[test] + fn test_join_url_segments_with_trailing_slash() { + let base_url = Url::parse("s3://bucket/base/").unwrap(); + let segments = vec!["path", "to", "file"]; + let result = join_url_segments(&base_url, &segments); + assert!(result.is_ok()); + assert_eq!(result.unwrap().path(), "/base/path/to/file"); + } + + #[test] + fn test_join_url_segments_with_empty_segments() { + let base_url = Url::parse("s3://bucket/base").unwrap(); + let segments = vec!["path", "", "file"]; + let result = join_url_segments(&base_url, &segments); + assert!(result.is_ok()); + assert_eq!(result.unwrap().path(), "/base/path/file"); + } + + #[test] + fn test_join_url_segments_with_backslashes() { + let base_url = Url::parse("s3://bucket/base").unwrap(); + let segments = vec!["path\\to", "file"]; + let result = join_url_segments(&base_url, &segments); + assert!(result.is_ok()); + // Backslashes should be converted to forward slashes + assert_eq!(result.unwrap().path(), "/base/path/to/file"); + } + + #[test] + fn test_join_url_segments_with_nested_slashes() { + let base_url = Url::parse("s3://bucket/base").unwrap(); + let segments = vec!["path/to/nested", "file"]; + let result = join_url_segments(&base_url, &segments); + assert!(result.is_ok()); + assert_eq!(result.unwrap().path(), "/base/path/to/nested/file"); + } + + #[test] + fn test_join_url_segments_with_non_base_url() { + // Cannot-be-a-base URLs like "mailto:" should fail + let base_url = Url::parse("mailto:user@example.com").unwrap(); + let segments = vec!["path"]; + let result = join_url_segments(&base_url, &segments); + assert!(result.is_err()); + } + + #[test] + fn test_join_url_segments_empty_segments_list() { + let base_url = Url::parse("s3://bucket/base").unwrap(); + let segments: Vec<&str> = vec![]; + let result = join_url_segments(&base_url, &segments); + assert!(result.is_ok()); + assert_eq!(result.unwrap().path(), "/base"); + } + + #[test] + fn test_join_path_segments_basic() { + let segments = vec!["path", "to", "file"]; + let result = join_path_segments(&segments); + assert!(result.is_ok()); + let path = result.unwrap(); + // Result will vary based on OS + assert!(path.contains("path")); + assert!(path.contains("to")); + assert!(path.contains("file")); + } + + #[test] + fn test_join_path_segments_empty() { + let segments: Vec<&str> = vec![]; + let result = join_path_segments(&segments); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), ""); + } + + #[test] + fn test_join_path_segments_single() { + let segments = vec!["file"]; + let result = join_path_segments(&segments); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "file"); + } + + #[test] + fn test_parse_uri_with_gcs_url() { + let uri = "gs://my-bucket/path/to/data"; + let result = parse_uri(uri); + assert!(result.is_ok()); + let url = result.unwrap(); + assert_eq!(url.scheme(), "gs"); + assert_eq!(url.host_str(), Some("my-bucket")); + assert_eq!(url.path(), "/path/to/data"); + } + + #[test] + fn test_parse_uri_with_azure_url() { + let uri = "wasbs://container@account.blob.core.windows.net/path"; + let result = parse_uri(uri); + assert!(result.is_ok()); + let url = result.unwrap(); + assert_eq!(url.scheme(), "wasbs"); + } + + #[test] + fn test_join_url_segments_with_special_characters() { + let base_url = Url::parse("s3://bucket/base").unwrap(); + let segments = vec!["path-with-dash", "file_with_underscore"]; + let result = join_url_segments(&base_url, &segments); + assert!(result.is_ok()); assert_eq!( - join_url_segments(&base_url, &["foo1/bar1", "foo2/bar2"]).unwrap(), - Url::from_str("file:///base/foo1/bar1/foo2/bar2").unwrap() + result.unwrap().path(), + "/base/path-with-dash/file_with_underscore" ); } #[test] - fn join_failed_due_to_invalid_base() { - let base_url = Url::from_str("foo:text/plain,bar").unwrap(); - let result = join_url_segments(&base_url, &["foo"]); - assert!(result.is_err()); + fn test_join_url_segments_preserves_query_params() { + let base_url = Url::parse("s3://bucket/base?key=value").unwrap(); + let segments = vec!["path"]; + let result = join_url_segments(&base_url, &segments); + assert!(result.is_ok()); + let url = result.unwrap(); + assert_eq!(url.path(), "/base/path"); + assert_eq!(url.query(), Some("key=value")); } } diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index d6b61b49..dd1b30d9 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -19,12 +19,12 @@ use paste::paste; use std::collections::HashMap; -use std::path::PathBuf; use std::sync::Arc; use crate::config::table::HudiTableConfig; use crate::config::util::{parse_data_for_options, split_hudi_options_from_others}; use crate::config::{HudiConfigs, HUDI_CONF_DIR}; +use crate::storage::util::{join_url_segments, parse_uri}; use crate::storage::Storage; use crate::table::fs_view::FileSystemView; use crate::table::validation::validate_configs; @@ -262,21 +262,22 @@ impl OptionResolver { options: &mut HashMap, storage: Arc, ) -> Result<()> { - let global_config_path = std::env::var(HUDI_CONF_DIR) - .map(PathBuf::from) - .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf")) - .join("hudi-defaults.conf"); + let global_hudi_conf = parse_uri( + &std::env::var(HUDI_CONF_DIR).unwrap_or_else(|_| "/etc/hudi/conf".to_string()), + ) + .and_then(|url| join_url_segments(&url, &["hudi-defaults.conf"]))?; if let Ok(bytes) = storage - .get_file_data_from_absolute_path(global_config_path.to_str().unwrap()) + .get_file_data_from_url_path(global_hudi_conf.path()) .await { if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=") { - for (key, value) in global_configs { - if key.starts_with("hoodie.") && !options.contains_key(&key) { - options.insert(key.to_string(), value.to_string()); - } - } + global_configs + .into_iter() + .filter(|(key, _)| key.starts_with("hoodie.")) + .for_each(|(key, value)| { + options.entry(key).or_insert(value); + }); } } diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index b0d6f4d9..77029e47 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -775,7 +775,6 @@ mod tests { use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq, SampleTable}; use std::collections::HashSet; use std::fs::canonicalize; - use std::path::PathBuf; use std::{env, panic}; /// Test helper to create a new `Table` instance without validating the configuration. @@ -784,12 +783,9 @@ mod tests { /// /// * `table_dir_name` - Name of the table root directory; all under `crates/core/tests/data/`. fn get_test_table_without_validation(table_dir_name: &str) -> Table { - let base_url = Url::from_file_path( - canonicalize(PathBuf::from("tests").join("data").join(table_dir_name)).unwrap(), - ) - .unwrap(); + let base_path = canonicalize("tests/data").unwrap().join(table_dir_name); Table::new_with_options_blocking( - base_url.as_str(), + base_path.to_str().unwrap(), [("hoodie.internal.skip.config.validation", "true")], ) .unwrap() diff --git a/crates/core/src/timeline/instant.rs b/crates/core/src/timeline/instant.rs index e3e191e1..385890aa 100644 --- a/crates/core/src/timeline/instant.rs +++ b/crates/core/src/timeline/instant.rs @@ -19,11 +19,10 @@ use crate::config::table::TimelineTimezoneValue; use crate::error::CoreError; use crate::metadata::HUDI_METADATA_DIR; -use crate::storage::error::StorageError; +use crate::storage::util::join_path_segments; use crate::Result; use chrono::{DateTime, Local, NaiveDateTime, TimeZone, Timelike, Utc}; use std::cmp::Ordering; -use std::path::PathBuf; use std::str::FromStr; #[derive(Clone, Debug, Eq, PartialEq)] @@ -217,16 +216,7 @@ impl Instant { } pub fn relative_path(&self) -> Result { - let mut commit_file_path = PathBuf::from(HUDI_METADATA_DIR); - commit_file_path.push(self.file_name()); - commit_file_path - .to_str() - .ok_or(StorageError::InvalidPath(format!( - "Failed to get file path for {:?}", - self - ))) - .map_err(CoreError::Storage) - .map(|s| s.to_string()) + Ok(join_path_segments(&[HUDI_METADATA_DIR, &self.file_name()])?) } pub fn is_replacecommit(&self) -> bool { diff --git a/python/src/internal.rs b/python/src/internal.rs index 45f545b1..30bcd971 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -20,7 +20,6 @@ use arrow::pyarrow::ToPyArrow; use std::collections::HashMap; use std::convert::From; -use std::path::PathBuf; use std::sync::OnceLock; use tokio::runtime::Runtime; @@ -30,7 +29,7 @@ use hudi::error::CoreError; use hudi::file_group::file_slice::FileSlice; use hudi::file_group::reader::FileGroupReader; use hudi::file_group::FileGroup; -use hudi::storage::error::StorageError; +use hudi::storage::util::join_path_segments; use hudi::table::builder::TableBuilder; use hudi::table::Table; use hudi::timeline::instant::Instant; @@ -151,35 +150,14 @@ pub struct HudiFileSlice { #[pymethods] impl HudiFileSlice { fn base_file_relative_path(&self) -> PyResult { - let path = PathBuf::from(&self.partition_path) - .join(&self.base_file_name) - .to_str() - .map(String::from) - .ok_or_else(|| { - StorageError::InvalidPath(format!( - "Failed to get base file relative path for file slice: {:?}", - self - )) - }) - .map_err(CoreError::from) - .map_err(PythonError::from)?; + let path = join_path_segments(&[&self.partition_path, &self.base_file_name]) + .map_err(convert_to_py_err)?; Ok(path) } fn log_files_relative_paths(&self) -> PyResult> { let mut paths = Vec::::new(); for name in self.log_file_names.iter() { - let p = PathBuf::from(&self.partition_path) - .join(name) - .to_str() - .map(String::from) - .ok_or_else(|| { - StorageError::InvalidPath(format!( - "Failed to get log file relative path for file slice: {:?}", - self - )) - }) - .map_err(CoreError::from) - .map_err(PythonError::from)?; + let p = join_path_segments(&[&self.partition_path, name]).map_err(convert_to_py_err)?; paths.push(p) } Ok(paths)