Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 111 additions & 16 deletions crates/common/src/auction/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,23 @@ use error_stack::{Report, ResultExt};
use fastly::http::request::{select, PendingRequest};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use crate::error::TrustedServerError;

use super::config::AuctionConfig;
use super::provider::AuctionProvider;
use super::types::{AuctionContext, AuctionRequest, AuctionResponse, Bid, BidStatus};

/// Compute the remaining time budget from a deadline.
///
/// Returns the number of milliseconds left before `timeout_ms` is exceeded,
/// measured from `start`. Returns `0` when the deadline has already passed.
#[inline]
fn remaining_budget_ms(start: Instant, timeout_ms: u32) -> u32 {
timeout_ms.saturating_sub(start.elapsed().as_millis() as u32)
}

/// Manages auction execution across multiple providers.
pub struct AuctionOrchestrator {
config: AuctionConfig,
Expand Down Expand Up @@ -93,6 +102,7 @@ impl AuctionOrchestrator {
request: &AuctionRequest,
context: &AuctionContext<'_>,
) -> Result<OrchestrationResult, Report<TrustedServerError>> {
let mediation_start = Instant::now();
let provider_responses = self.run_providers_parallel(request, context).await?;

let floor_prices = self.floor_prices_by_slot(request);
Expand All @@ -105,11 +115,30 @@ impl AuctionOrchestrator {
mediator.provider_name()
);

// Create a context with provider responses for the mediator
// Give the mediator only the remaining time from the auction
// deadline, not the full timeout — the bidding phase already
// consumed part of it.
let remaining_ms = remaining_budget_ms(mediation_start, context.timeout_ms);

if remaining_ms == 0 {
log::warn!(
"Auction timeout ({}ms) exhausted during bidding phase — skipping mediator",
context.timeout_ms
);
let winning = self.select_winning_bids(&provider_responses, &floor_prices);
return Ok(OrchestrationResult {
provider_responses,
mediator_response: None,
winning_bids: winning,
total_time_ms: 0,
metadata: HashMap::new(),
});
}

let mediator_context = AuctionContext {
settings: context.settings,
request: context.request,
timeout_ms: context.timeout_ms,
timeout_ms: remaining_ms,
provider_responses: Some(&provider_responses),
};

Expand Down Expand Up @@ -211,6 +240,9 @@ impl AuctionOrchestrator {
provider_names.len()
);

// Track auction start time for deadline enforcement
let auction_start = Instant::now();

// Phase 1: Launch all requests concurrently and build mapping
// Maps backend_name -> (provider_name, start_time, provider)
let mut backend_to_provider: HashMap<String, (&str, Instant, &dyn AuctionProvider)> =
Expand Down Expand Up @@ -246,14 +278,26 @@ impl AuctionOrchestrator {
}
};

// Give each provider only the remaining time from the auction
// deadline so that its backend first_byte_timeout doesn't extend
// past the overall budget.
let remaining_ms = remaining_budget_ms(auction_start, context.timeout_ms);
let provider_context = AuctionContext {
settings: context.settings,
request: context.request,
timeout_ms: remaining_ms,
provider_responses: context.provider_responses,
};

log::info!(
"Launching bid request to: {} (backend: {})",
"Launching bid request to: {} (backend: {}, budget: {}ms)",
provider.provider_name(),
backend_name
backend_name,
remaining_ms
);

let start_time = Instant::now();
match provider.request_bids(request, context) {
match provider.request_bids(request, &provider_context) {
Ok(pending) => {
backend_to_provider.insert(
backend_name,
Expand All @@ -275,12 +319,16 @@ impl AuctionOrchestrator {
}
}

let deadline = Duration::from_millis(u64::from(context.timeout_ms));
log::info!(
"Launched {} concurrent requests, waiting for responses using select...",
pending_requests.len()
"Launched {} concurrent requests, waiting for responses (timeout: {}ms)...",
pending_requests.len(),
context.timeout_ms
);

// Phase 2: Wait for responses using select() to process as they become ready
// Phase 2: Wait for responses using select() to process as they become ready.
// Enforce the auction deadline: after each select() returns, check
// elapsed time and drop remaining requests if the timeout is exceeded.
let mut responses = Vec::new();
let mut remaining = pending_requests;

Expand Down Expand Up @@ -332,6 +380,18 @@ impl AuctionOrchestrator {
log::warn!("A provider request failed: {:?}", e);
}
}

// Check auction deadline after processing each response.
// Remaining PendingRequests are dropped, which abandons the
// in-flight HTTP calls on the Fastly host.
if auction_start.elapsed() >= deadline && !remaining.is_empty() {
log::warn!(
"Auction timeout ({}ms) reached, dropping {} remaining request(s)",
context.timeout_ms,
remaining.len()
);
break;
}
}

Ok(responses)
Expand Down Expand Up @@ -628,14 +688,14 @@ mod tests {
);
}

// TODO: Re-enable these tests after implementing mock provider support for send_async()
// Mock providers currently don't work with concurrent requests because they can't
// create PendingRequest without real backends configured in Fastly.
// TODO: Re-enable provider integration tests after implementing mock support
// for send_async(). Mock providers can't create PendingRequest without real
// Fastly backends.
//
// Options to fix:
// 1. Configure dummy backends in fastly.toml for testing
// 2. Refactor mock providers to use a different pattern
// 3. Create a test-only mock backend server
// Untested timeout enforcement paths (require real backends):
// - Deadline check in select() loop (drops remaining requests)
// - Mediator skip when remaining_ms == 0 (bidding exhausts budget)
// - Provider context receives reduced timeout_ms per remaining budget

#[tokio::test]
async fn test_no_providers_configured() {
Expand Down Expand Up @@ -679,6 +739,41 @@ mod tests {
assert!(!orchestrator.is_enabled());
}

#[test]
fn remaining_budget_returns_full_timeout_immediately() {
let start = std::time::Instant::now();
let result = super::remaining_budget_ms(start, 2000);
// Should be very close to 2000 (allow a few ms for test execution)
assert!(
result >= 1990,
"should return ~full timeout immediately, got {result}"
);
}

#[test]
fn remaining_budget_saturates_at_zero() {
// Create an instant in the past by sleeping briefly with a tiny timeout
let start = std::time::Instant::now();
// Use a timeout of 0 — elapsed will always exceed it
let result = super::remaining_budget_ms(start, 0);
assert_eq!(result, 0, "should return 0 when timeout is 0");
}

#[test]
fn remaining_budget_decreases_over_time() {
let start = std::time::Instant::now();
std::thread::sleep(std::time::Duration::from_millis(50));
let result = super::remaining_budget_ms(start, 2000);
assert!(
result < 2000,
"should be less than full timeout after sleeping"
);
assert!(
result > 1900,
"should still have most of the budget, got {result}"
);
}

#[test]
fn test_apply_floor_prices_allows_none_prices_for_encoded_bids() {
// Test that bids with None prices (APS-style) pass through floor pricing
Expand Down
5 changes: 3 additions & 2 deletions crates/common/src/auction/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ pub trait AuctionProvider: Send + Sync {
///
/// This is used by the orchestrator to correlate responses with providers
/// when using `select()` to wait for multiple concurrent requests.
/// The backend name should match what `BackendConfig::from_url()` returns
/// for this provider's endpoint.
/// Implementations should use [`BackendConfig::backend_name_for_url()`] to
/// compute the name without registering a backend — the actual registration
/// happens in [`request_bids`](Self::request_bids).
fn backend_name(&self) -> Option<String> {
None
}
Expand Down
Loading
Loading