Skip to content

Commit 3675075

Browse files
authored
[5/n] [installinator] move artifact fetch logic into its own file (#8039)
Following up from the previous commit in the series, move artifact fetch logic into its own file similar to report logic. Also put `HttpFetchBackend` in this file. This is pure refactoring -- there are no functional changes in this PR.
1 parent 7f312b7 commit 3675075

File tree

6 files changed

+369
-358
lines changed

6 files changed

+369
-358
lines changed

installinator/src/artifact.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio::sync::mpsc;
1414
use tufaceous_artifact::{ArtifactHash, ArtifactHashId};
1515
use uuid::Uuid;
1616

17-
use crate::{errors::HttpError, peers::FetchReceiver};
17+
use crate::{errors::HttpError, fetch::FetchReceiver};
1818

1919
#[derive(Clone, Debug, Eq, PartialEq, Args)]
2020
pub(crate) struct ArtifactIdOpts {

installinator/src/dispatch.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@ use update_engine::StepResult;
2525
use crate::{
2626
ArtifactWriter, WriteDestination,
2727
artifact::ArtifactIdOpts,
28-
peers::{
29-
DiscoveryMechanism, FetchArtifactBackend, FetchedArtifact,
30-
HttpFetchBackend,
31-
},
28+
fetch::{FetchArtifactBackend, FetchedArtifact, HttpFetchBackend},
29+
peers::DiscoveryMechanism,
3230
reporter::{HttpProgressBackend, ProgressReporter, ReportProgressBackend},
3331
};
3432

installinator/src/fetch.rs

+361
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,361 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
5+
//! Logic to fetch artifacts from a peer.
6+
7+
use std::{fmt, future::Future, time::Duration};
8+
9+
use anyhow::Result;
10+
use async_trait::async_trait;
11+
use buf_list::BufList;
12+
use bytes::Bytes;
13+
use display_error_chain::DisplayErrorChain;
14+
use installinator_client::ClientError;
15+
use installinator_common::{
16+
InstallinatorProgressMetadata, StepContext, StepProgress,
17+
};
18+
use itertools::Itertools;
19+
use tokio::{sync::mpsc, time::Instant};
20+
use tufaceous_artifact::ArtifactHashId;
21+
use update_engine::events::ProgressUnits;
22+
23+
use crate::{
24+
artifact::ArtifactClient,
25+
errors::{ArtifactFetchError, DiscoverPeersError, HttpError},
26+
peers::PeerAddress,
27+
};
28+
29+
/// A fetched artifact.
30+
pub(crate) struct FetchedArtifact {
31+
pub(crate) attempt: usize,
32+
pub(crate) peer: PeerAddress,
33+
pub(crate) artifact: BufList,
34+
}
35+
36+
impl FetchedArtifact {
37+
/// In a loop, discover peers, and fetch from them.
38+
///
39+
/// If `discover_fn` returns [`DiscoverPeersError::Retry`], this function will retry. If it
40+
/// returns `DiscoverPeersError::Abort`, this function will exit with the underlying error.
41+
pub(crate) async fn loop_fetch_from_peers<F, Fut>(
42+
cx: &StepContext,
43+
log: &slog::Logger,
44+
mut discover_fn: F,
45+
artifact_hash_id: &ArtifactHashId,
46+
) -> Result<Self>
47+
where
48+
F: FnMut() -> Fut,
49+
Fut: Future<Output = Result<FetchArtifactBackend, DiscoverPeersError>>,
50+
{
51+
// How long to sleep between retries if we fail to find a peer or fail
52+
// to fetch an artifact from a found peer.
53+
const RETRY_DELAY: Duration = Duration::from_secs(5);
54+
55+
let mut attempt = 0;
56+
loop {
57+
attempt += 1;
58+
let peers = match discover_fn().await {
59+
Ok(peers) => peers,
60+
Err(DiscoverPeersError::Retry(error)) => {
61+
slog::warn!(
62+
log,
63+
"(attempt {attempt}) failed to discover peers, retrying: {}",
64+
DisplayErrorChain::new(
65+
AsRef::<dyn std::error::Error>::as_ref(&error)
66+
),
67+
);
68+
cx.send_progress(StepProgress::retry(format!(
69+
"failed to discover peers: {error}"
70+
)))
71+
.await;
72+
tokio::time::sleep(RETRY_DELAY).await;
73+
continue;
74+
}
75+
#[cfg(test)]
76+
Err(DiscoverPeersError::Abort(error)) => {
77+
return Err(error);
78+
}
79+
};
80+
81+
slog::info!(
82+
log,
83+
"discovered {} peers: [{}]",
84+
peers.peer_count(),
85+
peers.display(),
86+
);
87+
match peers.fetch_artifact(&cx, artifact_hash_id).await {
88+
Some((peer, artifact)) => {
89+
return Ok(Self { attempt, peer, artifact });
90+
}
91+
None => {
92+
slog::warn!(
93+
log,
94+
"unable to fetch artifact from peers, retrying discovery",
95+
);
96+
cx.send_progress(StepProgress::retry(format!(
97+
"unable to fetch artifact from any of {} peers, retrying",
98+
peers.peer_count(),
99+
)))
100+
.await;
101+
tokio::time::sleep(RETRY_DELAY).await;
102+
}
103+
}
104+
}
105+
}
106+
}
107+
108+
impl fmt::Debug for FetchedArtifact {
109+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110+
f.debug_struct("FetchedArtifact")
111+
.field("attempt", &self.attempt)
112+
.field("peer", &self.peer)
113+
.field(
114+
"artifact",
115+
&format!(
116+
"({} bytes in {} chunks)",
117+
self.artifact.num_bytes(),
118+
self.artifact.num_chunks()
119+
),
120+
)
121+
.finish()
122+
}
123+
}
124+
125+
#[derive(Debug)]
126+
pub(crate) struct FetchArtifactBackend {
127+
log: slog::Logger,
128+
imp: Box<dyn FetchArtifactImpl>,
129+
timeout: Duration,
130+
}
131+
132+
impl FetchArtifactBackend {
133+
pub(crate) fn new(
134+
log: &slog::Logger,
135+
imp: Box<dyn FetchArtifactImpl>,
136+
timeout: Duration,
137+
) -> Self {
138+
let log = log.new(slog::o!("component" => "Peers"));
139+
Self { log, imp, timeout }
140+
}
141+
142+
pub(crate) async fn fetch_artifact(
143+
&self,
144+
cx: &StepContext,
145+
artifact_hash_id: &ArtifactHashId,
146+
) -> Option<(PeerAddress, BufList)> {
147+
// TODO: do we want a check phase that happens before the download?
148+
let peers = self.peers();
149+
let mut remaining_peers = self.peer_count();
150+
151+
let log = self.log.new(
152+
slog::o!("artifact_hash_id" => format!("{artifact_hash_id:?}")),
153+
);
154+
155+
slog::debug!(log, "start fetch from peers"; "remaining_peers" => remaining_peers);
156+
157+
for peer in peers {
158+
remaining_peers -= 1;
159+
160+
slog::debug!(
161+
log,
162+
"start fetch from peer {peer:?}"; "remaining_peers" => remaining_peers,
163+
);
164+
165+
// Attempt to download data from this peer.
166+
let start = Instant::now();
167+
match self.fetch_from_peer(cx, peer, artifact_hash_id).await {
168+
Ok(artifact_bytes) => {
169+
let elapsed = start.elapsed();
170+
slog::info!(
171+
log,
172+
"fetched artifact from peer {peer} in {elapsed:?}"
173+
);
174+
return Some((peer, artifact_bytes));
175+
}
176+
Err(error) => {
177+
let elapsed = start.elapsed();
178+
slog::warn!(
179+
log,
180+
"error after {elapsed:?}: {}",
181+
DisplayErrorChain::new(&error);
182+
"remaining_peers" => remaining_peers,
183+
);
184+
}
185+
}
186+
}
187+
188+
None
189+
}
190+
191+
pub(crate) fn peers(&self) -> impl Iterator<Item = PeerAddress> + '_ {
192+
self.imp.peers()
193+
}
194+
195+
pub(crate) fn peer_count(&self) -> usize {
196+
self.imp.peer_count()
197+
}
198+
199+
pub(crate) fn display(&self) -> impl fmt::Display {
200+
self.peers().join(", ")
201+
}
202+
203+
async fn fetch_from_peer(
204+
&self,
205+
cx: &StepContext,
206+
peer: PeerAddress,
207+
artifact_hash_id: &ArtifactHashId,
208+
) -> Result<BufList, ArtifactFetchError> {
209+
let log = self.log.new(slog::o!("peer" => peer.to_string()));
210+
211+
let (total_bytes, mut receiver) = match self
212+
.imp
213+
.fetch_from_peer_impl(peer, artifact_hash_id.clone())
214+
.await
215+
{
216+
Ok(x) => x,
217+
Err(error) => {
218+
cx.send_progress(StepProgress::Reset {
219+
metadata: InstallinatorProgressMetadata::Download {
220+
peer: peer.address(),
221+
},
222+
message: error.to_string().into(),
223+
})
224+
.await;
225+
return Err(ArtifactFetchError::HttpError {
226+
peer: peer.address(),
227+
error,
228+
});
229+
}
230+
};
231+
232+
let mut artifact_bytes = BufList::new();
233+
let mut downloaded_bytes = 0u64;
234+
let metadata =
235+
InstallinatorProgressMetadata::Download { peer: peer.address() };
236+
237+
loop {
238+
match tokio::time::timeout(self.timeout, receiver.recv()).await {
239+
Ok(Some(Ok(bytes))) => {
240+
slog::debug!(
241+
&log,
242+
"received chunk of {} bytes from peer",
243+
bytes.len()
244+
);
245+
downloaded_bytes += bytes.len() as u64;
246+
artifact_bytes.push_chunk(bytes);
247+
cx.send_progress(StepProgress::with_current_and_total(
248+
downloaded_bytes,
249+
total_bytes,
250+
ProgressUnits::BYTES,
251+
metadata.clone(),
252+
))
253+
.await;
254+
}
255+
Ok(Some(Err(error))) => {
256+
slog::debug!(
257+
&log,
258+
"received error from peer, sending cancellation: {}",
259+
DisplayErrorChain::new(&error),
260+
);
261+
cx.send_progress(StepProgress::Reset {
262+
metadata: metadata.clone(),
263+
message: error.to_string().into(),
264+
})
265+
.await;
266+
return Err(ArtifactFetchError::HttpError {
267+
peer: peer.address(),
268+
error: error.into(),
269+
});
270+
}
271+
Ok(None) => {
272+
// The entire artifact has been downloaded.
273+
break;
274+
}
275+
Err(_) => {
276+
// The operation timed out.
277+
cx.send_progress(StepProgress::Reset {
278+
metadata,
279+
message: format!(
280+
"operation timed out ({:?})",
281+
self.timeout
282+
)
283+
.into(),
284+
})
285+
.await;
286+
return Err(ArtifactFetchError::Timeout {
287+
peer: peer.address(),
288+
timeout: self.timeout,
289+
bytes_fetched: artifact_bytes.num_bytes(),
290+
});
291+
}
292+
}
293+
}
294+
295+
// Check that the artifact size matches the returned size.
296+
if total_bytes != artifact_bytes.num_bytes() as u64 {
297+
let error = ArtifactFetchError::SizeMismatch {
298+
artifact_size: total_bytes,
299+
downloaded_bytes,
300+
};
301+
cx.send_progress(StepProgress::reset(metadata, error.to_string()))
302+
.await;
303+
return Err(error);
304+
}
305+
306+
Ok(artifact_bytes)
307+
}
308+
}
309+
310+
#[async_trait]
311+
pub(crate) trait FetchArtifactImpl: fmt::Debug + Send + Sync {
312+
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_>;
313+
fn peer_count(&self) -> usize;
314+
315+
/// Returns (size, receiver) on success, and an error on failure.
316+
async fn fetch_from_peer_impl(
317+
&self,
318+
peer: PeerAddress,
319+
artifact_hash_id: ArtifactHashId,
320+
) -> Result<(u64, FetchReceiver), HttpError>;
321+
}
322+
323+
/// The send side of the channel over which data is sent.
324+
pub(crate) type FetchReceiver = mpsc::Receiver<Result<Bytes, ClientError>>;
325+
326+
/// A [`FetchArtifactImpl`] that uses HTTP to fetch artifacts from peers.
327+
///
328+
/// This is the real implementation.
329+
#[derive(Clone, Debug)]
330+
pub(crate) struct HttpFetchBackend {
331+
log: slog::Logger,
332+
peers: Vec<PeerAddress>,
333+
}
334+
335+
impl HttpFetchBackend {
336+
pub(crate) fn new(log: &slog::Logger, peers: Vec<PeerAddress>) -> Self {
337+
let log = log.new(slog::o!("component" => "HttpPeers"));
338+
Self { log, peers }
339+
}
340+
}
341+
342+
#[async_trait]
343+
impl FetchArtifactImpl for HttpFetchBackend {
344+
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> {
345+
Box::new(self.peers.iter().copied())
346+
}
347+
348+
fn peer_count(&self) -> usize {
349+
self.peers.len()
350+
}
351+
352+
async fn fetch_from_peer_impl(
353+
&self,
354+
peer: PeerAddress,
355+
artifact_hash_id: ArtifactHashId,
356+
) -> Result<(u64, FetchReceiver), HttpError> {
357+
// TODO: be able to fetch from sled-agent clients as well
358+
let artifact_client = ArtifactClient::new(peer.address(), &self.log);
359+
artifact_client.fetch(artifact_hash_id).await
360+
}
361+
}

installinator/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod async_temp_file;
77
mod bootstrap;
88
mod dispatch;
99
mod errors;
10+
mod fetch;
1011
mod hardware;
1112
#[cfg(test)]
1213
mod mock_peers;

0 commit comments

Comments
 (0)