Skip to content

Fix gcs integration tests and use default multipart size #5765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ services:
- all
- fake-gcs-server
volumes:
- fake_gcs_server_data:/data/sample-bucket
- fake_gcs_server_data:/data
command: -scheme http

grafana:
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub use self::object_storage::{
#[cfg(feature = "gcs")]
pub use self::opendal_storage::GoogleCloudStorageFactory;
#[cfg(all(feature = "gcs", feature = "integration-testsuite"))]
pub use self::opendal_storage::new_emulated_google_cloud_storage;
pub use self::opendal_storage::test_config_helpers;
pub use self::ram_storage::{RamStorage, RamStorageBuilder};
pub use self::split::{SplitPayload, SplitPayloadBuilder};
#[cfg(any(test, feature = "testsuite"))]
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-storage/src/object_storage/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ impl MultiPartPolicy {
}
}

// Default values from https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not true, since `target_part_num_bytes` was set to 5GB.

// The best default value may however differ depending on vendors.
// The best default value may differ depending on vendors.
impl Default for MultiPartPolicy {
fn default() -> Self {
MultiPartPolicy {
Expand Down
25 changes: 15 additions & 10 deletions quickwit/quickwit-storage/src/opendal_storage/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@ use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use crate::metrics::object_storage_get_slice_in_flight_guards;
use crate::storage::SendableAsync;
use crate::{
BulkDeleteError, OwnedBytes, PutPayload, Storage, StorageError, StorageErrorKind,
StorageResolverError, StorageResult,
BulkDeleteError, MultiPartPolicy, OwnedBytes, PutPayload, Storage, StorageError,
StorageErrorKind, StorageResolverError, StorageResult,
};

/// OpenDAL based storage implementation.
/// # TODO
///
/// - Implement REQUEST_SEMAPHORE to control the concurrency.
/// - Implement STORAGE_METRICS for metrics.
/// - Add multipart_policy to control write at once or via multiple.
#[derive(Clone)]
pub struct OpendalStorage {
uri: Uri,
op: Operator,
multipart_policy: MultiPartPolicy,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be configured now, because OpenDAL does use multipart uploads.

}

impl fmt::Debug for OpendalStorage {
Expand All @@ -58,7 +57,16 @@ impl OpendalStorage {
cfg: opendal::services::Gcs,
) -> Result<Self, StorageResolverError> {
let op = Operator::new(cfg)?.finish();
Ok(Self { uri, op })
Ok(Self {
uri,
op,
// limits are the same as on S3
multipart_policy: MultiPartPolicy::default(),
})
}

/// Replaces the multipart policy of this storage.
///
/// `put` reads this policy to compute the chunk size it passes to the OpenDAL writer.
pub fn set_policy(&mut self, multipart_policy: MultiPartPolicy) {
self.multipart_policy = multipart_policy;
}
}

Expand All @@ -69,17 +77,14 @@ impl Storage for OpendalStorage {
Ok(())
}

/// # TODO
///
/// We can implement something like `multipart_policy` determine whether to use copy.
/// If the payload is small enough, we can call `op.write()` at once.
async fn put(&self, path: &Path, payload: Box<dyn PutPayload>) -> StorageResult<()> {
let path = path.as_os_str().to_string_lossy();
let mut payload_reader = payload.byte_stream().await?.into_async_read();

let mut storage_writer = self
.op
.writer_with(&path)
.chunk(self.multipart_policy.part_num_bytes(payload.len()) as usize)
.await?
.into_futures_async_write()
.compat_write();
Expand Down Expand Up @@ -148,7 +153,7 @@ impl Storage for OpendalStorage {
#[cfg(feature = "integration-testsuite")]
{
let storage_info = self.op.info();
if storage_info.name() == "sample-bucket"
if storage_info.name().starts_with("sample-bucket")
&& storage_info.scheme() == opendal::Scheme::Gcs
{
let mut bulk_error = BulkDeleteError::default();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ impl StorageFactory for GoogleCloudStorageFactory {
}
}

/// Creates an emulated storage for testing.
/// Helpers to configure the GCP local test setup.
#[cfg(feature = "integration-testsuite")]
pub fn new_emulated_google_cloud_storage(
uri: &Uri,
) -> Result<OpendalStorage, StorageResolverError> {
pub mod test_config_helpers {
use super::*;

/// URL of the local GCP emulator.
pub const LOCAL_GCP_EMULATOR_ENDPOINT: &str = "http://127.0.0.1:4443";

/// reqsign::GoogleTokenLoad implementation for testing.
#[derive(Debug)]
struct DummyTokenLoader;
pub struct DummyTokenLoader;

#[async_trait]
impl reqsign::GoogleTokenLoad for DummyTokenLoader {
async fn load(&self, _: reqwest::Client) -> anyhow::Result<Option<reqsign::GoogleToken>> {
Expand All @@ -67,16 +72,21 @@ pub fn new_emulated_google_cloud_storage(
)))
}
}
let (bucket, root) = parse_google_uri(uri).expect("must be valid google uri");

let cfg = opendal::services::Gcs::default()
.bucket(&bucket)
.root(&root.to_string_lossy())
.endpoint("http://127.0.0.1:4443")
.customized_token_loader(Box::new(DummyTokenLoader));

let store = OpendalStorage::new_google_cloud_storage(uri.clone(), cfg)?;
Ok(store)
/// Builds an [`OpendalStorage`] that talks to the local Google Cloud Storage emulator.
///
/// The bucket and root are taken from `uri`. Requests are sent to
/// [`LOCAL_GCP_EMULATOR_ENDPOINT`] and use [`DummyTokenLoader`] as the token loader.
///
/// # Panics
///
/// Panics if `parse_google_uri` rejects `uri`.
pub fn new_emulated_google_cloud_storage(
    uri: &Uri,
) -> Result<OpendalStorage, StorageResolverError> {
    let (bucket_name, root_path) = parse_google_uri(uri).expect("must be valid google uri");
    let root_str = root_path.to_string_lossy();

    let gcs_config = opendal::services::Gcs::default()
        .bucket(&bucket_name)
        .root(&root_str)
        .endpoint(LOCAL_GCP_EMULATOR_ENDPOINT)
        .customized_token_loader(Box::new(DummyTokenLoader));

    // The constructor already yields `Result<OpendalStorage, StorageResolverError>`,
    // so its result is returned as is.
    OpendalStorage::new_google_cloud_storage(uri.clone(), gcs_config)
}
}

fn from_uri(
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-storage/src/opendal_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ mod google_cloud_storage;

pub use google_cloud_storage::GoogleCloudStorageFactory;
#[cfg(feature = "integration-testsuite")]
pub use google_cloud_storage::new_emulated_google_cloud_storage;
pub use google_cloud_storage::test_config_helpers;
101 changes: 81 additions & 20 deletions quickwit/quickwit-storage/tests/google_cloud_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,90 @@
// to Fake GCS Server (the emulated google cloud storage environment)

#[cfg(all(feature = "integration-testsuite", feature = "gcs"))]
#[tokio::test]
#[cfg_attr(not(feature = "ci-test"), ignore)]
async fn google_cloud_storage_test_suite() -> anyhow::Result<()> {
mod gcp_storage_test_suite {
use std::str::FromStr;

use anyhow::Context;
use quickwit_common::rand::append_random_suffix;
use quickwit_common::setup_logging_for_tests;
use quickwit_common::uri::Uri;
use quickwit_storage::new_emulated_google_cloud_storage;
let _ = tracing_subscriber::fmt::try_init();

let mut object_storage =
new_emulated_google_cloud_storage(&Uri::from_str("gs://sample-bucket")?)?;
quickwit_storage::storage_test_suite(&mut object_storage).await?;

let mut object_storage = new_emulated_google_cloud_storage(&Uri::from_str(
"gs://sample-bucket/integration-tests/test-azure-compatible-storage",
)?)?;
quickwit_storage::storage_test_single_part_upload(&mut object_storage)
.await
.context("test single-part upload failed")?;

quickwit_storage::storage_test_multi_part_upload(&mut object_storage)
.await
.context("test multipart upload failed")?;
Ok(())
use quickwit_storage::test_config_helpers::{
DummyTokenLoader, LOCAL_GCP_EMULATOR_ENDPOINT, new_emulated_google_cloud_storage,
};
use reqsign::GoogleTokenLoad;

/// Signs `req` in place with the token produced by [`DummyTokenLoader`].
///
/// # Errors
///
/// Fails if the loader returns an error or no token, or if signing the request fails.
pub async fn sign_gcs_request(req: &mut reqwest::Request) -> anyhow::Result<()> {
    // The client is only needed by the loader, so it is moved in rather than cloned.
    let token = DummyTokenLoader
        .load(reqwest::Client::new())
        .await?
        .ok_or_else(|| anyhow::anyhow!("Failed to obtain authentication token"))?;

    let signer = reqsign::GoogleSigner::new("storage");
    signer.sign(req, &token)?;

    Ok(())
}

/// Creates the bucket `bucket_name` on the local GCS emulator.
///
/// Sends a signed `POST` to `{LOCAL_GCP_EMULATOR_ENDPOINT}/storage/v1/b` with a JSON body
/// carrying the bucket name.
///
/// # Errors
///
/// Fails if the request cannot be built, signed or sent, or if the emulator answers with a
/// non-success status. In the latter case the error carries the bucket name, the HTTP status
/// and the response body.
async fn create_gcs_bucket(bucket_name: &str) -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    let url = format!("{LOCAL_GCP_EMULATOR_ENDPOINT}/storage/v1/b");
    let body = serde_json::to_vec(&serde_json::json!({
        "name": bucket_name,
    }))?;
    let mut request = client
        .post(url)
        .body(body)
        .header(reqwest::header::CONTENT_TYPE, "application/json")
        .build()?;

    sign_gcs_request(&mut request).await?;

    let response = client.execute(request).await?;

    // Capture the status first: `text()` consumes the response.
    let status = response.status();
    if !status.is_success() {
        // A body that cannot be read must not hide the fact that bucket creation failed,
        // so the read error is folded into the message instead of being propagated.
        let error_text = response
            .text()
            .await
            .unwrap_or_else(|err| format!("<failed to read response body: {err}>"));
        anyhow::bail!("Failed to create bucket `{bucket_name}` ({status}): {error_text}");
    }
    Ok(())
}

#[tokio::test]
async fn google_cloud_storage_test_suite() -> anyhow::Result<()> {
setup_logging_for_tests();

let bucket_name = append_random_suffix("sample-bucket").to_lowercase();
create_gcs_bucket(bucket_name.as_str())
.await
.context("Failed to create test GCS bucket")?;

let mut object_storage =
new_emulated_google_cloud_storage(&Uri::from_str(&format!("gs://{bucket_name}"))?)?;

quickwit_storage::storage_test_suite(&mut object_storage).await?;

let mut object_storage = new_emulated_google_cloud_storage(&Uri::from_str(&format!(
"gs://{bucket_name}/integration-tests/test-gcs-storage"
))?)?;

quickwit_storage::storage_test_single_part_upload(&mut object_storage)
.await
.context("test single-part upload failed")?;

// TODO: Uncomment storage_test_multi_part_upload when the XML API is
// supported in the emulated GCS server
// (https://github.com/fsouza/fake-gcs-server/pull/1164)

Comment on lines +89 to +92
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were you able to confirm that this test would pass on real GCS storage?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and I checked in the GCS audit logs that it was indeed a multipart upload (they didn't show the part size, but there were 3 parts, which matches 15MB / 5MB).

// object_storage.set_policy(MultiPartPolicy {
// target_part_num_bytes: 5 * 1_024 * 1_024,
// max_num_parts: 10_000,
// multipart_threshold_num_bytes: 10_000_000,
// max_object_num_bytes: 5_000_000_000_000,
// max_concurrent_uploads: 100,
// });
// quickwit_storage::storage_test_multi_part_upload(&mut object_storage)
// .await
// .context("test multipart upload failed")?;
Ok(())
}
}