diff --git a/Makefile b/Makefile index b9dbc7d..8563cce 100644 --- a/Makefile +++ b/Makefile @@ -105,12 +105,16 @@ build-buckets-mdapi: | $(CARGO_EXEC) $(CARGO) build --release .PHONY: test -test: test-unit +test: test-unit test-integration .PHONY: test-unit test-unit: | $(CARGO_EXEC) $(CARGO) test --lib +.PHONY: test-integration +test-integration: | $(CARGO_EXEC) + $(CARGO) test --test rpc_handlers -- --nocapture + include ./deps/eng/tools/mk/Makefile.deps include ./deps/eng/tools/mk/Makefile.agent_prebuilt.targ include ./deps/eng/tools/mk/Makefile.smf.targ diff --git a/README.md b/README.md index 1fee98c..8147b8e 100644 --- a/README.md +++ b/README.md @@ -128,3 +128,10 @@ The functional tests require that postgresql is installed as well as and configure a temporary postgres database and once the test has completed the temporary database is removed within a few seconds. The tests use the same code used by the `schema-manager` to prepare the database for use by the test suite. + +To build `ephemeralpg` from source on SmartOS, the socket library must be linked +explicitly: + +``` +LDFLAGS=-lsocket make +``` diff --git a/buckets-mdapi/src/discovery.rs b/buckets-mdapi/src/discovery.rs new file mode 100644 index 0000000..d4ea640 --- /dev/null +++ b/buckets-mdapi/src/discovery.rs @@ -0,0 +1,494 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2026 Edgecast Cloud LLC. + +use serde_derive::{Deserialize, Serialize}; +use serde_json::Error as SerdeError; +use serde_json::Value; +use slog::{debug, error, Logger}; +use uuid::Uuid; + +use cueball_postgres_connection::PostgresConnection; +use fast_rpc::protocol::{FastMessage, FastMessageData}; + +use crate::error::BucketsMdapiError; +use crate::metrics::RegisteredMetrics; +use crate::sql; +use crate::types::{HandlerResponse, HasRequestId}; +use crate::util::array_wrap; + +// --------------------------------------------------------------------------- +// listvnodes +// --------------------------------------------------------------------------- + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct ListVnodesPayload { + pub request_id: Uuid, +} + +impl HasRequestId for ListVnodesPayload { + fn request_id(&self) -> Uuid { + self.request_id + } +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct ListVnodesResponse { + pub vnodes: Vec, +} + +pub(crate) fn decode_listvnodes_msg( + value: &Value, +) -> Result, SerdeError> { + serde_json::from_value::>(value.clone()) +} + +// --------------------------------------------------------------------------- +// listowners +// --------------------------------------------------------------------------- + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct ListOwnersPayload { + pub vnode: u64, + pub request_id: Uuid, +} + +impl HasRequestId for ListOwnersPayload { + fn request_id(&self) -> Uuid { + self.request_id + } +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct ListOwnersResponse { + pub owners: Vec, +} + +pub(crate) fn decode_listowners_msg( + value: &Value, +) -> Result, SerdeError> { + serde_json::from_value::>(value.clone()) +} + +// --------------------------------------------------------------------------- +// listvnodes action +// --------------------------------------------------------------------------- + +/// Schema prefix for per-vnode partitions (e.g. `manta_bucket_0`). +const SCHEMA_PREFIX: &str = "manta_bucket_"; + +// PostgreSQL does not support parameterised identifiers (schema names), +// so we use format!. Both interpolated values are compile-time constants. +fn listvnodes_sql() -> String { + format!( + "SELECT schema_name \ + FROM information_schema.schemata \ + WHERE left(schema_name, {}) = '{}' \ + ORDER BY schema_name", + SCHEMA_PREFIX.len(), + SCHEMA_PREFIX + ) +} + +fn parse_vnode_from_schema(schema_name: &str) -> Option { + if schema_name.starts_with(SCHEMA_PREFIX) { + schema_name[SCHEMA_PREFIX.len()..].parse::().ok() + } else { + None + } +} + +fn listvnodes_response_to_json(resp: ListVnodesResponse) -> Value { + serde_json::to_value(resp).expect("failed to serialize ListVnodesResponse") +} + +fn do_listvnodes( + conn: &mut PostgresConnection, + metrics: &RegisteredMetrics, + log: &Logger, +) -> Result { + let sql = listvnodes_sql(); + + sql::query( + sql::Method::ListVnodes, + conn, + sql.as_str(), + &[], + metrics, + log, + ) + .map_err(|e| e.to_string()) + .map(|rows| { + // SQL already returns rows ORDER BY schema_name + let vnodes: Vec = rows + .iter() + .filter_map(|row| { + let name: String = row.get("schema_name"); + parse_vnode_from_schema(&name) + }) + .collect(); + ListVnodesResponse { vnodes } + }) +} + +#[allow(clippy::needless_pass_by_value)] +pub(crate) fn listvnodes_action( + msg_id: u32, + method: &str, + metrics: &RegisteredMetrics, + log: &Logger, + _payload: ListVnodesPayload, + conn: &mut PostgresConnection, +) -> Result { + do_listvnodes(conn, metrics, log) + .map(|resp| { + debug!(log, "operation successful"); + let value = listvnodes_response_to_json(resp); + let msg_data = + FastMessageData::new(method.into(), array_wrap(value)); + FastMessage::data(msg_id, msg_data).into() + }) + .or_else(|e| { + error!(log, "operation failed"; "error" => &e); + let err = BucketsMdapiError::PostgresError(e); + let msg_data = FastMessageData::new( + method.into(), + array_wrap(err.into_fast()), + ); + let msg: HandlerResponse = + FastMessage::data(msg_id, msg_data).into(); + Ok(msg) + }) +} + +// --------------------------------------------------------------------------- +// listowners action +// --------------------------------------------------------------------------- + +// PostgreSQL does not support parameterised identifiers (schema names), +// so we use format!. `vnode` is a u64, so no injection risk. +fn listowners_sql(vnode: u64) -> String { + format!( + "SELECT DISTINCT owner \ + FROM {}{}.manta_bucket", + SCHEMA_PREFIX, vnode + ) +} + +/// Lightweight single-row check for vnode schema existence, avoiding +/// the full table scan that `do_listvnodes` performs. +fn vnode_schema_exists( + vnode: u64, + conn: &mut PostgresConnection, + metrics: &RegisteredMetrics, + log: &Logger, +) -> Result { + let schema_name = format!("{}{}", SCHEMA_PREFIX, vnode); + let sql = "SELECT 1 FROM information_schema.schemata \ + WHERE schema_name = $1 LIMIT 1"; + + sql::query( + sql::Method::VnodeExists, + conn, + sql, + &[&schema_name], + metrics, + log, + ) + .map_err(|e| e.to_string()) + .map(|rows| !rows.is_empty()) +} + +fn listowners_response_to_json(resp: ListOwnersResponse) -> Value { + serde_json::to_value(resp).expect("failed to serialize ListOwnersResponse") +} + +fn do_listowners( + vnode: u64, + conn: &mut PostgresConnection, + metrics: &RegisteredMetrics, + log: &Logger, +) -> Result { + let sql = listowners_sql(vnode); + + sql::query( + sql::Method::ListOwners, + conn, + sql.as_str(), + &[], + metrics, + log, + ) + .map_err(|e| e.to_string()) + .map(|rows| { + let owners: Vec = + rows.iter().map(|row| row.get("owner")).collect(); + ListOwnersResponse { owners } + }) +} + +#[allow(clippy::needless_pass_by_value)] +pub(crate) fn listowners_action( + msg_id: u32, + method: &str, + metrics: &RegisteredMetrics, + log: &Logger, + payload: ListOwnersPayload, + conn: &mut PostgresConnection, +) -> Result { + // Validate that the requested vnode schema exists before + // querying it, to return a clean application-level error + // instead of a raw PostgreSQL "relation does not exist". + match vnode_schema_exists(payload.vnode, conn, metrics, log) { + Ok(false) => { + let e = format!( + "vnode {} does not exist on this mdapi instance", + payload.vnode + ); + error!(log, "operation failed"; "error" => &e); + let err = BucketsMdapiError::PostgresError(e); + let msg_data = FastMessageData::new( + method.into(), + array_wrap(err.into_fast()), + ); + let msg: HandlerResponse = + FastMessage::data(msg_id, msg_data).into(); + return Ok(msg); + } + Err(e) => { + error!(log, "operation failed"; "error" => &e); + let err = BucketsMdapiError::PostgresError(e); + let msg_data = FastMessageData::new( + method.into(), + array_wrap(err.into_fast()), + ); + let msg: HandlerResponse = + FastMessage::data(msg_id, msg_data).into(); + return Ok(msg); + } + Ok(true) => {} // vnode exists, proceed + } + + do_listowners(payload.vnode, conn, metrics, log) + .map(|resp| { + debug!(log, "operation successful"); + let value = listowners_response_to_json(resp); + let msg_data = + FastMessageData::new(method.into(), array_wrap(value)); + FastMessage::data(msg_id, msg_data).into() + }) + .or_else(|e| { + error!(log, "operation failed"; "error" => &e); + let err = BucketsMdapiError::PostgresError(e); + let msg_data = FastMessageData::new( + method.into(), + array_wrap(err.into_fast()), + ); + let msg: HandlerResponse = + FastMessage::data(msg_id, msg_data).into(); + Ok(msg) + }) +} + +#[cfg(test)] +mod test { + use super::*; + + use quickcheck::{quickcheck, Arbitrary, Gen}; + use serde_json; + use serde_json::Map; + + // -- ListVnodesPayload --------------------------------------------------- + + #[derive(Clone, Debug)] + struct ListVnodesJson(Value); + + impl Arbitrary for ListVnodesJson { + fn arbitrary(_g: &mut G) -> Self { + let request_id = serde_json::to_value(Uuid::new_v4()) + .expect("failed to convert request_id field to Value"); + + let mut obj = Map::new(); + obj.insert("request_id".into(), request_id); + ListVnodesJson(Value::Object(obj)) + } + } + + impl Arbitrary for ListVnodesPayload { + fn arbitrary(_g: &mut G) -> Self { + ListVnodesPayload { + request_id: Uuid::new_v4(), + } + } + } + + // -- ListOwnersPayload --------------------------------------------------- + + #[derive(Clone, Debug)] + struct ListOwnersJson(Value); + + impl Arbitrary for ListOwnersJson { + fn arbitrary(g: &mut G) -> Self { + let vnode = serde_json::to_value(u64::arbitrary(g)) + .expect("failed to convert vnode field to Value"); + let request_id = serde_json::to_value(Uuid::new_v4()) + .expect("failed to convert request_id field to Value"); + + let mut obj = Map::new(); + obj.insert("vnode".into(), vnode); + obj.insert("request_id".into(), request_id); + ListOwnersJson(Value::Object(obj)) + } + } + + impl Arbitrary for ListOwnersPayload { + fn arbitrary(g: &mut G) -> Self { + ListOwnersPayload { + vnode: u64::arbitrary(g), + request_id: Uuid::new_v4(), + } + } + } + + // -- ListVnodesResponse -------------------------------------------------- + + impl Arbitrary for ListVnodesResponse { + fn arbitrary(g: &mut G) -> Self { + let count = usize::arbitrary(g) % 16; + let vnodes = (0..count).map(|_| u64::arbitrary(g)).collect(); + ListVnodesResponse { vnodes } + } + } + + // -- ListOwnersResponse -------------------------------------------------- + + impl Arbitrary for ListOwnersResponse { + fn arbitrary(g: &mut G) -> Self { + let count = usize::arbitrary(g) % 16; + let owners = (0..count).map(|_| Uuid::new_v4()).collect(); + ListOwnersResponse { owners } + } + } + + // -- Unit tests for SQL and parsing -------------------------------------- + + #[test] + fn test_parse_vnode_from_schema() { + assert_eq!(parse_vnode_from_schema("manta_bucket_0"), Some(0)); + assert_eq!(parse_vnode_from_schema("manta_bucket_42"), Some(42)); + assert_eq!(parse_vnode_from_schema("manta_bucket_1023"), Some(1023)); + assert_eq!(parse_vnode_from_schema("public"), None); + assert_eq!(parse_vnode_from_schema("manta_bucket_"), None); + assert_eq!(parse_vnode_from_schema("manta_bucket_abc"), None); + } + + #[test] + fn test_listvnodes_sql_contains_schema_prefix() { + let sql = listvnodes_sql(); + assert!(sql.contains("information_schema.schemata")); + assert!(sql.contains("manta_bucket_")); + } + + #[test] + fn test_listowners_sql_contains_vnode() { + let sql = listowners_sql(7); + assert!(sql.contains("manta_bucket_7.manta_bucket")); + assert!(sql.contains("DISTINCT owner")); + } + + // -- Quickcheck properties ----------------------------------------------- + + quickcheck! { + fn prop_listvnodes_payload_serialize_deserialize_identity(msg: ListVnodesPayload) -> bool { + match serde_json::to_string(&msg) { + Ok(s) => { + let decode_result: Result = + serde_json::from_str(&s); + match decode_result { + Ok(decoded) => decoded == msg, + Err(_) => false, + } + } + Err(_) => false, + } + } + } + + quickcheck! { + fn prop_listowners_payload_serialize_deserialize_identity(msg: ListOwnersPayload) -> bool { + match serde_json::to_string(&msg) { + Ok(s) => { + let decode_result: Result = + serde_json::from_str(&s); + match decode_result { + Ok(decoded) => decoded == msg, + Err(_) => false, + } + } + Err(_) => false, + } + } + } + + quickcheck! { + fn prop_listvnodes_response_serialize_deserialize_identity(msg: ListVnodesResponse) -> bool { + match serde_json::to_string(&msg) { + Ok(s) => { + let decode_result: Result = + serde_json::from_str(&s); + match decode_result { + Ok(decoded) => decoded == msg, + Err(_) => false, + } + } + Err(_) => false, + } + } + } + + quickcheck! { + fn prop_listowners_response_serialize_deserialize_identity(msg: ListOwnersResponse) -> bool { + match serde_json::to_string(&msg) { + Ok(s) => { + let decode_result: Result = + serde_json::from_str(&s); + match decode_result { + Ok(decoded) => decoded == msg, + Err(_) => false, + } + } + Err(_) => false, + } + } + } + + quickcheck! { + fn prop_listvnodes_payload_from_json(json: ListVnodesJson) -> bool { + let decode_result1: Result = + serde_json::from_value(json.0.clone()); + let res1 = decode_result1.is_ok(); + + let decode_result2: Result, _> = + serde_json::from_value(Value::Array(vec![json.0])); + let res2 = decode_result2.is_ok(); + + res1 && res2 + } + } + + quickcheck! { + fn prop_listowners_payload_from_json(json: ListOwnersJson) -> bool { + let decode_result1: Result = + serde_json::from_value(json.0.clone()); + let res1 = decode_result1.is_ok(); + + let decode_result2: Result, _> = + serde_json::from_value(Value::Array(vec![json.0])); + let res2 = decode_result2.is_ok(); + + res1 && res2 + } + } +} diff --git a/buckets-mdapi/src/lib.rs b/buckets-mdapi/src/lib.rs index 68851a4..4f88b43 100644 --- a/buckets-mdapi/src/lib.rs +++ b/buckets-mdapi/src/lib.rs @@ -1,9 +1,11 @@ // Copyright 2020 Joyent, Inc. +// Copyright 2026 Edgecast Cloud LLC. #![allow(clippy::module_name_repetitions)] pub mod bucket; pub mod conditional; +pub mod discovery; pub mod error; pub mod gc; pub mod metrics; @@ -29,6 +31,7 @@ pub mod util { use fast_rpc::protocol::{FastMessage, FastMessageData}; use crate::bucket; + use crate::discovery; use crate::error::BucketsMdapiError; use crate::gc; use crate::metrics::RegisteredMetrics; @@ -123,6 +126,17 @@ pub mod util { metrics, log, ), + "batchupdateobjects" => handle_request( + msg.id, + method, + object::batch_update::decode_msg( + &msg.data.d, + ), + &mut conn, + &object::batch_update::action, + metrics, + log, + ), "deleteobject" => handle_request( msg.id, method, @@ -195,6 +209,24 @@ pub mod util { metrics, log, ), + "listvnodes" => handle_request( + msg.id, + method, + discovery::decode_listvnodes_msg(&msg.data.d), + &mut conn, + &discovery::listvnodes_action, + metrics, + log, + ), + "listowners" => handle_request( + msg.id, + method, + discovery::decode_listowners_msg(&msg.data.d), + &mut conn, + &discovery::listowners_action, + metrics, + log, + ), _ => { let err_msg = format!("Unsupported functon: {}", method); Err(HandlerError::IO(other_error(&err_msg))) diff --git a/buckets-mdapi/src/object.rs b/buckets-mdapi/src/object.rs index 1e1cfc7..e85726e 100644 --- a/buckets-mdapi/src/object.rs +++ b/buckets-mdapi/src/object.rs @@ -1,5 +1,6 @@ // Copyright 2020 Joyent, Inc. // Copyright 2023 MNX Cloud, Inc. +// Copyright 2026 Edgecast Cloud LLC. use std::error::Error; use std::vec::Vec; @@ -16,6 +17,7 @@ use crate::conditional; use crate::error::BucketsMdapiError; use crate::types::{HasRequestId, Hstore, RowSlice, Timestamptz}; +pub mod batch_update; pub mod create; pub mod delete; pub mod get; diff --git a/buckets-mdapi/src/object/batch_update.rs b/buckets-mdapi/src/object/batch_update.rs new file mode 100644 index 0000000..297248c --- /dev/null +++ b/buckets-mdapi/src/object/batch_update.rs @@ -0,0 +1,473 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2026 Edgecast Cloud LLC. + +use std::collections::HashMap; + +use serde_derive::{Deserialize, Serialize}; +use serde_json::Error as SerdeError; +use serde_json::Value; +use slog::{debug, error, warn, Logger}; +use uuid::Uuid; + +use cueball_postgres_connection::PostgresConnection; +use fast_rpc::protocol::{FastMessage, FastMessageData}; + +use crate::conditional; +use crate::error::BucketsMdapiError; +use crate::metrics::RegisteredMetrics; +use crate::object::update::UpdateObjectPayload; +use crate::sql; +use crate::types::{HandlerResponse, HasRequestId}; +use crate::util::array_wrap; + +/// Maximum number of objects allowed in a single batch +/// RPC. Prevents excessively long-running transactions +/// that could starve other connections. +/// 1000 was chosen as by default S3 lists 1000 objects +/// per call. +const MAX_BATCH_SIZE: usize = 1000; + +/// Payload for the batchupdateobjects RPC. +/// +/// Contains a list of individual update payloads and a +/// request-level identifier for tracing. +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct BatchUpdateObjectsPayload { + pub objects: Vec, + pub request_id: Uuid, +} + +impl HasRequestId for BatchUpdateObjectsPayload { + fn request_id(&self) -> Uuid { + self.request_id + } +} + +/// A vnode whose entire transaction was rolled back. +/// +/// All objects in this group must be retried together +/// since the transaction is atomic per vnode. The +/// `objects` field contains the original request payloads +/// so clients can resubmit them directly. +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct VnodeFailure { + pub vnode: u64, + /// The error that caused the rollback. + pub error: Value, + /// Original request payloads — ready for retry. + pub objects: Vec, +} + +/// Aggregate response for batchupdateobjects RPC. +/// +/// Only contains failures. Transactions are atomic per +/// vnode: either every object on a vnode commits, or the +/// entire vnode group is rolled back. Objects not present +/// in `failed_vnodes` succeeded — the client already has +/// the data it sent, so the server does not echo it back. +/// +/// # Checking for errors +/// +/// ```ignore +/// if response.is_success() { /* all done */ } +/// ``` +/// +/// # Retrying failures +/// +/// Each [`VnodeFailure`] carries the original +/// [`UpdateObjectPayload`] entries that were sent in the +/// request. Callers can resubmit them directly without +/// correlating IDs back to the original batch: +/// +/// ```ignore +/// for vf in &response.failed_vnodes { +/// let retry = BatchUpdateObjectsPayload { +/// objects: vf.objects.clone(), +/// request_id: Uuid::new_v4(), +/// }; +/// client.batch_update_objects(retry); +/// } +/// ``` +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct BatchUpdateObjectsResponse { + pub failed_vnodes: Vec, +} + +impl BatchUpdateObjectsResponse { + /// Returns true when every object was updated + /// successfully. + pub fn is_success(&self) -> bool { + self.failed_vnodes.is_empty() + } + + /// Number of objects whose vnode transaction failed. + pub fn failed_count(&self) -> usize { + self.failed_vnodes.iter().map(|v| v.objects.len()).sum() + } +} + +/// Decode a Fast RPC message into a batch payload. +pub(crate) fn decode_msg( + value: &Value, +) -> Result, SerdeError> { + serde_json::from_value::>(value.clone()) +} + +/// Fast RPC action handler for batchupdateobjects. +/// +/// Dispatches to `do_batch_update` and wraps the result +/// in a Fast response message. +#[allow(clippy::needless_pass_by_value)] +pub(crate) fn action( + msg_id: u32, + method: &str, + metrics: &RegisteredMetrics, + log: &Logger, + payload: BatchUpdateObjectsPayload, + conn: &mut PostgresConnection, +) -> Result { + do_batch_update(&payload, conn, metrics, log) + .and_then(|resp| { + debug!(log, "batch operation complete"; + "failed" => resp.failed_count(), + ); + let value = serde_json::to_value(&resp).map_err(|e| { + BucketsMdapiError::PostgresError(format!( + "failed to serialize response: {}", + e + )) + })?; + let msg_data = + FastMessageData::new(method.into(), array_wrap(value)); + let msg: HandlerResponse = + FastMessage::data(msg_id, msg_data).into(); + Ok(msg) + }) + .or_else(|e| { + if let BucketsMdapiError::PostgresError(_) = &e { + error!(log, "batch operation failed"; + "error" => e.message()); + } + let msg_data = + FastMessageData::new(method.into(), array_wrap(e.into_fast())); + let msg: HandlerResponse = + FastMessage::data(msg_id, msg_data).into(); + Ok(msg) + }) +} + +/// Execute a batch update with per-vnode atomic +/// transactions. +/// +/// # Algorithm +/// +/// 1. Group objects by vnode. +/// 2. For each vnode group, open a PostgreSQL transaction. +/// 3. Within the transaction, run conditional checks and +/// UPDATE for each object. +/// 4. If all objects in the vnode succeed, COMMIT. +/// If any object fails, ROLLBACK the entire vnode group. +/// 5. Collect per-vnode failures. +/// +/// # Time complexity +/// +/// O(N) where N is total items across vnodes: one SQL UPDATE per +/// item, grouped into V transactions (V = distinct +/// vnodes). +/// +/// # Space complexity +/// +/// O(N) for payload storage and response assembly. +fn do_batch_update( + payload: &BatchUpdateObjectsPayload, + conn: &mut PostgresConnection, + metrics: &RegisteredMetrics, + log: &Logger, +) -> Result { + let objects = &payload.objects; + + if objects.is_empty() { + return Ok(BatchUpdateObjectsResponse { + failed_vnodes: Vec::new(), + }); + } + + if objects.len() > MAX_BATCH_SIZE { + return Err(BucketsMdapiError::LimitConstraintError(format!( + "batch size {} exceeds maximum of {}", + objects.len(), + MAX_BATCH_SIZE + ))); + } + + // Group objects by vnode for per-vnode transactions. + let mut groups: HashMap> = HashMap::new(); + for obj in objects { + groups.entry(obj.vnode).or_insert_with(Vec::new).push(obj); + } + + debug!(log, "batch update grouped"; + "request_id" => payload.request_id.to_string(), + "total_objects" => objects.len(), + "vnode_groups" => groups.len(), + ); + + let mut failed_vnodes: Vec = Vec::new(); + + for (vnode, items) in &groups { + if let Err((err, failed_payloads)) = + update_vnode_group(*vnode, items, conn, metrics, log) + { + warn!(log, "vnode group failed"; + "request_id" => payload.request_id.to_string(), + "vnode" => vnode, + "objects" => ?failed_payloads.iter().map(|p| p.name.as_str()).collect::>(), + "error" => err.message(), + ); + let err_value = err.into_fast(); + failed_vnodes.push(VnodeFailure { + vnode: *vnode, + error: err_value, + objects: failed_payloads, + }); + } + } + + Ok(BatchUpdateObjectsResponse { failed_vnodes }) +} + +/// Update all objects in a single vnode within one +/// PostgreSQL transaction. +/// +/// # Invariant +/// +/// All objects succeed and the transaction commits, or +/// any failure causes a rollback and all objects in this +/// group are marked failed. +fn update_vnode_group( + vnode: u64, + items: &[&UpdateObjectPayload], + conn: &mut PostgresConnection, + metrics: &RegisteredMetrics, + log: &Logger, +) -> Result<(), (BucketsMdapiError, Vec)> { + // Clone payloads only on error paths to avoid + // allocation on the common (success) path. + let clone_payloads = || items.iter().map(|i| (*i).clone()).collect(); + + let mut txn = conn.transaction().map_err(|e| { + ( + BucketsMdapiError::PostgresError(e.to_string()), + clone_payloads(), + ) + })?; + + let update_sql = update_sql(vnode); + + for item in items { + // Run conditional checks within the transaction. + if let Err(e) = conditional::request( + &mut txn, + &[&item.owner, &item.bucket_id, &item.name], + item.vnode, + &item.conditions, + metrics, + log, + ) { + // Condition failed: rollback entire vnode + // group. txn is dropped, triggering implicit + // rollback. + return Err((e, clone_payloads())); + } + + let exec_result = sql::txn_execute( + sql::Method::ObjectBatchUpdate, + &mut txn, + update_sql.as_str(), + &[ + &item.content_type, + &item.headers, + &item.sharks, + &item.properties, + &item.owner, + &item.bucket_id, + &item.name, + &item.id, + ], + metrics, + log, + ); + + match exec_result { + Ok(0) => { + return Err(( + BucketsMdapiError::ObjectNotFound, + clone_payloads(), + )); + } + Ok(_) => {} + Err(e) => { + return Err(( + BucketsMdapiError::PostgresError(e.to_string()), + clone_payloads(), + )); + } + } + } + + // All objects in this vnode succeeded; commit. + txn.commit().map_err(|e| { + ( + BucketsMdapiError::PostgresError(e.to_string()), + clone_payloads(), + ) + })?; + + Ok(()) +} + +/// Generate the UPDATE SQL for a given vnode. +/// +/// Identical to `object::update::update_sql` but kept +/// local to avoid coupling to a private function. +fn update_sql(vnode: u64) -> String { + [ + "UPDATE manta_bucket_", + &vnode.to_string(), + ".manta_bucket_object \ + SET content_type = $1, \ + headers = $2, \ + sharks = COALESCE($3, sharks), \ + properties = $4, \ + modified = current_timestamp \ + WHERE owner = $5 \ + AND bucket_id = $6 \ + AND name = $7 \ + AND id = $8", + ] + .concat() +} + +#[cfg(test)] +mod test { + use super::*; + + use quickcheck::{quickcheck, Arbitrary, Gen}; + use quickcheck_helpers::random; + + impl Arbitrary for BatchUpdateObjectsPayload { + fn arbitrary(g: &mut G) -> Self { + let count = (usize::arbitrary(g) % 10) + 1; + let objects: Vec = (0..count) + .map(|_| UpdateObjectPayload::arbitrary(g)) + .collect(); + BatchUpdateObjectsPayload { + objects, + request_id: Uuid::new_v4(), + } + } + } + + impl Arbitrary for VnodeFailure { + fn arbitrary(g: &mut G) -> Self { + let count = (usize::arbitrary(g) % 5) + 1; + VnodeFailure { + vnode: u64::arbitrary(g), + error: Value::String(random::string(g, 16)), + objects: (0..count) + .map(|_| UpdateObjectPayload::arbitrary(g)) + .collect(), + } + } + } + + impl Arbitrary for BatchUpdateObjectsResponse { + fn arbitrary(g: &mut G) -> Self { + let fail_count = usize::arbitrary(g) % 3; + BatchUpdateObjectsResponse { + failed_vnodes: (0..fail_count) + .map(|_| VnodeFailure::arbitrary(g)) + .collect(), + } + } + } + + quickcheck! { + fn prop_batch_payload_roundtrip( + msg: BatchUpdateObjectsPayload + ) -> bool { + match serde_json::to_string(&msg) { + Ok(s) => { + let decoded: Result< + BatchUpdateObjectsPayload, _ + > = serde_json::from_str(&s); + match decoded { + Ok(d) => d == msg, + Err(_) => false, + } + } + Err(_) => false, + } + } + } + + quickcheck! { + fn prop_batch_response_roundtrip( + msg: BatchUpdateObjectsResponse + ) -> bool { + match serde_json::to_string(&msg) { + Ok(s) => { + let decoded: Result< + BatchUpdateObjectsResponse, _ + > = serde_json::from_str(&s); + match decoded { + Ok(d) => d == msg, + Err(_) => false, + } + } + Err(_) => false, + } + } + } + + quickcheck! { + fn prop_batch_payload_from_json( + payload: BatchUpdateObjectsPayload + ) -> bool { + let val = serde_json::to_value(&payload) + .expect("serialize"); + let arr = Value::Array(vec![val]); + let decoded: Result< + Vec, _ + > = serde_json::from_value(arr); + decoded.is_ok() + } + } + + #[test] + fn test_empty_batch_returns_zero_counts() { + let payload = BatchUpdateObjectsPayload { + objects: Vec::new(), + request_id: Uuid::new_v4(), + }; + // We cannot call do_batch_update without a DB + // connection, but we verify the payload is valid + // and the empty-objects path is exercised via + // the serialization roundtrip. + let json = serde_json::to_string(&payload).unwrap(); + let decoded: BatchUpdateObjectsPayload = + serde_json::from_str(&json).unwrap(); + assert_eq!(decoded.objects.len(), 0); + } + + #[test] + fn test_update_sql_contains_vnode() { + let sql = update_sql(42); + assert!(sql.contains("manta_bucket_42")); + assert!(sql.contains("COALESCE")); + assert!(!sql.contains("RETURNING")); + } +} diff --git a/buckets-mdapi/src/object/update.rs b/buckets-mdapi/src/object/update.rs index 319a4b2..ca6e875 100644 --- a/buckets-mdapi/src/object/update.rs +++ b/buckets-mdapi/src/object/update.rs @@ -1,5 +1,6 @@ // Copyright 2020 Joyent, Inc. // Copyright 2023 MNX Cloud, Inc. +// Copyright 2026 Edgecast Cloud LLC. use std::vec::Vec; @@ -15,7 +16,9 @@ use fast_rpc::protocol::{FastMessage, FastMessageData}; use crate::conditional; use crate::error::BucketsMdapiError; use crate::metrics::RegisteredMetrics; -use crate::object::{object_not_found, response, to_json, ObjectResponse}; +use crate::object::{ + object_not_found, response, to_json, ObjectResponse, StorageNodeIdentifier, +}; use crate::sql; use crate::types::{HandlerResponse, HasRequestId, Hstore}; use crate::util::array_wrap; @@ -29,6 +32,8 @@ pub struct UpdateObjectPayload { pub vnode: u64, pub content_type: String, pub headers: Hstore, + #[serde(default)] + pub sharks: Option>, pub properties: Option, pub request_id: Uuid, @@ -113,10 +118,12 @@ fn do_update( &[ &payload.content_type, &payload.headers, + &payload.sharks, &payload.properties, &payload.owner, &payload.bucket_id, &payload.name, + &payload.id, ], metrics, log, @@ -136,16 +143,18 @@ fn update_sql(vnode: u64) -> String { "UPDATE manta_bucket_", &vnode.to_string(), &".manta_bucket_object \ - SET content_type = $1, - headers = $2, \ - properties = $3, \ - modified = current_timestamp \ - WHERE owner = $4 \ - AND bucket_id = $5 \ - AND name = $6 \ - RETURNING id, owner, bucket_id, name, created, modified, \ - content_length, content_md5, content_type, headers, \ - sharks, properties", + SET content_type = $1, \ + headers = $2, \ + sharks = COALESCE($3, sharks), \ + properties = $4, \ + modified = current_timestamp \ + WHERE owner = $5 \ + AND bucket_id = $6 \ + AND name = $7 \ + AND id = $8 \ + RETURNING id, owner, bucket_id, name, created, modified, \ + content_length, content_md5, content_type, headers, \ + sharks, properties", ] .concat() } @@ -161,6 +170,8 @@ mod test { use serde_json; use serde_json::Map; + use crate::object::StorageNodeIdentifier; + #[derive(Clone, Debug)] struct UpdateObjectJson(Value); @@ -196,6 +207,25 @@ mod test { obj.insert("vnode".into(), vnode); obj.insert("content_type".into(), content_type); obj.insert("headers".into(), headers); + // Sometimes include sharks, sometimes omit to + // test #[serde(default)] deserialization. + if bool::arbitrary(g) { + let shark_1 = StorageNodeIdentifier { + datacenter: random::string(g, 32), + manta_storage_id: random::string(g, 32), + }; + let shark_2 = StorageNodeIdentifier { + datacenter: random::string(g, 32), + manta_storage_id: random::string(g, 32), + }; + let sharks = if bool::arbitrary(g) { + serde_json::to_value(vec![shark_1, shark_2]) + .expect("failed to convert sharks to Value") + } else { + Value::Null + }; + obj.insert("sharks".into(), sharks); + } obj.insert("request_id".into(), request_id); UpdateObjectJson(Value::Object(obj)) } @@ -214,7 +244,19 @@ mod test { .insert(random::string(g, 32), Some(random::string(g, 32))); let _ = headers .insert(random::string(g, 32), Some(random::string(g, 32))); - + let shark_1 = StorageNodeIdentifier { + datacenter: random::string(g, 32), + manta_storage_id: random::string(g, 32), + }; + let shark_2 = StorageNodeIdentifier { + datacenter: random::string(g, 32), + manta_storage_id: random::string(g, 32), + }; + let sharks = if bool::arbitrary(g) { + Some(vec![shark_1, shark_2]) + } else { + None + }; let properties = None; let request_id = Uuid::new_v4(); let conditions: conditional::Conditions = Default::default(); @@ -227,6 +269,7 @@ mod test { vnode, content_type, headers, + sharks, properties, request_id, conditions, diff --git a/buckets-mdapi/src/sql.rs b/buckets-mdapi/src/sql.rs index ee40630..9f8a65b 100644 --- a/buckets-mdapi/src/sql.rs +++ b/buckets-mdapi/src/sql.rs @@ -1,4 +1,5 @@ // Copyright 2020 Joyent, Inc. +// Copyright 2026 Edgecast Cloud LLC. use std::fmt::Display; use std::marker::Sync; @@ -29,12 +30,16 @@ pub enum Method { ObjectDelete, ObjectDeleteMove, ObjectUpdate, + ObjectBatchUpdate, GarbageGet, GarbageDelete, GarbageRecordDelete, GarbageBatchIdGet, GarbageBatchIdUpdate, GarbageRefresh, + ListVnodes, + ListOwners, + VnodeExists, } impl Method { @@ -52,12 +57,16 @@ impl Method { Method::ObjectDelete => "ObjectDelete", Method::ObjectDeleteMove => "ObjectDeleteMove", Method::ObjectUpdate => "ObjectUpdate", + Method::ObjectBatchUpdate => "ObjectBatchUpdate", Method::GarbageGet => "GarbageGet", Method::GarbageDelete => "GarbageDelete", Method::GarbageRecordDelete => "GarbageRecordDelete", Method::GarbageBatchIdGet => "GarbageBatchIdGet", Method::GarbageBatchIdUpdate => "GarbageBatchIdUpdate", Method::GarbageRefresh => "GarbageRefresh", + Method::ListVnodes => "ListVnodes", + Method::ListOwners => "ListOwners", + Method::VnodeExists => "VnodeExists", } } } diff --git a/buckets-mdapi/tests/rpc_handlers.rs b/buckets-mdapi/tests/rpc_handlers.rs index 0251743..afd21a7 100644 --- a/buckets-mdapi/tests/rpc_handlers.rs +++ b/buckets-mdapi/tests/rpc_handlers.rs @@ -1,12 +1,17 @@ // Copyright 2020 Joyent, Inc. // Copyright 2023 MNX Cloud, Inc. +// Copyright 2026 Edgecast Cloud LLC. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv4Addr}; use std::path::Path; use std::process::Command; -use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, Once}; +use std::thread; +use std::time::Instant; +use rand::Rng; use serde_json::json; use slog::{error, info, o, Drain, Level, LevelFilter, Logger}; use url::Url; @@ -22,139 +27,274 @@ use fast_rpc::protocol::{FastMessage, FastMessageData}; use buckets_mdapi::bucket; use buckets_mdapi::conditional; +use buckets_mdapi::discovery; use buckets_mdapi::error::{BucketsMdapiError, BucketsMdapiWrappedError}; use buckets_mdapi::gc; use buckets_mdapi::metrics; +use buckets_mdapi::metrics::RegisteredMetrics; use buckets_mdapi::object; use buckets_mdapi::util; use utils::{config, schema}; -// This test suite requires PostgreSQL and pg_tmp -// (http://eradman.com/ephemeralpg/) to be installed on the test system. -#[test] - -fn verify_rpc_handlers() { - let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); - let log = Logger::root( - Mutex::new(LevelFilter::new( - slog_term::FullFormat::new(plain).build(), - Level::Error, - )) - .fuse(), - o!(), - ); - - let metrics_config = config::ConfigMetrics::default(); - let metrics = metrics::register_metrics(&metrics_config); - - //////////////////////////////////////////////////////////////////////////// - // Check for pg_tmp on the system - //////////////////////////////////////////////////////////////////////////// - let pg_tmp_check_output = Command::new("which") - .arg("pg_tmp") - .output() - .expect("failed to execute process"); - - if !pg_tmp_check_output.status.success() { - error!(log, "pg_tmp is required to run this test"); +// Prometheus metrics are registered into a global registry. +// When tests run in parallel, each test must share the same +// RegisteredMetrics to avoid duplicate-registration panics. +// Uses static mut + Once because Mutex::new is not const fn +// in Rust 1.40 (the target toolchain on SmartOS). +static METRICS_INIT: Once = Once::new(); +static mut METRICS_STORE: Option = None; + +fn shared_metrics() -> RegisteredMetrics { + unsafe { + METRICS_INIT.call_once(|| { + let cfg = config::ConfigMetrics::default(); + METRICS_STORE = Some(metrics::register_metrics(&cfg)); + }); + METRICS_STORE.clone().expect("metrics not initialized") } - assert!(pg_tmp_check_output.status.success()); - - //////////////////////////////////////////////////////////////////////////// - // Create pg_tmp database. This requires that pg_tmp be installed on the - // system running the test. - //////////////////////////////////////////////////////////////////////////// - let create_db_output = - Command::new("../tools/postgres/create-ephemeral-db.sh") - .output() - .expect("failed to execute process"); - - assert!(create_db_output.status.success()); - - let pg_connect_str = String::from_utf8_lossy(&create_db_output.stdout); - - info!(log, "pg url: {}", pg_connect_str); +} - let pg_url = Url::parse(&pg_connect_str) - .expect("failed to parse postgres connection string"); +// Create an ephemeral PostgreSQL database, connection pool, +// and vnode schemas for integration testing. +// +// Requires pg_tmp (ephemeralpg) to be installed. +// +// Implemented as a macro because ConnectionPool contains +// an unnameable closure type from connection_creator(). +// The macro expands inline so the pool type is inferred. +macro_rules! setup_test_env { + ($pool:ident, $metrics:ident, $log:ident) => { + let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); + let $log = Logger::root( + Mutex::new(LevelFilter::new( + slog_term::FullFormat::new(plain).build(), + Level::Error, + )) + .fuse(), + o!(), + ); - //////////////////////////////////////////////////////////////////////////// - // Create connection pool - //////////////////////////////////////////////////////////////////////////// - let pg_port = pg_url.port().expect("failed to parse postgres port"); - let pg_db = "test"; - let user = "postgres"; - let application_name = "buckets_mdapi_test"; + let $metrics = shared_metrics(); - let pg_config = PostgresConnectionConfig { - user: Some(user.into()), - password: None, - host: None, - port: Some(pg_port), - database: Some(pg_db.into()), - application_name: Some(application_name.into()), - tls_config: TlsConfig::disable(), - }; - - let connection_creator = PostgresConnection::connection_creator(pg_config); - let pool_opts = ConnectionPoolOptions { - max_connections: Some(5), - claim_timeout: None, - log: Some(log.clone()), - rebalancer_action_delay: None, - decoherence_interval: None, - connection_check_interval: None, + let pg_tmp_check_output = Command::new("which") + .arg("pg_tmp") + .output() + .expect("failed to run 'which pg_tmp' — ensure 'which' is available on PATH"); + + if !pg_tmp_check_output.status.success() { + error!($log, "pg_tmp is required to run this test"); + } + assert!(pg_tmp_check_output.status.success()); + + let create_db_output = + Command::new("../tools/postgres/create-ephemeral-db.sh") + .output() + .expect("failed to run '../tools/postgres/create-ephemeral-db.sh' — check the script exists and is executable"); + + assert!(create_db_output.status.success()); + + let pg_connect_str = String::from_utf8_lossy(&create_db_output.stdout); + + info!($log, "pg url: {}", pg_connect_str); + + let pg_url = Url::parse(&pg_connect_str) + .expect("failed to parse postgres connection string"); + + let pg_port = pg_url.port().expect("failed to parse postgres port"); + let pg_db = "test"; + let user = "postgres"; + let application_name = "buckets_mdapi_test"; + + let pg_config = PostgresConnectionConfig { + user: Some(user.into()), + password: None, + host: None, + port: Some(pg_port), + database: Some(pg_db.into()), + application_name: Some(application_name.into()), + tls_config: TlsConfig::disable(), + }; + + let connection_creator = + PostgresConnection::connection_creator(pg_config); + let pool_opts = ConnectionPoolOptions { + max_connections: Some(5), + claim_timeout: None, + log: Some($log.clone()), + rebalancer_action_delay: None, + decoherence_interval: None, + connection_check_interval: None, + }; + + let primary_backend = + (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), pg_port); + let resolver = StaticIpResolver::new(vec![primary_backend]); + + let $pool = + ConnectionPool::new(pool_opts, resolver, connection_creator); + + let template_dir = "../schema_templates"; + let migrations_dir = Path::new("../migrations"); + + let mut conn = $pool + .claim() + .expect("failed to acquire connection for schema setup"); + + let config = config::ConfigDatabase { + port: pg_port, + database: pg_db.to_owned(), + ..Default::default() + }; + + let vnode_resolver = StaticIpResolver::new(vec![primary_backend]); + + schema::create_bucket_schemas( + &mut conn, + &config, + vnode_resolver, + template_dir, + migrations_dir, + ["0", "1"].to_vec(), + &$log, + ) + .expect("failed to create vnode schemas"); + + drop(conn); }; +} - let primary_backend = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), pg_port); - let resolver = StaticIpResolver::new(vec![primary_backend]); - - let pool = ConnectionPool::new(pool_opts, resolver, connection_creator); - - //////////////////////////////////////////////////////////////////////////// - // Setup the vnode schemas - // - // Use the schema-manager functions to create the vnode schemas in the - // postgres database. This sets up two vnode schemas (vnodes 0 and 1). - //////////////////////////////////////////////////////////////////////////// - - let template_dir = "../schema_templates"; - let migrations_dir = Path::new("../migrations"); - - let mut conn = pool - .claim() - .expect("failed to acquire postgres connection for vnode schema setup"); +// Helper macros for common setup operations within tests +// that already have pool, metrics, and log in scope. + +macro_rules! create_test_bucket { + ($msg_id:expr, $owner_id:expr, $bucket:expr, + $vnode:expr, $request_id:expr, + $pool:expr, $metrics:expr, $log:expr) => {{ + let payload = bucket::create::CreateBucketPayload { + owner: $owner_id, + name: $bucket.into(), + vnode: $vnode, + request_id: $request_id, + }; + let json = serde_json::to_value(vec![payload]).unwrap(); + let msg_data = FastMessageData::new("createbucket".into(), json); + let msg = FastMessage::data($msg_id, msg_data); + let result = util::handle_msg(&msg, &$pool, &$metrics, &$log); + assert!(result.is_ok()); + let response = result.unwrap(); + assert_eq!(response.len(), 1); + let resp: Result = + serde_json::from_value(response[0].data.d[0].clone()); + assert!(resp.is_ok()); + assert_eq!(resp.unwrap().name, $bucket); + }}; +} - let config = config::ConfigDatabase { - port: pg_port, - database: pg_db.to_owned(), - ..Default::default() - }; +macro_rules! create_test_object { + ($msg_id:expr, $owner_id:expr, $bucket_id:expr, + $object_name:expr, $object_id:expr, $vnode:expr, + $request_id:expr, + $pool:expr, $metrics:expr, $log:expr) => {{ + let shark1 = object::StorageNodeIdentifier { + datacenter: "us-east-1".into(), + manta_storage_id: + "1.stor.us-east.joyent.com".into(), + }; + let shark2 = object::StorageNodeIdentifier { + datacenter: "us-east-2".into(), + manta_storage_id: + "3.stor.us-east.joyent.com".into(), + }; + let conditions = serde_json::from_value::< + conditional::Conditions, + >(json!({ + "if-none-match": [ "*" ] + })) + .unwrap(); + let payload = object::create::CreateObjectPayload { + owner: $owner_id, + bucket_id: $bucket_id, + name: $object_name.into(), + id: $object_id, + vnode: $vnode, + content_length: 5, + content_md5: "xzY5jJbR9rcrMRhlcmi/8g==".into(), + content_type: "text/plain".into(), + headers: HashMap::new(), + sharks: vec![shark1, shark2], + properties: None, + request_id: $request_id, + conditions, + }; + let json = + serde_json::to_value(vec![payload]).unwrap(); + let msg_data = FastMessageData::new( + "createobject".into(), + json, + ); + let msg = FastMessage::data($msg_id, msg_data); + let result = util::handle_msg( + &msg, &$pool, &$metrics, &$log, + ); + assert!(result.is_ok()); + let response = result.unwrap(); + assert_eq!(response.len(), 1); + let resp: Result = + serde_json::from_value( + response[0].data.d[0].clone(), + ); + assert!(resp.is_ok()); + assert_eq!(&resp.unwrap().name, $object_name); + }}; +} - let vnode_resolver = StaticIpResolver::new(vec![primary_backend]); +macro_rules! delete_test_object { + ($msg_id:expr, $owner_id:expr, $bucket_id:expr, + $object_name:expr, $vnode:expr, $request_id:expr, + $pool:expr, $metrics:expr, $log:expr) => {{ + let payload = object::GetObjectPayload { + owner: $owner_id, + bucket_id: $bucket_id, + name: $object_name.into(), + vnode: $vnode, + request_id: $request_id, + conditions: Default::default(), + }; + let json = serde_json::to_value(vec![payload]).unwrap(); + let msg_data = FastMessageData::new("deleteobject".into(), json); + let msg = FastMessage::data($msg_id, msg_data); + let result = util::handle_msg(&msg, &$pool, &$metrics, &$log); + assert!(result.is_ok()); + let response = result.unwrap(); + assert_eq!(response.len(), 1); + let resp: Result, _> = + serde_json::from_value(response[0].data.d[0].clone()); + assert!(resp.is_ok()); + let items = resp.unwrap(); + assert_eq!(items.len(), 1); + assert_eq!(&items[0].owner, &$owner_id); + assert_eq!(&items[0].bucket_id, &$bucket_id); + assert_eq!(&items[0].name, $object_name); + }}; +} - schema::create_bucket_schemas( - &mut conn, - &config, - vnode_resolver, - template_dir, - migrations_dir, - ["0", "1"].to_vec(), - &log, - ) - .expect("failed to create vnode schemas"); +// This test suite requires PostgreSQL and pg_tmp +// (http://eradman.com/ephemeralpg/) to be installed on +// the test system. - drop(conn); +//////////////////////////////////////////////////////////// +// Bucket CRUD tests +//////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////// - // Exercise RPC handlers - //////////////////////////////////////////////////////////////////////////// +#[test] +fn verify_bucket_handlers() { + setup_test_env!(pool, metrics, log); let msg_id: u32 = 0x1; let owner_id = Uuid::new_v4(); let bucket: String = "testbucket".into(); let request_id = Uuid::new_v4(); - // Try to read a bucket + // Get nonexistent bucket let get_bucket_payload = bucket::GetBucketPayload { owner: owner_id, name: bucket.clone(), @@ -178,8 +318,8 @@ fn verify_rpc_handlers() { let get_bucket_response_result_data = get_bucket_response[0].data.d[0].clone(); /* - * All errors should have a "name" and "message" property to comply with the error format in - * the Fast protocol. + * All errors should have a "name" and "message" property + * to comply with the error format in the Fast protocol. */ assert_eq!( get_bucket_response_result_data, @@ -222,7 +362,7 @@ fn verify_rpc_handlers() { assert!(create_bucket_response_result.is_ok()); assert_eq!(create_bucket_response_result.unwrap().name, bucket); - // Read bucket again and make sure the resonse is returned successfully + // Read bucket and verify success get_bucket_result = util::handle_msg(&get_bucket_fast_msg, &pool, &metrics, &log); @@ -235,8 +375,7 @@ fn verify_rpc_handlers() { assert!(get_bucket_response_result.is_ok()); assert_eq!(get_bucket_response_result.unwrap().name, bucket); - // Try to create same bucket again and verify a BucketAlreadyExists error is - // returned + // Create same bucket again -> BucketAlreadyExists create_bucket_result = util::handle_msg(&create_bucket_fast_msg, &pool, &metrics, &log); @@ -253,7 +392,6 @@ fn verify_rpc_handlers() { ); // Delete bucket - let delete_bucket_payload = bucket::DeleteBucketPayload { owner: owner_id, name: bucket.clone(), @@ -278,7 +416,7 @@ fn verify_rpc_handlers() { assert!(delete_bucket_response_result.is_ok()); assert_eq!(delete_bucket_response_result.unwrap(), 1); - // Read bucket again and verify it's gone + // Read bucket again -> BucketNotFound get_bucket_result = util::handle_msg(&get_bucket_fast_msg, &pool, &metrics, &log); @@ -294,7 +432,7 @@ fn verify_rpc_handlers() { BucketsMdapiWrappedError::new(BucketsMdapiError::BucketNotFound), ); - // Attempt to delete a nonexistent bucket and verify an error is returned + // Delete nonexistent bucket -> BucketNotFound delete_bucket_result = util::handle_msg(&delete_bucket_fast_msg, &pool, &metrics, &log); @@ -310,9 +448,64 @@ fn verify_rpc_handlers() { BucketsMdapiWrappedError::new(BucketsMdapiError::BucketNotFound), ); - // Try to read an object + // List buckets (empty) + let list_buckets_payload = bucket::list::ListBucketsPayload { + owner: owner_id, + vnode: 0, + prefix: Some("testbucket".into()), + limit: 1000, + marker: None, + request_id, + }; + + let list_buckets_json = + serde_json::to_value(vec![list_buckets_payload]).unwrap(); + let list_buckets_fast_msg_data = + FastMessageData::new("listbuckets".into(), list_buckets_json); + let list_buckets_fast_msg = + FastMessage::data(msg_id, list_buckets_fast_msg_data); + let mut list_buckets_result = + util::handle_msg(&list_buckets_fast_msg, &pool, &metrics, &log); + + assert!(list_buckets_result.is_ok()); + let list_buckets_response = list_buckets_result.unwrap(); + assert_eq!(list_buckets_response.len(), 0); + + // Create bucket and list again -> 1 + create_bucket_result = + util::handle_msg(&create_bucket_fast_msg, &pool, &metrics, &log); + + assert!(create_bucket_result.is_ok()); + let create_bucket_response = create_bucket_result.unwrap(); + assert_eq!(create_bucket_response.len(), 1); + + let create_bucket_response_result: Result = + serde_json::from_value(create_bucket_response[0].data.d[0].clone()); + assert!(create_bucket_response_result.is_ok()); + assert_eq!(create_bucket_response_result.unwrap().name, bucket); + + list_buckets_result = + util::handle_msg(&list_buckets_fast_msg, &pool, &metrics, &log); + + assert!(list_buckets_result.is_ok()); + let list_buckets_response = list_buckets_result.unwrap(); + assert_eq!(list_buckets_response.len(), 1); +} + +//////////////////////////////////////////////////////////// +// Object CRUD tests +//////////////////////////////////////////////////////////// + +#[test] +fn verify_object_handlers() { + setup_test_env!(pool, metrics, log); + let msg_id: u32 = 0x1; + let owner_id = Uuid::new_v4(); let bucket_id = Uuid::new_v4(); let object: String = "testobject".into(); + let request_id = Uuid::new_v4(); + + // Get nonexistent object let conditions: conditional::Conditions = Default::default(); let get_object_payload = object::GetObjectPayload { owner: owner_id, @@ -344,7 +537,7 @@ fn verify_rpc_handlers() { BucketsMdapiWrappedError::new(BucketsMdapiError::ObjectNotFound), ); - // Try to update an nonexistent object's metadata + // Update nonexistent object let object_id = Uuid::new_v4(); let mut update_headers = HashMap::new(); @@ -358,6 +551,17 @@ fn verify_rpc_handlers() { ); let conditions: conditional::Conditions = Default::default(); + let update_sharks = vec![ + object::StorageNodeIdentifier { + datacenter: "us-west-1".into(), + manta_storage_id: "2.stor.us-west.joyent.com".into(), + }, + object::StorageNodeIdentifier { + datacenter: "us-west-2".into(), + manta_storage_id: "4.stor.us-west.joyent.com".into(), + }, + ]; + let update_object_payload = object::update::UpdateObjectPayload { owner: owner_id, bucket_id, @@ -366,6 +570,7 @@ fn verify_rpc_handlers() { vnode: 1, content_type: "text/html".into(), headers: update_headers, + sharks: Some(update_sharks.clone()), properties: None, request_id, conditions, @@ -392,7 +597,7 @@ fn verify_rpc_handlers() { BucketsMdapiWrappedError::new(BucketsMdapiError::ObjectNotFound), ); - // Create an object, fail with bad etag + // Create object with if-match:"*" -> PreconditionFailed let shark1 = object::StorageNodeIdentifier { datacenter: "us-east-1".into(), manta_storage_id: "1.stor.us-east.joyent.com".into(), @@ -447,7 +652,7 @@ fn verify_rpc_handlers() { ), ); - // Create an object, ensure nothing was there before us + // Create object with if-none-match:"*" -> success let shark1 = object::StorageNodeIdentifier { datacenter: "us-east-1".into(), manta_storage_id: "1.stor.us-east.joyent.com".into(), @@ -495,7 +700,8 @@ fn verify_rpc_handlers() { assert!(create_object_response_result.is_ok()); assert_eq!(create_object_response_result.unwrap().name, object); - // Create an object, fail with bad etag + // Create duplicate with if-none-match:"*" -> + // PreconditionFailed let shark1 = object::StorageNodeIdentifier { datacenter: "us-east-1".into(), manta_storage_id: "1.stor.us-east.joyent.com".into(), @@ -551,7 +757,7 @@ fn verify_rpc_handlers() { ), ); - // Read object again and verify a successful response is returned + // Read object -> success get_object_result = util::handle_msg(&get_object_fast_msg, &pool, &metrics, &log); @@ -566,7 +772,7 @@ fn verify_rpc_handlers() { assert_eq!(get_object_unwrapped_result.name, object); assert_eq!(&get_object_unwrapped_result.content_type, "text/plain"); - // Update the object's metadata and verify it is successful + // Update object metadata -> success update_object_result = util::handle_msg(&update_object_fast_msg, &pool, &metrics, &log); @@ -580,8 +786,10 @@ fn verify_rpc_handlers() { let update_object_unwrapped_result = update_object_response_result.unwrap(); assert_eq!(update_object_unwrapped_result.name, object); assert_eq!(&update_object_unwrapped_result.content_type, "text/html"); + // Verify sharks were persisted by the update + assert_eq!(update_object_unwrapped_result.sharks, update_sharks); - // Read object again and verify the metadata update + // Read object -> verify update persisted get_object_result = util::handle_msg(&get_object_fast_msg, &pool, &metrics, &log); assert!(get_object_result.is_ok()); @@ -595,7 +803,7 @@ fn verify_rpc_handlers() { assert_eq!(get_object_unwrapped_result.name, object); assert_eq!(&get_object_unwrapped_result.content_type, "text/html"); - // Get object with "if-match: correctETag" + // Get with if-match: correct ETag -> success let request_id = Uuid::new_v4(); let conditions = serde_json::from_value::(json!({ @@ -632,7 +840,7 @@ fn verify_rpc_handlers() { assert_eq!(get_object_unwrapped_result.name, object); assert_eq!(&get_object_unwrapped_result.content_type, "text/html"); - // Try get object with "if-match: wrongETag" + // Get with if-match: wrong ETag -> PreconditionFailed let request_id = Uuid::new_v4(); let if_match_etag = Uuid::new_v4(); @@ -677,9 +885,6 @@ fn verify_rpc_handlers() { ); // Delete object - - // The get and delete object args are the same so we can reuse - // get_object_json here. Just lets empty the conditions first. get_object_payload.conditions = Default::default(); let delete_object_json = serde_json::to_value(vec![get_object_payload]).unwrap(); @@ -705,7 +910,7 @@ fn verify_rpc_handlers() { assert_eq!(&delete_object_response[0].bucket_id, &bucket_id); assert_eq!(&delete_object_response[0].name, &object); - // Read object again and verify it is not found + // Read deleted object -> ObjectNotFound get_object_result = util::handle_msg(&get_object_fast_msg, &pool, &metrics, &log); @@ -721,7 +926,7 @@ fn verify_rpc_handlers() { BucketsMdapiWrappedError::new(BucketsMdapiError::ObjectNotFound), ); - // Delete the object again and verify it is not found + // Delete nonexistent object -> ObjectNotFound delete_object_result = util::handle_msg(&delete_object_fast_msg, &pool, &metrics, &log); @@ -737,52 +942,7 @@ fn verify_rpc_handlers() { BucketsMdapiWrappedError::new(BucketsMdapiError::ObjectNotFound), ); - // List buckets and confirm none are found - - let list_buckets_payload = bucket::list::ListBucketsPayload { - owner: owner_id, - vnode: 0, - prefix: Some("testbucket".into()), - limit: 1000, - marker: None, - request_id, - }; - - let list_buckets_json = - serde_json::to_value(vec![list_buckets_payload]).unwrap(); - let list_buckets_fast_msg_data = - FastMessageData::new("listbuckets".into(), list_buckets_json); - let list_buckets_fast_msg = - FastMessage::data(msg_id, list_buckets_fast_msg_data); - let mut list_buckets_result = - util::handle_msg(&list_buckets_fast_msg, &pool, &metrics, &log); - - assert!(list_buckets_result.is_ok()); - let list_buckets_response = list_buckets_result.unwrap(); - assert_eq!(list_buckets_response.len(), 0); - - // Create a bucket and list buckets again - create_bucket_result = - util::handle_msg(&create_bucket_fast_msg, &pool, &metrics, &log); - - assert!(create_bucket_result.is_ok()); - let create_bucket_response = create_bucket_result.unwrap(); - assert_eq!(create_bucket_response.len(), 1); - - let create_bucket_response_result: Result = - serde_json::from_value(create_bucket_response[0].data.d[0].clone()); - assert!(create_bucket_response_result.is_ok()); - assert_eq!(create_bucket_response_result.unwrap().name, bucket); - - list_buckets_result = - util::handle_msg(&list_buckets_fast_msg, &pool, &metrics, &log); - - assert!(list_buckets_result.is_ok()); - let list_buckets_response = list_buckets_result.unwrap(); - assert_eq!(list_buckets_response.len(), 1); - - // List objects and confirm none are found - + // List objects (empty) let list_objects_payload = object::list::ListObjectsPayload { owner: owner_id, bucket_id, @@ -806,7 +966,7 @@ fn verify_rpc_handlers() { let list_objects_response = list_objects_result.unwrap(); assert_eq!(list_objects_response.len(), 0); - // Create an object and list objects again + // Create object and list again -> 1 let create_object_result = util::handle_msg(&create_object_fast_msg, &pool, &metrics, &log); @@ -825,10 +985,597 @@ fn verify_rpc_handlers() { assert!(list_objects_result.is_ok()); let list_objects_response = list_objects_result.unwrap(); assert_eq!(list_objects_response.len(), 1); +} - // Exercise the garbage collection functions +//////////////////////////////////////////////////////////// +// Batch update tests +//////////////////////////////////////////////////////////// - // First request a batch of garbage +#[test] +fn verify_batch_update_handlers() { + setup_test_env!(pool, metrics, log); + let msg_id: u32 = 0x1; + let owner_id = Uuid::new_v4(); + let bucket_id = Uuid::new_v4(); + let object: String = "testobject".into(); + let object_id = Uuid::new_v4(); + let request_id = Uuid::new_v4(); + + // Setup: create a bucket and an object + create_test_bucket!( + msg_id, + owner_id, + "testbucket", + 0, + request_id, + pool, + metrics, + log + ); + create_test_object!( + msg_id, owner_id, bucket_id, &object, object_id, 1, request_id, pool, + metrics, log + ); + + // Sharks used for batch update payloads + let batch_sharks = vec![ + object::StorageNodeIdentifier { + datacenter: "us-west-1".into(), + manta_storage_id: "2.stor.us-west.joyent.com".into(), + }, + object::StorageNodeIdentifier { + datacenter: "us-west-2".into(), + manta_storage_id: "4.stor.us-west.joyent.com".into(), + }, + ]; + + // Batch update nonexistent object + let nonexistent_id = Uuid::new_v4(); + let request_id = Uuid::new_v4(); + let batch_update_payload = + object::batch_update::BatchUpdateObjectsPayload { + objects: vec![object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: "no_such_object".into(), + id: nonexistent_id, + vnode: 1, + content_type: "text/plain".into(), + headers: HashMap::new(), + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions: Default::default(), + }], + request_id, + }; + + let batch_update_json = + serde_json::to_value(vec![&batch_update_payload]).unwrap(); + let batch_update_fast_msg_data = + FastMessageData::new("batchupdateobjects".into(), batch_update_json); + let batch_update_fast_msg = + FastMessage::data(msg_id, batch_update_fast_msg_data); + let batch_update_result = + util::handle_msg(&batch_update_fast_msg, &pool, &metrics, &log); + + assert!(batch_update_result.is_ok()); + let batch_update_response = batch_update_result.unwrap(); + assert_eq!(batch_update_response.len(), 1); + + let batch_resp: object::batch_update::BatchUpdateObjectsResponse = + serde_json::from_value(batch_update_response[0].data.d[0].clone()) + .unwrap(); + assert!(!batch_resp.is_success()); + assert_eq!(batch_resp.failed_vnodes.len(), 1); + assert_eq!(batch_resp.failed_count(), 1); + + // Batch update happy path + let request_id = Uuid::new_v4(); + let mut batch_headers = HashMap::new(); + let _ = batch_headers + .insert("m-batch-header".to_string(), Some("batchval".to_string())); + + let batch_update_payload = + object::batch_update::BatchUpdateObjectsPayload { + objects: vec![object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: object.clone(), + id: object_id, + vnode: 1, + content_type: "application/json".into(), + headers: batch_headers, + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions: Default::default(), + }], + request_id, + }; + + let batch_update_json = + serde_json::to_value(vec![&batch_update_payload]).unwrap(); + let batch_update_fast_msg_data = + FastMessageData::new("batchupdateobjects".into(), batch_update_json); + let batch_update_fast_msg = + FastMessage::data(msg_id, batch_update_fast_msg_data); + let batch_update_result = + util::handle_msg(&batch_update_fast_msg, &pool, &metrics, &log); + + assert!(batch_update_result.is_ok()); + let batch_update_response = batch_update_result.unwrap(); + assert_eq!(batch_update_response.len(), 1); + + let batch_resp: object::batch_update::BatchUpdateObjectsResponse = + serde_json::from_value(batch_update_response[0].data.d[0].clone()) + .unwrap(); + assert!(batch_resp.is_success()); + + // Verify persistence via getobject + let request_id = Uuid::new_v4(); + let conditions: conditional::Conditions = Default::default(); + let get_object_payload = object::GetObjectPayload { + owner: owner_id, + bucket_id, + name: object.clone(), + vnode: 1, + request_id, + conditions, + }; + + let get_object_json = + serde_json::to_value(vec![&get_object_payload]).unwrap(); + let get_object_fast_msg_data = + FastMessageData::new("getobject".into(), get_object_json); + let get_object_fast_msg = + FastMessage::data(msg_id, get_object_fast_msg_data); + let get_object_result = + util::handle_msg(&get_object_fast_msg, &pool, &metrics, &log); + + assert!(get_object_result.is_ok()); + let get_object_response = get_object_result.unwrap(); + assert_eq!(get_object_response.len(), 1); + + let get_object_response_result: Result = + serde_json::from_value(get_object_response[0].data.d[0].clone()); + assert!(get_object_response_result.is_ok()); + let get_object_after_batch = get_object_response_result.unwrap(); + assert_eq!(get_object_after_batch.name, object); + assert_eq!(get_object_after_batch.content_type, "application/json"); + // Verify sharks were persisted by the batch update + assert_eq!(get_object_after_batch.sharks, batch_sharks); + + // Batch update with correct if-match ETag + let request_id = Uuid::new_v4(); + let conditions = serde_json::from_value::( + json!({ "if-match": [ object_id.to_string() ] }), + ) + .unwrap(); + + let batch_update_payload = + object::batch_update::BatchUpdateObjectsPayload { + objects: vec![object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: object.clone(), + id: object_id, + vnode: 1, + content_type: "text/csv".into(), + headers: HashMap::new(), + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions, + }], + request_id, + }; + + let batch_update_json = + serde_json::to_value(vec![&batch_update_payload]).unwrap(); + let batch_update_fast_msg_data = + FastMessageData::new("batchupdateobjects".into(), batch_update_json); + let batch_update_fast_msg = + FastMessage::data(msg_id, batch_update_fast_msg_data); + let batch_update_result = + util::handle_msg(&batch_update_fast_msg, &pool, &metrics, &log); + + assert!(batch_update_result.is_ok()); + let batch_update_response = batch_update_result.unwrap(); + assert_eq!(batch_update_response.len(), 1); + + let batch_resp: object::batch_update::BatchUpdateObjectsResponse = + serde_json::from_value(batch_update_response[0].data.d[0].clone()) + .unwrap(); + assert!(batch_resp.is_success()); + + // Batch update with wrong if-match ETag + let request_id = Uuid::new_v4(); + let wrong_etag = Uuid::new_v4(); + let conditions = serde_json::from_value::(json!({ + "if-match": [ wrong_etag.to_string() ] + })) + .unwrap(); + + let batch_update_payload = + object::batch_update::BatchUpdateObjectsPayload { + objects: vec![object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: object.clone(), + id: object_id, + vnode: 1, + content_type: "text/xml".into(), + headers: HashMap::new(), + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions, + }], + request_id, + }; + + let batch_update_json = + serde_json::to_value(vec![&batch_update_payload]).unwrap(); + let batch_update_fast_msg_data = + FastMessageData::new("batchupdateobjects".into(), batch_update_json); + let batch_update_fast_msg = + FastMessage::data(msg_id, batch_update_fast_msg_data); + let batch_update_result = + util::handle_msg(&batch_update_fast_msg, &pool, &metrics, &log); + + assert!(batch_update_result.is_ok()); + let batch_update_response = batch_update_result.unwrap(); + assert_eq!(batch_update_response.len(), 1); + + let batch_resp: object::batch_update::BatchUpdateObjectsResponse = + serde_json::from_value(batch_update_response[0].data.d[0].clone()) + .unwrap(); + assert!(!batch_resp.is_success()); + assert_eq!(batch_resp.failed_vnodes.len(), 1); + assert_eq!(batch_resp.failed_count(), 1); + + // Multi-object batch update (same vnode) + // Create a second object on vnode 1 + let object_b: String = "testobject_b".into(); + let object_b_id = Uuid::new_v4(); + let request_id = Uuid::new_v4(); + create_test_object!( + msg_id, + owner_id, + bucket_id, + &object_b, + object_b_id, + 1, + request_id, + pool, + metrics, + log + ); + + let request_id = Uuid::new_v4(); + let batch_update_payload = + object::batch_update::BatchUpdateObjectsPayload { + objects: vec![ + object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: object.clone(), + id: object_id, + vnode: 1, + content_type: "multi/obj-a".into(), + headers: HashMap::new(), + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions: Default::default(), + }, + object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: object_b.clone(), + id: object_b_id, + vnode: 1, + content_type: "multi/obj-b".into(), + headers: HashMap::new(), + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions: Default::default(), + }, + ], + request_id, + }; + + let batch_update_json = + serde_json::to_value(vec![&batch_update_payload]).unwrap(); + let batch_update_fast_msg_data = + FastMessageData::new("batchupdateobjects".into(), batch_update_json); + let batch_update_fast_msg = + FastMessage::data(msg_id, batch_update_fast_msg_data); + let batch_update_result = + util::handle_msg(&batch_update_fast_msg, &pool, &metrics, &log); + + assert!(batch_update_result.is_ok()); + let batch_update_response = batch_update_result.unwrap(); + assert_eq!(batch_update_response.len(), 1); + + let batch_resp: object::batch_update::BatchUpdateObjectsResponse = + serde_json::from_value(batch_update_response[0].data.d[0].clone()) + .unwrap(); + assert!(batch_resp.is_success()); + + // Multi-vnode batch update + // Create a third object on vnode 0 (different vnode) + let object_c: String = "testobject_c".into(); + let object_c_id = Uuid::new_v4(); + let request_id = Uuid::new_v4(); + create_test_object!( + msg_id, + owner_id, + bucket_id, + &object_c, + object_c_id, + 0, + request_id, + pool, + metrics, + log + ); + + let request_id = Uuid::new_v4(); + let batch_update_payload = + object::batch_update::BatchUpdateObjectsPayload { + objects: vec![ + object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: object.clone(), + id: object_id, + vnode: 1, + content_type: "vnode1/updated".into(), + headers: HashMap::new(), + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions: Default::default(), + }, + object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: object_c.clone(), + id: object_c_id, + vnode: 0, + content_type: "vnode0/updated".into(), + headers: HashMap::new(), + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions: Default::default(), + }, + ], + request_id, + }; + + let batch_update_json = + serde_json::to_value(vec![&batch_update_payload]).unwrap(); + let batch_update_fast_msg_data = + FastMessageData::new("batchupdateobjects".into(), batch_update_json); + let batch_update_fast_msg = + FastMessage::data(msg_id, batch_update_fast_msg_data); + let batch_update_result = + util::handle_msg(&batch_update_fast_msg, &pool, &metrics, &log); + + assert!(batch_update_result.is_ok()); + let batch_update_response = batch_update_result.unwrap(); + assert_eq!(batch_update_response.len(), 1); + + let batch_resp: object::batch_update::BatchUpdateObjectsResponse = + serde_json::from_value(batch_update_response[0].data.d[0].clone()) + .unwrap(); + assert!(batch_resp.is_success()); + + // Cross-vnode partial failure: vnode 1 succeeds, + // vnode 0 fails (nonexistent object) + let request_id = Uuid::new_v4(); + let batch_update_payload = + object::batch_update::BatchUpdateObjectsPayload { + objects: vec![ + object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: object.clone(), + id: object_id, + vnode: 1, + content_type: "should/succeed".into(), + headers: HashMap::new(), + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions: Default::default(), + }, + object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: "no_such_object_on_v0".into(), + id: Uuid::new_v4(), + vnode: 0, + content_type: "should/fail".into(), + headers: HashMap::new(), + sharks: Some(batch_sharks.clone()), + properties: None, + request_id, + conditions: Default::default(), + }, + ], + request_id, + }; + + let batch_update_json = + serde_json::to_value(vec![&batch_update_payload]).unwrap(); + let batch_update_fast_msg_data = + FastMessageData::new("batchupdateobjects".into(), batch_update_json); + let batch_update_fast_msg = + FastMessage::data(msg_id, batch_update_fast_msg_data); + let batch_update_result = + util::handle_msg(&batch_update_fast_msg, &pool, &metrics, &log); + + assert!(batch_update_result.is_ok()); + let batch_update_response = batch_update_result.unwrap(); + assert_eq!(batch_update_response.len(), 1); + + let batch_resp: object::batch_update::BatchUpdateObjectsResponse = + serde_json::from_value(batch_update_response[0].data.d[0].clone()) + .unwrap(); + assert!(!batch_resp.is_success()); + // Vnode 0 failed as an atomic unit + assert_eq!(batch_resp.failed_vnodes.len(), 1); + assert_eq!(batch_resp.failed_vnodes[0].vnode, 0); + assert_eq!(batch_resp.failed_vnodes[0].objects.len(), 1); +} + +//////////////////////////////////////////////////////////// +// Randomized batch update test +//////////////////////////////////////////////////////////// + +#[test] +fn verify_batch_update_random_workload() { + setup_test_env!(pool, metrics, log); + let msg_id: u32 = 0x1; + let owner_id = Uuid::new_v4(); + let bucket_id = Uuid::new_v4(); + let request_id = Uuid::new_v4(); + + // Create a bucket on each vnode + create_test_bucket!( + msg_id, + owner_id, + "randombucket", + 0, + request_id, + pool, + metrics, + log + ); + + // Generate 10 objects spread across vnodes 0 and 1. + // Uuid::new_v4() provides randomness for unique names. + let obj_count = 10; + + struct TestObj { + name: String, + id: Uuid, + vnode: u64, + } + + let mut objects: Vec = Vec::new(); + for i in 0..obj_count { + let name = format!("rnd_obj_{}", Uuid::new_v4()); + let id = Uuid::new_v4(); + let vnode = if i % 2 == 0 { 0 } else { 1 }; + objects.push(TestObj { name, id, vnode }); + } + + // Create all objects in the database + for obj in &objects { + create_test_object!( + msg_id, owner_id, bucket_id, &obj.name, obj.id, obj.vnode, + request_id, pool, metrics, log + ); + } + + // Build a batch update with unique content types + let request_id = Uuid::new_v4(); + let update_payloads: Vec = objects + .iter() + .enumerate() + .map(|(i, obj)| { + let content_type = format!("random/type-{}", i); + let mut headers = HashMap::new(); + let _ = headers.insert( + format!("m-hdr-{}", Uuid::new_v4()), + Some(format!("val-{}", Uuid::new_v4())), + ); + let sharks = vec![object::StorageNodeIdentifier { + datacenter: format!("dc-{}", i), + manta_storage_id: format!("{}.stor.dc-{}.joyent.com", i, i), + }]; + object::update::UpdateObjectPayload { + owner: owner_id, + bucket_id, + name: obj.name.clone(), + id: obj.id, + vnode: obj.vnode, + content_type, + headers, + sharks: Some(sharks), + properties: None, + request_id, + conditions: Default::default(), + } + }) + .collect(); + + let batch_update_payload = + object::batch_update::BatchUpdateObjectsPayload { + objects: update_payloads, + request_id, + }; + + let batch_update_json = + serde_json::to_value(vec![&batch_update_payload]).unwrap(); + let batch_update_fast_msg_data = + FastMessageData::new("batchupdateobjects".into(), batch_update_json); + let batch_update_fast_msg = + FastMessage::data(msg_id, batch_update_fast_msg_data); + let batch_update_result = + util::handle_msg(&batch_update_fast_msg, &pool, &metrics, &log); + + assert!(batch_update_result.is_ok()); + let batch_update_response = batch_update_result.unwrap(); + assert_eq!(batch_update_response.len(), 1); + + let batch_resp: object::batch_update::BatchUpdateObjectsResponse = + serde_json::from_value(batch_update_response[0].data.d[0].clone()) + .unwrap(); + assert!(batch_resp.is_success()); +} + +//////////////////////////////////////////////////////////// +// Garbage collection tests +//////////////////////////////////////////////////////////// + +#[test] +fn verify_gc_handlers() { + setup_test_env!(pool, metrics, log); + let msg_id: u32 = 0x1; + let owner_id = Uuid::new_v4(); + let bucket_id = Uuid::new_v4(); + let object: String = "testobject".into(); + let object_id = Uuid::new_v4(); + let request_id = Uuid::new_v4(); + + // Setup: create bucket, create object, then delete it + // to produce garbage records. + create_test_bucket!( + msg_id, + owner_id, + "testbucket", + 0, + request_id, + pool, + metrics, + log + ); + create_test_object!( + msg_id, owner_id, bucket_id, &object, object_id, 1, request_id, pool, + metrics, log + ); + delete_test_object!( + msg_id, owner_id, bucket_id, &object, 1, request_id, pool, metrics, log + ); + + // Get garbage batch let request_id = Uuid::new_v4(); let mut get_garbage_payload = gc::get::GetGarbagePayload { request_id }; @@ -857,9 +1604,7 @@ fn verify_rpc_handlers() { let batch_id = get_garbage_unwrapped_result.batch_id.unwrap(); - // Request the batch again and verify that the reported batch id matches the - // one from the previous request. - + // Get garbage batch again -> same batch_id let request_id = Uuid::new_v4(); get_garbage_payload = gc::get::GetGarbagePayload { request_id }; @@ -885,9 +1630,7 @@ fn verify_rpc_handlers() { assert_eq!(batch_id, get_garbage_unwrapped_result.batch_id.unwrap()); - // Indicate that the batch of garbage is processed and request for it to be - // deleted, but use a batch_id that does not match the id of the current - // batch. + // Delete garbage with wrong batch_id let mut delete_garbage_payload = gc::delete::DeleteGarbagePayload { batch_id: Uuid::new_v4(), request_id, @@ -911,10 +1654,7 @@ fn verify_rpc_handlers() { let mut delete_garbage_response = delete_garbage_response_result.unwrap(); assert_eq!(&delete_garbage_response, "ok"); - // Now indicate that the batch of garbage is processed and request for it to - // be deleted. This also verifies the logic that ensures the previous - // request to delete the gc batch using an invalid batch id does not - // actually result in the garbage batch being removed. + // Delete garbage with correct batch_id delete_garbage_payload = gc::delete::DeleteGarbagePayload { batch_id, request_id, @@ -929,20 +1669,16 @@ fn verify_rpc_handlers() { util::handle_msg(&delete_garbage_fast_msg, &pool, &metrics, &log); assert!(delete_garbage_result.is_ok()); - // let mut delete_garbage_responses = delete_garbage_result.unwrap(); delete_garbage_responses = delete_garbage_result.unwrap(); assert_eq!(delete_garbage_responses.len(), 1); - // let mut delete_garbage_response_result: Result = - // serde_json::from_value(delete_garbage_response[0].data.d[0].clone()); delete_garbage_response_result = serde_json::from_value(delete_garbage_responses[0].data.d[0].clone()); assert!(delete_garbage_response_result.is_ok()); delete_garbage_response = delete_garbage_response_result.unwrap(); assert_eq!(&delete_garbage_response, "ok"); - // Request another batch of garbage and this time it should return an empty - // list and a NULL batch_id + // Get garbage batch -> empty get_garbage_result = util::handle_msg(&get_garbage_fast_msg, &pool, &metrics, &log); @@ -953,9 +1689,608 @@ fn verify_rpc_handlers() { let get_garbage_response_result: Result = serde_json::from_value(get_garbage_response[0].data.d[0].clone()); - println!("ggr: {:?}", get_garbage_response); assert!(get_garbage_response_result.is_ok()); get_garbage_unwrapped_result = get_garbage_response_result.unwrap(); assert!(get_garbage_unwrapped_result.batch_id.is_none()); assert!(get_garbage_unwrapped_result.garbage.is_empty()); } + +//////////////////////////////////////////////////////////// +// Discovery RPC tests +//////////////////////////////////////////////////////////// + +// Helper: call listvnodes and return the parsed response +fn do_listvnodes( + msg_id: u32, + pool: &ConnectionPool< + PostgresConnection, + impl cueball::resolver::Resolver, + impl FnMut(&cueball::backend::Backend) -> PostgresConnection + + Send + + 'static, + >, + metrics: &RegisteredMetrics, + log: &Logger, +) -> discovery::ListVnodesResponse { + let payload = discovery::ListVnodesPayload { + request_id: Uuid::new_v4(), + }; + let json = serde_json::to_value(vec![&payload]).unwrap(); + let msg_data = FastMessageData::new("listvnodes".into(), json); + let msg = FastMessage::data(msg_id, msg_data); + let result = util::handle_msg(&msg, pool, metrics, log); + assert!(result.is_ok()); + let response = result.unwrap(); + assert_eq!(response.len(), 1); + serde_json::from_value(response[0].data.d[0].clone()) + .expect("failed to parse ListVnodesResponse") +} + +// Helper: call listowners for a vnode and return the parsed response +fn do_listowners( + msg_id: u32, + vnode: u64, + pool: &ConnectionPool< + PostgresConnection, + impl cueball::resolver::Resolver, + impl FnMut(&cueball::backend::Backend) -> PostgresConnection + + Send + + 'static, + >, + metrics: &RegisteredMetrics, + log: &Logger, +) -> discovery::ListOwnersResponse { + let payload = discovery::ListOwnersPayload { + vnode, + request_id: Uuid::new_v4(), + }; + let json = serde_json::to_value(vec![&payload]).unwrap(); + let msg_data = FastMessageData::new("listowners".into(), json); + let msg = FastMessage::data(msg_id, msg_data); + let result = util::handle_msg(&msg, pool, metrics, log); + assert!(result.is_ok()); + let response = result.unwrap(); + assert_eq!(response.len(), 1); + serde_json::from_value(response[0].data.d[0].clone()) + .expect("failed to parse ListOwnersResponse") +} + +#[test] +fn verify_discovery_handlers() { + setup_test_env!(pool, metrics, log); + let msg_id: u32 = 0x1; + + // listvnodes: the test harness creates schemas for vnodes 0 and 1 + let vnodes_resp = do_listvnodes(msg_id, &pool, &metrics, &log); + assert!( + vnodes_resp.vnodes.contains(&0), + "expected vnode 0 in response" + ); + assert!( + vnodes_resp.vnodes.contains(&1), + "expected vnode 1 in response" + ); + + // listowners on both vnodes before any data -> empty + for vnode in &[0u64, 1] { + let owners_resp = do_listowners(msg_id, *vnode, &pool, &metrics, &log); + assert!( + owners_resp.owners.is_empty(), + "expected empty owners on vnode {} with no buckets", + vnode + ); + } + + // Create buckets for 3 random owners spread across both vnodes. + // owner_a: buckets on vnode 0 only + // owner_b: buckets on vnode 1 only + // owner_c: buckets on both vnodes + let owner_a = Uuid::new_v4(); + let owner_b = Uuid::new_v4(); + let owner_c = Uuid::new_v4(); + + // Use random bucket names to avoid collisions + let bucket_a0 = format!("disc_{}", Uuid::new_v4()); + let bucket_b1 = format!("disc_{}", Uuid::new_v4()); + let bucket_c0 = format!("disc_{}", Uuid::new_v4()); + let bucket_c1 = format!("disc_{}", Uuid::new_v4()); + + create_test_bucket!( + msg_id, + owner_a, + bucket_a0.as_str(), + 0, + Uuid::new_v4(), + pool, + metrics, + log + ); + create_test_bucket!( + msg_id, + owner_b, + bucket_b1.as_str(), + 1, + Uuid::new_v4(), + pool, + metrics, + log + ); + create_test_bucket!( + msg_id, + owner_c, + bucket_c0.as_str(), + 0, + Uuid::new_v4(), + pool, + metrics, + log + ); + create_test_bucket!( + msg_id, + owner_c, + bucket_c1.as_str(), + 1, + Uuid::new_v4(), + pool, + metrics, + log + ); + + // listowners on vnode 0: should contain owner_a and owner_c + let owners_v0 = do_listowners(msg_id, 0, &pool, &metrics, &log); + assert_eq!( + owners_v0.owners.len(), + 2, + "expected 2 distinct owners on vnode 0, got {:?}", + owners_v0.owners + ); + assert!(owners_v0.owners.contains(&owner_a)); + assert!(owners_v0.owners.contains(&owner_c)); + + // listowners on vnode 1: should contain owner_b and owner_c + let owners_v1 = do_listowners(msg_id, 1, &pool, &metrics, &log); + assert_eq!( + owners_v1.owners.len(), + 2, + "expected 2 distinct owners on vnode 1, got {:?}", + owners_v1.owners + ); + assert!(owners_v1.owners.contains(&owner_b)); + assert!(owners_v1.owners.contains(&owner_c)); + + // listvnodes should still return the same vnodes (data doesn't + // change the schema set) + let vnodes_resp = do_listvnodes(msg_id, &pool, &metrics, &log); + assert_eq!(vnodes_resp.vnodes, vec![0, 1]); +} + +#[test] +fn verify_listowners_invalid_vnode() { + setup_test_env!(pool, metrics, _log); + + // Use a Critical-level logger to suppress the expected ERRO line + // from the invalid-vnode error path, keeping test output clean. + let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); + let log = Logger::root( + Mutex::new(LevelFilter::new( + slog_term::FullFormat::new(plain).build(), + Level::Critical, + )) + .fuse(), + o!(), + ); + + let msg_id: u32 = 0x1; + + // Pick a vnode that definitely does not exist in the test harness + let bad_vnode: u64 = 99999; + + let payload = discovery::ListOwnersPayload { + vnode: bad_vnode, + request_id: Uuid::new_v4(), + }; + let json = serde_json::to_value(vec![&payload]).unwrap(); + let msg_data = FastMessageData::new("listowners".into(), json); + let msg = FastMessage::data(msg_id, msg_data); + let result = util::handle_msg(&msg, &pool, &metrics, &log); + assert!(result.is_ok()); + let response = result.unwrap(); + assert_eq!(response.len(), 1); + + // The response should be a wrapped PostgresError, not a + // successful ListOwnersResponse. + let err_result: Result = + serde_json::from_value(response[0].data.d[0].clone()); + assert!( + err_result.is_ok(), + "expected a BucketsMdapiWrappedError for invalid vnode" + ); + let wrapped = err_result.unwrap(); + assert_eq!(wrapped.error.name, "PostgresError"); + assert!( + wrapped.error.message.contains("does not exist"), + "expected 'does not exist' in error message, got: {}", + wrapped.error.message + ); +} + +//////////////////////////////////////////////////////////// +// Discovery concurrency and performance tests +//////////////////////////////////////////////////////////// + +/// Validate that multiple threads can perform discovery +/// RPCs simultaneously without errors or data corruption. +/// +/// Spawns 8 threads, each running 20 iterations of +/// listvnodes + listowners on both vnodes, contending on +/// a pool of 5 connections. +#[test] +fn verify_discovery_concurrent_reads() { + setup_test_env!(pool, metrics, log); + let msg_id: u32 = 0x1; + + // Seed data: 5 owners on both vnodes (10 buckets). + let mut expected_v0: HashSet = HashSet::new(); + let mut expected_v1: HashSet = HashSet::new(); + + for i in 0u32..5 { + let owner = Uuid::new_v4(); + let bkt_v0 = format!("conc_r_{}_v0_{}", i, Uuid::new_v4()); + let bkt_v1 = format!("conc_r_{}_v1_{}", i, Uuid::new_v4()); + + create_test_bucket!( + msg_id, + owner, + bkt_v0.as_str(), + 0, + Uuid::new_v4(), + pool, + metrics, + log + ); + create_test_bucket!( + msg_id, + owner, + bkt_v1.as_str(), + 1, + Uuid::new_v4(), + pool, + metrics, + log + ); + expected_v0.insert(owner); + expected_v1.insert(owner); + } + + let num_threads = 8; + let iterations = 20; + + let handles: Vec<_> = (0..num_threads) + .map(|t| { + let pool_c = pool.clone(); + let metrics_c = metrics.clone(); + let log_c = log.clone(); + let exp_v0 = expected_v0.clone(); + let exp_v1 = expected_v1.clone(); + + thread::spawn(move || { + for iter in 0..iterations { + let mid = ((t * iterations + iter) as u32) + 1; + + // listvnodes + let vr = do_listvnodes(mid, &pool_c, &metrics_c, &log_c); + assert!( + vr.vnodes.contains(&0) && vr.vnodes.contains(&1), + "thread {} iter {}: bad vnodes {:?}", + t, + iter, + vr.vnodes + ); + + // listowners vnode 0 + let o0 = do_listowners(mid, 0, &pool_c, &metrics_c, &log_c); + let set0: HashSet = o0.owners.into_iter().collect(); + assert_eq!( + set0, exp_v0, + "thread {} iter {}: vnode 0 owners", + t, iter + ); + + // listowners vnode 1 + let o1 = do_listowners(mid, 1, &pool_c, &metrics_c, &log_c); + let set1: HashSet = o1.owners.into_iter().collect(); + assert_eq!( + set1, exp_v1, + "thread {} iter {}: vnode 1 owners", + t, iter + ); + } + }) + }) + .collect(); + + for h in handles { + h.join().expect("reader thread panicked"); + } +} + +/// Validate discovery reads return consistent results +/// while concurrent bucket creation is happening. +/// +/// Reader threads assert monotonically non-decreasing +/// owner counts (no stale reads showing fewer owners +/// than a previous read). +/// +/// Invariant: for each vnode, |owners(t2)| >= |owners(t1)| +/// when t2 > t1 during concurrent inserts. +#[test] +fn verify_discovery_concurrent_reads_writes() { + setup_test_env!(pool, metrics, log); + + let done = Arc::new(AtomicBool::new(false)); + let num_readers = 4; + + // Spawn reader threads before writes begin. + let reader_handles: Vec<_> = (0..num_readers) + .map(|t| { + let pool_c = pool.clone(); + let metrics_c = metrics.clone(); + let log_c = log.clone(); + let done_c = done.clone(); + + thread::spawn(move || { + let mut prev_v0: usize = 0; + let mut prev_v1: usize = 0; + let mut reads: u64 = 0; + + while !done_c.load(Ordering::Relaxed) { + let mid = + ((t as u32) * 10000) + ((reads as u32) % 10000) + 1; + + // listvnodes: always [0, 1] + let vr = do_listvnodes(mid, &pool_c, &metrics_c, &log_c); + assert!( + vr.vnodes.contains(&0) && vr.vnodes.contains(&1), + "reader {}: bad vnodes {:?}", + t, + vr.vnodes + ); + + // listowners vnode 0: monotonic + let o0 = do_listowners(mid, 0, &pool_c, &metrics_c, &log_c); + let cur_v0 = o0.owners.len(); + assert!( + cur_v0 >= prev_v0, + "reader {}: vnode 0 owners shrank \ + from {} to {} (read #{})", + t, + prev_v0, + cur_v0, + reads + ); + prev_v0 = cur_v0; + + // listowners vnode 1: monotonic + let o1 = do_listowners(mid, 1, &pool_c, &metrics_c, &log_c); + let cur_v1 = o1.owners.len(); + assert!( + cur_v1 >= prev_v1, + "reader {}: vnode 1 owners shrank \ + from {} to {} (read #{})", + t, + prev_v1, + cur_v1, + reads + ); + prev_v1 = cur_v1; + + reads += 1; + } + }) + }) + .collect(); + + // Writer: create 10 buckets with distinct owners, + // alternating vnodes. + let msg_id: u32 = 0x1; + for i in 0u32..10 { + let owner = Uuid::new_v4(); + let vnode = (i % 2) as u64; + let bkt = format!("conc_rw_{}_{}", i, Uuid::new_v4()); + create_test_bucket!( + msg_id, + owner, + bkt.as_str(), + vnode, + Uuid::new_v4(), + pool, + metrics, + log + ); + } + + // Signal readers to stop. + done.store(true, Ordering::Relaxed); + + for h in reader_handles { + h.join().expect("reader thread panicked"); + } +} + +/// Test discovery correctness with a larger, randomized +/// dataset of owners distributed across vnodes. +/// +/// Generates 10-20 random owners, each with 1-3 buckets +/// on a random vnode. Validates that listowners returns +/// exactly the expected owner sets and their union covers +/// all owners. +#[test] +fn verify_discovery_random_owners_workload() { + setup_test_env!(pool, metrics, log); + let msg_id: u32 = 0x1; + + let mut rng = rand::thread_rng(); + let num_owners: usize = rng.gen_range(10, 21); + + let mut expected_v0: HashSet = HashSet::new(); + let mut expected_v1: HashSet = HashSet::new(); + let mut all_owners: HashSet = HashSet::new(); + + for i in 0..num_owners { + let owner = Uuid::new_v4(); + all_owners.insert(owner); + + let num_buckets: usize = rng.gen_range(1, 4); + for j in 0..num_buckets { + let vnode: u64 = rng.gen_range(0, 2); + let bkt = format!("rand_{}_{}_{}", i, j, Uuid::new_v4()); + create_test_bucket!( + msg_id, + owner, + bkt.as_str(), + vnode, + Uuid::new_v4(), + pool, + metrics, + log + ); + if vnode == 0 { + expected_v0.insert(owner); + } else { + expected_v1.insert(owner); + } + } + } + + // Verify listowners(0) + let o0 = do_listowners(msg_id, 0, &pool, &metrics, &log); + let actual_v0: HashSet = o0.owners.into_iter().collect(); + assert_eq!( + actual_v0, + expected_v0, + "vnode 0 owners mismatch: expected {} got {}", + expected_v0.len(), + actual_v0.len() + ); + + // Verify listowners(1) + let o1 = do_listowners(msg_id, 1, &pool, &metrics, &log); + let actual_v1: HashSet = o1.owners.into_iter().collect(); + assert_eq!( + actual_v1, + expected_v1, + "vnode 1 owners mismatch: expected {} got {}", + expected_v1.len(), + actual_v1.len() + ); + + // Union of both vnodes must cover all owners. + let union: HashSet = actual_v0.union(&actual_v1).cloned().collect(); + assert_eq!( + union, + all_owners, + "union of vnode owners ({}) != all owners ({})", + union.len(), + all_owners.len() + ); + + // listvnodes must still report [0, 1] + let vr = do_listvnodes(msg_id, &pool, &metrics, &log); + assert!( + vr.vnodes.contains(&0) && vr.vnodes.contains(&1), + "expected vnodes [0,1], got {:?}", + vr.vnodes + ); +} + +/// Smoke test that discovery RPCs complete within +/// reasonable time bounds, catching gross performance +/// regressions. +/// +/// Uses generous bounds (50ms average per RPC) to avoid +/// flaky CI. Real latency on ephemeral PG is 1-5ms. +#[test] +fn verify_discovery_performance() { + setup_test_env!(pool, metrics, log); + let msg_id: u32 = 0x1; + + // Seed 20 owners across both vnodes (40 buckets). + for i in 0u32..20 { + let owner = Uuid::new_v4(); + let bkt_v0 = format!("perf_{}_v0_{}", i, Uuid::new_v4()); + let bkt_v1 = format!("perf_{}_v1_{}", i, Uuid::new_v4()); + create_test_bucket!( + msg_id, + owner, + bkt_v0.as_str(), + 0, + Uuid::new_v4(), + pool, + metrics, + log + ); + create_test_bucket!( + msg_id, + owner, + bkt_v1.as_str(), + 1, + Uuid::new_v4(), + pool, + metrics, + log + ); + } + + // Warm up: prime the connection pool and PG caches. + let _ = do_listvnodes(msg_id, &pool, &metrics, &log); + let _ = do_listowners(msg_id, 0, &pool, &metrics, &log); + let _ = do_listowners(msg_id, 1, &pool, &metrics, &log); + + let iterations: u32 = 100; + let max_total = std::time::Duration::from_secs(5); + + // Benchmark listvnodes + let start = Instant::now(); + for i in 0..iterations { + let vr = do_listvnodes(i + 1, &pool, &metrics, &log); + assert!(vr.vnodes.contains(&0)); + } + let elapsed_vnodes = start.elapsed(); + assert!( + elapsed_vnodes < max_total, + "listvnodes: {} iterations took {:?} (> {:?})", + iterations, + elapsed_vnodes, + max_total + ); + + // Benchmark listowners(0) + let start = Instant::now(); + for i in 0..iterations { + let o0 = do_listowners(i + 1, 0, &pool, &metrics, &log); + assert_eq!(o0.owners.len(), 20); + } + let elapsed_owners_v0 = start.elapsed(); + assert!( + elapsed_owners_v0 < max_total, + "listowners(0): {} iterations took {:?} (> {:?})", + iterations, + elapsed_owners_v0, + max_total + ); + + // Benchmark listowners(1) + let start = Instant::now(); + for i in 0..iterations { + let o1 = do_listowners(i + 1, 1, &pool, &metrics, &log); + assert_eq!(o1.owners.len(), 20); + } + let elapsed_owners_v1 = start.elapsed(); + assert!( + elapsed_owners_v1 < max_total, + "listowners(1): {} iterations took {:?} (> {:?})", + iterations, + elapsed_owners_v1, + max_total + ); +}