diff --git a/docker-compose.yml b/docker-compose.yml index 0667d9ac434..149e44890f0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -157,7 +157,7 @@ services: - all - fake-gcs-server volumes: - - fake_gcs_server_data:/data/sample-bucket + - fake_gcs_server_data:/data command: -scheme http grafana: diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index a9e3714d7e0..a02d7b56524 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -74,7 +74,7 @@ pub use self::object_storage::{ #[cfg(feature = "gcs")] pub use self::opendal_storage::GoogleCloudStorageFactory; #[cfg(all(feature = "gcs", feature = "integration-testsuite"))] -pub use self::opendal_storage::new_emulated_google_cloud_storage; +pub use self::opendal_storage::test_config_helpers; pub use self::ram_storage::{RamStorage, RamStorageBuilder}; pub use self::split::{SplitPayload, SplitPayloadBuilder}; #[cfg(any(test, feature = "testsuite"))] diff --git a/quickwit/quickwit-storage/src/object_storage/policy.rs b/quickwit/quickwit-storage/src/object_storage/policy.rs index fc63efa77da..6ce48ab7a94 100644 --- a/quickwit/quickwit-storage/src/object_storage/policy.rs +++ b/quickwit/quickwit-storage/src/object_storage/policy.rs @@ -63,8 +63,7 @@ impl MultiPartPolicy { } } -// Default values from https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java -// The best default value may however differ depending on vendors. +// The best default value may differ depending on vendors. impl Default for MultiPartPolicy { fn default() -> Self { MultiPartPolicy { diff --git a/quickwit/quickwit-storage/src/opendal_storage/base.rs b/quickwit/quickwit-storage/src/opendal_storage/base.rs index f0eefed8f74..985fcc1ce63 100644 --- a/quickwit/quickwit-storage/src/opendal_storage/base.rs +++ b/quickwit/quickwit-storage/src/opendal_storage/base.rs @@ -26,8 +26,8 @@ use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; use crate::metrics::object_storage_get_slice_in_flight_guards; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, OwnedBytes, PutPayload, Storage, StorageError, StorageErrorKind, - StorageResolverError, StorageResult, + BulkDeleteError, MultiPartPolicy, OwnedBytes, PutPayload, Storage, StorageError, + StorageErrorKind, StorageResolverError, StorageResult, }; /// OpenDAL based storage implementation. @@ -35,11 +35,10 @@ use crate::{ /// /// - Implement REQUEST_SEMAPHORE to control the concurrency. /// - Implement STORAGE_METRICS for metrics. -/// - Add multipart_policy to control write at once or via multiple. -#[derive(Clone)] pub struct OpendalStorage { uri: Uri, op: Operator, + multipart_policy: MultiPartPolicy, } impl fmt::Debug for OpendalStorage { @@ -58,7 +57,16 @@ impl OpendalStorage { cfg: opendal::services::Gcs, ) -> Result { let op = Operator::new(cfg)?.finish(); - Ok(Self { uri, op }) + Ok(Self { + uri, + op, + // limits are the same as on S3 + multipart_policy: MultiPartPolicy::default(), + }) + } + + pub fn set_policy(&mut self, multipart_policy: MultiPartPolicy) { + self.multipart_policy = multipart_policy; } } @@ -69,10 +77,6 @@ impl Storage for OpendalStorage { Ok(()) } - /// # TODO - /// - /// We can implement something like `multipart_policy` determine whether to use copy. - /// If the payload is small enough, we can call `op.write()` at once. async fn put(&self, path: &Path, payload: Box) -> StorageResult<()> { let path = path.as_os_str().to_string_lossy(); let mut payload_reader = payload.byte_stream().await?.into_async_read(); @@ -80,6 +84,7 @@ impl Storage for OpendalStorage { let mut storage_writer = self .op .writer_with(&path) + .chunk(self.multipart_policy.part_num_bytes(payload.len()) as usize) .await? .into_futures_async_write() .compat_write(); @@ -148,7 +153,7 @@ impl Storage for OpendalStorage { #[cfg(feature = "integration-testsuite")] { let storage_info = self.op.info(); - if storage_info.name() == "sample-bucket" + if storage_info.name().starts_with("sample-bucket") && storage_info.scheme() == opendal::Scheme::Gcs { let mut bulk_error = BulkDeleteError::default(); diff --git a/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs b/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs index b108b11454a..f722c6545a8 100644 --- a/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs +++ b/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs @@ -50,13 +50,18 @@ impl StorageFactory for GoogleCloudStorageFactory { } } -/// Creates an emulated storage for testing. +/// Helpers to configure the GCP local test setup. #[cfg(feature = "integration-testsuite")] -pub fn new_emulated_google_cloud_storage( - uri: &Uri, -) -> Result { +pub mod test_config_helpers { + use super::*; + + /// URL of the local GCP emulator. + pub const LOCAL_GCP_EMULATOR_ENDPOINT: &str = "http://127.0.0.1:4443"; + + /// reqsign::GoogleTokenLoad implementation for testing. #[derive(Debug)] - struct DummyTokenLoader; + pub struct DummyTokenLoader; + #[async_trait] impl reqsign::GoogleTokenLoad for DummyTokenLoader { async fn load(&self, _: reqwest::Client) -> anyhow::Result> { @@ -67,16 +72,21 @@ pub fn new_emulated_google_cloud_storage( ))) } } - let (bucket, root) = parse_google_uri(uri).expect("must be valid google uri"); - let cfg = opendal::services::Gcs::default() - .bucket(&bucket) - .root(&root.to_string_lossy()) - .endpoint("http://127.0.0.1:4443") - .customized_token_loader(Box::new(DummyTokenLoader)); - - let store = OpendalStorage::new_google_cloud_storage(uri.clone(), cfg)?; - Ok(store) + /// Creates a storage connecting to a local emulated google cloud storage. + pub fn new_emulated_google_cloud_storage( + uri: &Uri, + ) -> Result { + let (bucket, root) = parse_google_uri(uri).expect("must be valid google uri"); + + let cfg = opendal::services::Gcs::default() + .bucket(&bucket) + .root(&root.to_string_lossy()) + .endpoint(LOCAL_GCP_EMULATOR_ENDPOINT) + .customized_token_loader(Box::new(DummyTokenLoader)); + let store = OpendalStorage::new_google_cloud_storage(uri.clone(), cfg)?; + Ok(store) + } } fn from_uri( diff --git a/quickwit/quickwit-storage/src/opendal_storage/mod.rs b/quickwit/quickwit-storage/src/opendal_storage/mod.rs index 3b887b9ff0d..a7d7bb57983 100644 --- a/quickwit/quickwit-storage/src/opendal_storage/mod.rs +++ b/quickwit/quickwit-storage/src/opendal_storage/mod.rs @@ -19,4 +19,4 @@ mod google_cloud_storage; pub use google_cloud_storage::GoogleCloudStorageFactory; #[cfg(feature = "integration-testsuite")] -pub use google_cloud_storage::new_emulated_google_cloud_storage; +pub use google_cloud_storage::test_config_helpers; diff --git a/quickwit/quickwit-storage/tests/google_cloud_storage.rs b/quickwit/quickwit-storage/tests/google_cloud_storage.rs index 36a37c30c28..17fa37f8c7a 100644 --- a/quickwit/quickwit-storage/tests/google_cloud_storage.rs +++ b/quickwit/quickwit-storage/tests/google_cloud_storage.rs @@ -16,29 +16,90 @@ // to Fake GCS Server (the emulated google cloud storage environment) #[cfg(all(feature = "integration-testsuite", feature = "gcs"))] -#[tokio::test] #[cfg_attr(not(feature = "ci-test"), ignore)] -async fn google_cloud_storage_test_suite() -> anyhow::Result<()> { +mod gcp_storage_test_suite { use std::str::FromStr; use anyhow::Context; + use quickwit_common::rand::append_random_suffix; + use quickwit_common::setup_logging_for_tests; use quickwit_common::uri::Uri; - use quickwit_storage::new_emulated_google_cloud_storage; - let _ = tracing_subscriber::fmt::try_init(); - - let mut object_storage = - new_emulated_google_cloud_storage(&Uri::from_str("gs://sample-bucket")?)?; - quickwit_storage::storage_test_suite(&mut object_storage).await?; - - let mut object_storage = new_emulated_google_cloud_storage(&Uri::from_str( - "gs://sample-bucket/integration-tests/test-azure-compatible-storage", - )?)?; - quickwit_storage::storage_test_single_part_upload(&mut object_storage) - .await - .context("test single-part upload failed")?; - - quickwit_storage::storage_test_multi_part_upload(&mut object_storage) - .await - .context("test multipart upload failed")?; - Ok(()) + use quickwit_storage::test_config_helpers::{ + DummyTokenLoader, LOCAL_GCP_EMULATOR_ENDPOINT, new_emulated_google_cloud_storage, + }; + use reqsign::GoogleTokenLoad; + + pub async fn sign_gcs_request(req: &mut reqwest::Request) -> anyhow::Result<()> { + let client = reqwest::Client::new(); + let token = DummyTokenLoader + .load(client.clone()) + .await? + .ok_or_else(|| anyhow::anyhow!("Failed to obtain authentication token"))?; + + let signer = reqsign::GoogleSigner::new("storage"); + signer.sign(req, &token)?; + + Ok(()) + } + + async fn create_gcs_bucket(bucket_name: &str) -> anyhow::Result<()> { + let client = reqwest::Client::new(); + let url = format!("{LOCAL_GCP_EMULATOR_ENDPOINT}/storage/v1/b"); + let mut request = client + .post(url) + .body(serde_json::to_vec(&serde_json::json!({ + "name": bucket_name, + }))?) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .build()?; + + sign_gcs_request(&mut request).await?; + + let response = client.execute(request).await?; + + if !response.status().is_success() { + let error_text = response.text().await?; + anyhow::bail!("Failed to create bucket: {}", error_text); + }; + Ok(()) + } + + #[tokio::test] + async fn google_cloud_storage_test_suite() -> anyhow::Result<()> { + setup_logging_for_tests(); + + let bucket_name = append_random_suffix("sample-bucket").to_lowercase(); + create_gcs_bucket(bucket_name.as_str()) + .await + .context("Failed to create test GCS bucket")?; + + let mut object_storage = + new_emulated_google_cloud_storage(&Uri::from_str(&format!("gs://{bucket_name}"))?)?; + + quickwit_storage::storage_test_suite(&mut object_storage).await?; + + let mut object_storage = new_emulated_google_cloud_storage(&Uri::from_str(&format!( + "gs://{bucket_name}/integration-tests/test-gcs-storage" + ))?)?; + + quickwit_storage::storage_test_single_part_upload(&mut object_storage) + .await + .context("test single-part upload failed")?; + + // TODO: Uncomment storage_test_multi_part_upload when the XML API is + // supported in the emulated GCS server + // (https://github.com/fsouza/fake-gcs-server/pull/1164) + + // object_storage.set_policy(MultiPartPolicy { + // target_part_num_bytes: 5 * 1_024 * 1_024, + // max_num_parts: 10_000, + // multipart_threshold_num_bytes: 10_000_000, + // max_object_num_bytes: 5_000_000_000_000, + // max_concurrent_uploads: 100, + // }); + // quickwit_storage::storage_test_multi_part_upload(&mut object_storage) + // .await + // .context("test multipart upload failed")?; + Ok(()) + } }