Skip to content

Commit 442e5e7

Browse files
fix: stabilize atomic publish e2e overlap detection
1 parent 149dda9 commit 442e5e7

1 file changed

Lines changed: 26 additions & 10 deletions

File tree

tests/e2e_search_index.rs

Lines changed: 26 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -985,6 +985,7 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_atomic_publi
985985

986986
let stop = Arc::new(AtomicBool::new(false));
987987
let rebuild_running = Arc::new(AtomicBool::new(false));
988+
let search_in_flight = Arc::new(AtomicBool::new(false));
988989
let reader_attempts_during_rebuild = Arc::new(AtomicUsize::new(0));
989990
let search_attempts_during_rebuild = Arc::new(AtomicUsize::new(0));
990991
let (ready_tx, ready_rx) = mpsc::channel();
@@ -1012,6 +1013,7 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_atomic_publi
10121013
let search_ready_tx = ready_tx.clone();
10131014
let search_stop = Arc::clone(&stop);
10141015
let search_rebuild_running = Arc::clone(&rebuild_running);
1016+
let search_in_flight_thread = Arc::clone(&search_in_flight);
10151017
let search_overlap = Arc::clone(&search_attempts_during_rebuild);
10161018
let search_home = home.clone();
10171019
let search_codex_home = codex_home.clone();
@@ -1020,11 +1022,9 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_atomic_publi
10201022
let _ = search_ready_tx.send("search");
10211023
let mut stats = SearchLoopStats::default();
10221024
while !search_stop.load(Ordering::Relaxed) {
1023-
if search_rebuild_running.load(Ordering::Relaxed) {
1024-
search_overlap.fetch_add(1, Ordering::Relaxed);
1025-
}
1026-
10271025
let search_started = Instant::now();
1026+
search_in_flight_thread.store(true, Ordering::Relaxed);
1027+
let started_during_rebuild = search_rebuild_running.load(Ordering::Relaxed);
10281028
let output = cargo_bin_cmd!("cass")
10291029
.args([
10301030
"search",
@@ -1045,7 +1045,12 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_atomic_publi
10451045
.timeout(Duration::from_secs(20))
10461046
.output()
10471047
.expect("run concurrent cass search");
1048-
let elapsed_ms = search_started.elapsed().as_millis() as u64;
1048+
search_in_flight_thread.store(false, Ordering::Relaxed);
1049+
let search_finished = Instant::now();
1050+
if started_during_rebuild || search_rebuild_running.load(Ordering::Relaxed) {
1051+
search_overlap.fetch_add(1, Ordering::Relaxed);
1052+
}
1053+
let elapsed_ms = search_finished.duration_since(search_started).as_millis() as u64;
10491054
stats.attempts += 1;
10501055
stats.max_duration_ms = stats.max_duration_ms.max(elapsed_ms);
10511056

@@ -1086,6 +1091,9 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_atomic_publi
10861091
Some("Run cass index --full --force-rebuild while a direct reader and cass search poll the same live index"),
10871092
);
10881093
rebuild_running.store(true, Ordering::Relaxed);
1094+
if search_in_flight.load(Ordering::Relaxed) {
1095+
search_attempts_during_rebuild.fetch_add(1, Ordering::Relaxed);
1096+
}
10891097
let publish_pause_sentinel = home.join("atomic-publish-overlap-sentinel.json");
10901098
let mut attempt = 0usize;
10911099
let rebuild_output = loop {
@@ -1341,6 +1349,7 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_federated_at
13411349

13421350
let stop = Arc::new(AtomicBool::new(false));
13431351
let rebuild_running = Arc::new(AtomicBool::new(false));
1352+
let search_in_flight = Arc::new(AtomicBool::new(false));
13441353
let reader_attempts_during_rebuild = Arc::new(AtomicUsize::new(0));
13451354
let search_attempts_during_rebuild = Arc::new(AtomicUsize::new(0));
13461355
let (ready_tx, ready_rx) = mpsc::channel();
@@ -1368,6 +1377,7 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_federated_at
13681377
let search_ready_tx = ready_tx.clone();
13691378
let search_stop = Arc::clone(&stop);
13701379
let search_rebuild_running = Arc::clone(&rebuild_running);
1380+
let search_in_flight_thread = Arc::clone(&search_in_flight);
13711381
let search_overlap = Arc::clone(&search_attempts_during_rebuild);
13721382
let search_home = home.clone();
13731383
let search_codex_home = codex_home.clone();
@@ -1376,11 +1386,9 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_federated_at
13761386
let _ = search_ready_tx.send("search");
13771387
let mut stats = SearchLoopStats::default();
13781388
while !search_stop.load(Ordering::Relaxed) {
1379-
if search_rebuild_running.load(Ordering::Relaxed) {
1380-
search_overlap.fetch_add(1, Ordering::Relaxed);
1381-
}
1382-
13831389
let search_started = Instant::now();
1390+
search_in_flight_thread.store(true, Ordering::Relaxed);
1391+
let started_during_rebuild = search_rebuild_running.load(Ordering::Relaxed);
13841392
let output = cargo_bin_cmd!("cass")
13851393
.args([
13861394
"search",
@@ -1401,7 +1409,12 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_federated_at
14011409
.timeout(Duration::from_secs(20))
14021410
.output()
14031411
.expect("run concurrent federated cass search");
1404-
let elapsed_ms = search_started.elapsed().as_millis() as u64;
1412+
search_in_flight_thread.store(false, Ordering::Relaxed);
1413+
let search_finished = Instant::now();
1414+
if started_during_rebuild || search_rebuild_running.load(Ordering::Relaxed) {
1415+
search_overlap.fetch_add(1, Ordering::Relaxed);
1416+
}
1417+
let elapsed_ms = search_finished.duration_since(search_started).as_millis() as u64;
14051418
stats.attempts += 1;
14061419
stats.max_duration_ms = stats.max_duration_ms.max(elapsed_ms);
14071420

@@ -1442,6 +1455,9 @@ fn force_rebuild_preserves_search_results_and_reader_surface_during_federated_at
14421455
Some("Run cass index --full --force-rebuild with forced multi-shard planning while a direct reader and cass search poll the same live index"),
14431456
);
14441457
rebuild_running.store(true, Ordering::Relaxed);
1458+
if search_in_flight.load(Ordering::Relaxed) {
1459+
search_attempts_during_rebuild.fetch_add(1, Ordering::Relaxed);
1460+
}
14451461
let mut rebuild = cargo_bin_cmd!("cass");
14461462
force_federated_publish_env(&mut rebuild);
14471463
let publish_pause_sentinel = home.join("federated-atomic-publish-overlap-sentinel.json");

0 commit comments

Comments
 (0)