@@ -25,14 +25,17 @@ use path_slash::PathExt;
 use std::{
     fmt,
     fs::{self, File},
-    io::{self, BufReader},
+    io::{self, BufReader, Write as _},
     num::ParseIntError,
     ops::RangeInclusive,
     path::{Path, PathBuf},
     sync::Arc,
 };
 use std::{iter, str::FromStr};
-use tokio::{io::AsyncWriteExt, runtime::Runtime};
+use tokio::{
+    io::{AsyncRead, AsyncReadExt as _, AsyncWriteExt},
+    runtime::Runtime,
+};
 use tracing::{error, info_span, instrument, trace};
 use walkdir::WalkDir;
 
@@ -57,6 +60,80 @@ impl Blob {
     }
 }
 
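+/// A blob whose `content` is an async byte stream instead of a fully
+/// buffered `Vec<u8>`, so large files don't have to be held in memory.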
+pub(crate) struct StreamingBlob {
+    pub(crate) path: String,
+    pub(crate) mime: Mime,
+    pub(crate) date_updated: DateTime<Utc>,
+    pub(crate) compression: Option<CompressionAlgorithm>,
+    pub(crate) content_length: usize,
+    pub(crate) content: Box<dyn AsyncRead + Unpin + Send>,
+}
+
+impl std::fmt::Debug for StreamingBlob {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("StreamingBlob")
+            .field("path", &self.path)
+            .field("mime", &self.mime)
+            .field("date_updated", &self.date_updated)
+            .field("compression", &self.compression)
+            .finish()
+    }
+}
+
+impl StreamingBlob {
+    /// Wrap the content stream in a streaming decompressor according to the
+    /// algorithm found in the `compression` attribute.
+    pub(crate) fn decompress(mut self) -> Self {
+        let Some(alg) = self.compression else {
+            return self;
+        };
+
+        match alg {
+            CompressionAlgorithm::Zstd => {
+                self.content = Box::new(async_compression::tokio::bufread::ZstdDecoder::new(
+                    tokio::io::BufReader::new(self.content),
+                ))
+            }
+            CompressionAlgorithm::Bzip2 => {
+                self.content = Box::new(async_compression::tokio::bufread::BzDecoder::new(
+                    tokio::io::BufReader::new(self.content),
+                ))
+            }
+            CompressionAlgorithm::Gzip => {
+                self.content = Box::new(async_compression::tokio::bufread::GzipDecoder::new(
+                    tokio::io::BufReader::new(self.content),
+                ))
+            }
+        };
+        self.compression = None;
+        self
+    }
+
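+    /// Collect the whole stream into an in-memory `Blob`, decompressing it
+    /// first; fails if the content grows beyond `max_size`.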
+    pub(crate) async fn materialize(mut self, max_size: usize) -> Result<Blob> {
+        self = self.decompress();
+
+        let mut content = crate::utils::sized_buffer::SizedBuffer::new(max_size);
+        content.reserve(self.content_length);
+
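+        // drain the stream into the size-limited buffer in 8 KiB chunks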
+        let mut buf = [0u8; 8 * 1024];
+        loop {
+            let n = self.content.read(&mut buf).await?;
+            if n == 0 {
+                break;
+            }
+            content.write_all(&buf[..n])?;
+        }
+
+        Ok(Blob {
+            path: self.path,
+            mime: self.mime,
+            date_updated: self.date_updated,
+            content: content.into_inner(),
+            compression: self.compression,
+        })
+    }
+}
+
 pub fn get_file_list<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = Result<PathBuf>>> {
     let path = path.as_ref().to_path_buf();
     if path.is_file() {
@@ -210,6 +287,35 @@ impl AsyncStorage {
         })
     }
 
+    /// Fetch a rustdoc file from our blob storage.
+    /// * `name` - the crate name
+    /// * `version` - the crate version
+    /// * `latest_build_id` - the id of the most recent build, used purely to invalidate the
+    ///   local archive index cache when `archive_storage` is `true`. Without it we wouldn't
+    ///   know that we have to invalidate the locally cached file after a rebuild.
+    /// * `path` - the wanted path inside the documentation.
+    /// * `archive_storage` - if `true`, we will assume we have a remote ZIP archive and an
+    ///   index where we can fetch the requested path from inside the ZIP file.
+    #[instrument]
+    pub(crate) async fn stream_rustdoc_file(
+        &self,
+        name: &str,
+        version: &str,
+        latest_build_id: Option<BuildId>,
+        path: &str,
+        archive_storage: bool,
+    ) -> Result<StreamingBlob> {
+        trace!("fetch rustdoc file");
+        Ok(if archive_storage {
+            self.stream_from_archive(&rustdoc_archive_path(name, version), latest_build_id, path)
+                .await?
+        } else {
+            // Add rustdoc prefix, name and version to the path for accessing the file stored in the database
+            let remote_path = format!("rustdoc/{name}/{version}/{path}");
+            self.get_stream(&remote_path).await?
+        })
+    }
+
     #[context("fetching {path} from {name} {version} (archive: {archive_storage})")]
     pub(crate) async fn fetch_source_file(
         &self,
@@ -282,15 +388,16 @@ impl AsyncStorage {
 
     #[instrument]
     pub(crate) async fn get(&self, path: &str, max_size: usize) -> Result<Blob> {
-        let mut blob = match &self.backend {
-            StorageBackend::Database(db) => db.get(path, max_size, None).await,
-            StorageBackend::S3(s3) => s3.get(path, max_size, None).await,
+        self.get_stream(path).await?.materialize(max_size).await
+    }
+
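+    /// Stream a blob from the storage backend, transparently decompressing it.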
+    #[instrument]
+    pub(crate) async fn get_stream(&self, path: &str) -> Result<StreamingBlob> {
+        let blob = match &self.backend {
+            StorageBackend::Database(db) => db.get_stream(path, None).await,
+            StorageBackend::S3(s3) => s3.get_stream(path, None).await,
         }?;
-        if let Some(alg) = blob.compression {
-            blob.content = decompress(blob.content.as_slice(), alg, max_size)?;
-            blob.compression = None;
-        }
-        Ok(blob)
+        Ok(blob.decompress())
     }
 
     #[instrument]
@@ -301,18 +408,28 @@ impl AsyncStorage {
         range: FileRange,
         compression: Option<CompressionAlgorithm>,
     ) -> Result<Blob> {
+        self.get_range_stream(path, range, compression)
+            .await?
+            .materialize(max_size)
+            .await
+    }
+
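+    /// Stream a byte range of a stored file, e.g. a single member of a ZIP archive.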
+    #[instrument]
+    pub(super) async fn get_range_stream(
+        &self,
+        path: &str,
+        range: FileRange,
+        compression: Option<CompressionAlgorithm>,
+    ) -> Result<StreamingBlob> {
         let mut blob = match &self.backend {
-            StorageBackend::Database(db) => db.get(path, max_size, Some(range)).await,
-            StorageBackend::S3(s3) => s3.get(path, max_size, Some(range)).await,
+            StorageBackend::Database(db) => db.get_stream(path, Some(range)).await,
+            StorageBackend::S3(s3) => s3.get_stream(path, Some(range)).await,
         }?;
         // `compression` represents the compression of the file-stream inside the archive.
         // We don't compress the whole archive, so the encoding of the archive's blob is irrelevant
         // here.
-        if let Some(alg) = compression {
-            blob.content = decompress(blob.content.as_slice(), alg, max_size)?;
-            blob.compression = None;
-        }
-        Ok(blob)
+        blob.compression = compression;
+        Ok(blob.decompress())
     }
 
     #[instrument]
@@ -389,6 +506,38 @@ impl AsyncStorage {
         })
     }
 
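+    /// Stream a single file out of a remote archive, using the locally cached
+    /// archive index to locate its byte range and per-file compression.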
+    #[instrument]
+    pub(crate) async fn stream_from_archive(
+        &self,
+        archive_path: &str,
+        latest_build_id: Option<BuildId>,
+        path: &str,
+    ) -> Result<StreamingBlob> {
+        let index_filename = self
+            .download_archive_index(archive_path, latest_build_id)
+            .await?;
+
+        let info = {
+            let path = path.to_owned();
+            spawn_blocking(move || archive_index::find_in_file(index_filename, &path)).await
+        }?
+        .ok_or(PathNotFoundError)?;
+
+        let blob = self
+            .get_range_stream(archive_path, info.range(), Some(info.compression()))
+            .await?;
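+        // `get_range_stream` already decompressed the archived file for us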
+        assert_eq!(blob.compression, None);
+
+        Ok(StreamingBlob {
+            path: format!("{archive_path}/{path}"),
+            mime: detect_mime(path),
+            date_updated: blob.date_updated,
+            content: blob.content,
+            content_length: blob.content_length,
+            compression: None,
+        })
+    }
+
     #[instrument(skip(self))]
     pub(crate) async fn store_all_in_archive(
         &self,