Skip to content

Commit bc159d5

Browse files
authored
fix test_repo_upload flake (#8066)
Closes #8057.
1 parent 700a8b9 commit bc159d5

File tree

4 files changed

+47
-7
lines changed

4 files changed

+47
-7
lines changed

nexus/tests/integration_tests/updates.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,12 @@ async fn test_repo_upload() -> Result<()> {
179179
// The local repo is not deleted until the next task run.
180180
assert_eq!(status.local_repos, 1);
181181

182+
// Wait for all the copy requests to complete.
183+
futures::future::join_all(cptestctx.sled_agents.iter().map(|sled_agent| {
184+
sled_agent.sled_agent().artifact_store().wait_for_copy_tasks()
185+
}))
186+
.await;
187+
182188
// Run the replication background task again; the local repos should be
183189
// dropped.
184190
let status =

sled-agent/src/artifact_store.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use sled_storage::manager::StorageHandle;
4545
use slog::{Logger, error, info};
4646
use slog_error_chain::{InlineErrorChain, SlogInlineError};
4747
use tokio::fs::File;
48-
use tokio::sync::{mpsc, oneshot, watch};
48+
use tokio::sync::{OwnedSemaphorePermit, mpsc, oneshot, watch};
4949
use tokio::task::JoinSet;
5050
use tufaceous_artifact::ArtifactHash;
5151

@@ -77,12 +77,12 @@ const TEMP_SUBDIR: &str = "tmp";
7777
/// - for PUT, we try to write to all datasets, logging errors as we go; if we
7878
/// successfully write the artifact to at least one, we return OK.
7979
/// - for GET, we look in each dataset until we find it.
80-
pub(crate) struct ArtifactStore<T: DatasetsManager> {
80+
pub struct ArtifactStore<T: DatasetsManager> {
8181
log: Logger,
8282
reqwest_client: reqwest::Client,
8383
ledger_tx: mpsc::Sender<LedgerManagerRequest>,
8484
config: watch::Receiver<Option<ArtifactConfig>>,
85-
storage: T,
85+
pub(crate) storage: T,
8686

8787
/// Used for synchronization in unit tests.
8888
#[cfg(test)]
@@ -400,6 +400,7 @@ impl<T: DatasetsManager> ArtifactStore<T> {
400400
) -> Result<(), Error> {
401401
// Check that there's no conflict before we send the upstream request.
402402
let writer = self.writer(sha256, generation).await?;
403+
let permit = self.storage.copy_permit().await;
403404

404405
let client = repo_depot_client::Client::new_with_client(
405406
depot_base_url,
@@ -421,6 +422,7 @@ impl<T: DatasetsManager> ArtifactStore<T> {
421422
let log = self.log.clone();
422423
let base_url = depot_base_url.to_owned();
423424
tokio::task::spawn(async move {
425+
let _permit = permit;
424426
let stream = response.into_inner().into_inner().map_err(|err| {
425427
Error::DepotCopy {
426428
sha256,
@@ -601,10 +603,15 @@ async fn delete_reconciler<T: DatasetsManager>(
601603
/// Abstracts over what kind of sled agent we are; each of the real sled agent,
602604
/// simulated sled agent, and this module's unit tests have different ways of
603605
/// keeping track of the datasets on the system.
604-
pub(crate) trait DatasetsManager: Clone + Send + Sync + 'static {
606+
pub trait DatasetsManager: Clone + Send + Sync + 'static {
605607
fn artifact_storage_paths(
606608
&self,
607609
) -> impl Future<Output = impl Iterator<Item = Utf8PathBuf> + Send + '_> + Send;
610+
611+
#[expect(async_fn_in_trait)]
612+
async fn copy_permit(&self) -> Option<OwnedSemaphorePermit> {
613+
None
614+
}
608615
}
609616

610617
impl DatasetsManager for StorageHandle {

sled-agent/src/sim/artifact_store.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,29 @@
55
//! Implementation of `crate::artifact_store::StorageBackend` for our simulated
66
//! storage.
77
8+
use std::sync::Arc;
9+
810
use camino_tempfile::Utf8TempDir;
911
use dropshot::{
1012
Body, ConfigDropshot, FreeformBody, HttpError, HttpResponseOk, HttpServer,
1113
Path, RequestContext, ServerBuilder,
1214
};
1315
use repo_depot_api::*;
14-
use std::sync::Arc;
16+
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
1517

1618
use crate::artifact_store::{ArtifactStore, DatasetsManager};
1719

20+
// Semaphore mostly uses usize but in `acquire_many` it unfortunately uses u32.
21+
const MAX_PERMITS: u32 = u32::MAX >> 3;
22+
1823
#[derive(Clone)]
19-
pub(super) struct SimArtifactStorage {
24+
pub struct SimArtifactStorage {
2025
// We simulate the two M.2s with two separate temporary directories.
2126
dirs: Arc<[Utf8TempDir; 2]>,
27+
28+
// Semaphore to keep track of how many copy requests are in flight, and to
29+
// be able to await on their completion. Used in integration tests.
30+
copy_semaphore: Arc<Semaphore>,
2231
}
2332

2433
impl SimArtifactStorage {
@@ -28,6 +37,9 @@ impl SimArtifactStorage {
2837
camino_tempfile::tempdir().unwrap(),
2938
camino_tempfile::tempdir().unwrap(),
3039
]),
40+
copy_semaphore: Arc::new(
41+
const { Semaphore::const_new(MAX_PERMITS as usize) },
42+
),
3143
}
3244
}
3345
}
@@ -38,6 +50,10 @@ impl DatasetsManager for SimArtifactStorage {
3850
) -> impl Iterator<Item = camino::Utf8PathBuf> + '_ {
3951
self.dirs.iter().map(|tempdir| tempdir.path().to_owned())
4052
}
53+
54+
async fn copy_permit(&self) -> Option<OwnedSemaphorePermit> {
55+
Some(self.copy_semaphore.clone().acquire_owned().await.unwrap())
56+
}
4157
}
4258

4359
impl ArtifactStore<SimArtifactStorage> {
@@ -56,6 +72,17 @@ impl ArtifactStore<SimArtifactStorage> {
5672
.start()
5773
.unwrap()
5874
}
75+
76+
pub async fn wait_for_copy_tasks(&self) {
77+
// Acquire a permit for MAX_PERMITS, which requires that all copy tasks
78+
// have dropped their permits. Then immediately drop it.
79+
let _permit = self
80+
.storage
81+
.copy_semaphore
82+
.acquire_many(MAX_PERMITS)
83+
.await
84+
.unwrap();
85+
}
5986
}
6087

6188
/// Implementation of the Repo Depot API backed by an

sled-agent/src/sim/sled_agent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ impl SledAgent {
504504
&self.updates
505505
}
506506

507-
pub(super) fn artifact_store(&self) -> &ArtifactStore<SimArtifactStorage> {
507+
pub fn artifact_store(&self) -> &ArtifactStore<SimArtifactStorage> {
508508
self.repo_depot.app_private()
509509
}
510510

0 commit comments

Comments
 (0)