Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions rust/s3heap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ pub struct Limits {
/// Maximum number of items to return.
/// If None, returns all items found within bucket limits.
pub max_items: Option<usize>,
/// Optional cut-off time: items scheduled after this instant are not read.
pub time_cut_off: Option<DateTime<Utc>>,
}

impl Limits {
Expand Down Expand Up @@ -358,6 +360,26 @@ impl Limits {
self
}

/// Set the time cut-off for reading items.
///
/// Items scheduled after this time will not be returned.
///
/// # Arguments
/// * `time_cut_off` - The cut-off time
///
/// # Examples
///
/// ```
/// use s3heap::Limits;
/// use chrono::Utc;
///
/// let limits = Limits::default().with_time_cut_off(Utc::now());
/// ```
pub fn with_time_cut_off(mut self, time_cut_off: DateTime<Utc>) -> Self {
self.time_cut_off = Some(time_cut_off);
self
}

/// Get the maximum number of buckets to read.
///
/// Returns the configured limit or the default (1000) if not set.
Expand Down Expand Up @@ -904,6 +926,7 @@ impl HeapPruner {
/// * `limits` - Controls pruning limits:
/// - `.with_buckets(n)` - Maximum number of buckets to scan (default: 1000)
/// - `.with_items(n)` - Maximum number of items to process (default: unlimited)
/// - `.with_time_cut_off(t)` - Skip items scheduled after this time (default: no limit)
///
/// # Errors
///
Expand All @@ -924,13 +947,23 @@ impl HeapPruner {
///
/// // Stop after processing 1000 items total
/// pruner.prune(Limits::default().with_items(1000)).await?;
///
/// // Only prune tasks scheduled before a specific time
/// use chrono::{Utc, Duration};
/// let cutoff = Utc::now() - Duration::days(7);
/// pruner.prune(Limits::default().with_time_cut_off(cutoff)).await?;
/// ```
pub async fn prune(&self, limits: Limits) -> Result<PruneStats, Error> {
let buckets = self.internal.list_approx_first_1k_buckets().await?;
let mut total_stats = PruneStats::default();
let max_items = limits.max_items.unwrap_or(usize::MAX);

for bucket in buckets.into_iter().take(limits.max_buckets()) {
if let Some(time_cut_off) = limits.time_cut_off {
if bucket > time_cut_off {
break;
}
}
// Stop if we've processed enough items
let items_processed = total_stats.items_pruned + total_stats.items_retained;
if items_processed >= max_items {
Expand Down Expand Up @@ -1136,6 +1169,7 @@ impl HeapReader {
/// * `limits` - Controls scanning limits:
/// - `.with_buckets(n)` - Maximum number of buckets to scan (default: 1000)
/// - `.with_items(n)` - Maximum number of items to return (default: unlimited)
/// - `.with_time_cut_off(t)` - Skip items scheduled after this time (default: no limit)
///
/// # Errors
///
Expand Down Expand Up @@ -1169,6 +1203,14 @@ impl HeapReader {
/// |_| true,
/// Limits::default().with_buckets(5).with_items(10),
/// ).await?;
///
/// // Get tasks scheduled before a specific time
/// use chrono::{Utc, Duration};
/// let one_hour_from_now = Utc::now() + Duration::hours(1);
/// let upcoming = reader.peek(
/// |_| true,
/// Limits::default().with_time_cut_off(one_hour_from_now),
/// ).await?;
/// ```
pub async fn peek(
&self,
Expand All @@ -1181,6 +1223,11 @@ impl HeapReader {
let max_items = limits.max_items.unwrap_or(usize::MAX);

'outer: for bucket in buckets.into_iter().take(limits.max_buckets()) {
if let Some(time_cut_off) = limits.time_cut_off {
if bucket > time_cut_off {
break;
}
}
let (entries, _) = self.internal.load_bucket_or_empty(bucket).await?;
let triggerable_and_nonce = entries
.iter()
Expand Down
8 changes: 8 additions & 0 deletions rust/s3heap/tests/test_unit_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,22 @@ fn limits_equality() {
let limits1 = Limits {
buckets_to_read: Some(100),
max_items: None,
time_cut_off: None,
};
let limits2 = Limits {
buckets_to_read: Some(100),
max_items: None,
time_cut_off: None,
};
let limits3 = Limits {
buckets_to_read: Some(200),
max_items: None,
time_cut_off: None,
};
let limits4 = Limits {
buckets_to_read: None,
max_items: None,
time_cut_off: None,
};

assert_eq!(limits1, limits2);
Expand All @@ -208,6 +212,7 @@ fn limits_copy() {
let original = Limits {
buckets_to_read: Some(500),
max_items: None,
time_cut_off: None,
};
let copied = original;
assert_eq!(original, copied);
Expand Down Expand Up @@ -348,6 +353,7 @@ async fn pruner_respects_limits() {
let limits = Limits {
buckets_to_read: Some(5),
max_items: None,
time_cut_off: None,
};

// Should respect the limit even if more buckets exist
Expand Down Expand Up @@ -418,6 +424,7 @@ async fn reader_respects_limits() {
let limits = Limits {
buckets_to_read: Some(3),
max_items: None,
time_cut_off: None,
};

// Should respect the bucket limit
Expand Down Expand Up @@ -455,6 +462,7 @@ fn limits_with_max_value() {
let limits = Limits {
buckets_to_read: Some(usize::MAX),
max_items: None,
time_cut_off: None,
};
assert_eq!(limits.buckets_to_read, Some(usize::MAX));
}
Expand Down
Loading