5656//!     "claimed". This way, if a crash occurs, the previous fixed metadata will still report `a b 
5757//!     c d e` as orphans. 
5858//! 
59- //!   * The metadata is committed again and another transaction comes in. The transaction reclaims 2 
60- //!     pages (`c` and `d`) and orphans 2 pages (`h` and `i`). The state will be recorded as 
61- //!     follows: 
59+ //!   * Subsequent transactions will have a similar effect, with new orphans being appended to the 
60+ //!     right of the list, and old orphans being reclaimed from the left. 
6261//! 
63- //!     ```text 
64- //!     page list: [ h i c d e f g ] 
65- //!                  └┬┘ └┬┘ └─┬─┘ 
66- //!                orphans│ orphans 
67- //!                    claimed 
68- //!     ``` 
69- //! 
70- //!     In words, the new orphans are written in place of the previously claimed pages. Like 
71- //!     before, if a crash occurs, the previous fixed metadata will still report `c d e f g` as 
72- //!     orphans. 
73- //! 
74- //!   * Let's now supposed a transaction reclaims 2 pages (`e` and `f`) and orphans 5 pages (`j`, 
75- //!     `k`, `l`, `m`, `n`). This will be the new list: 
76- //! 
77- //!     ```text 
78- //!     page list: [ h i j k e f g l m n ] 
79- //!                  └──┬──┘ └┬┘ └──┬──┘ 
80- //!                  orphans  │  orphans 
81- //!                        claimed 
82- //!     ``` 
83- //! 
84- //!     In words, orphaned pages are added to the previous claimed section until full, and then 
85- //!     appended to the end of the list. 
86- //! 
87- //!   To implement this mechanism, two field are required: the size of the page list, and the range 
88- //!   of claimed pages. 
62+ //!   * To prevent the orphan list from growing too big, the list is periodically compacted by 
63+ //!     overwriting the claimed section of the list with the actual orphans. 
8964
9065mod  error; 
9166
@@ -138,25 +113,6 @@ impl Not for SlotIndex {
138113/// * root node information; 
139114/// * number of allocated pages; 
140115/// * orphan page information. 
141- /// 
142- /// The orphan page information is used to interpret the contents of the `orphan_pages` list. This 
143- /// list may contain pages that are really orphan and unused, and pages that were orphan in the 
144- /// previous snapshot and were later reclaimed. See the [module-level documentation](self) for more 
145- /// information about why `orphan_pages` contains reclaimed pages. 
146- /// 
147- /// The list of reclaimed pages is a contiguous slice inside `orphan_pages` and the meaning of the 
148- /// field in this structure is explained by the following diagram: 
149- /// 
150- /// ```text 
151- ///               reclaimed pages 
152- ///                      ╷ 
153- ///                   ┌──┴──┐ 
154- /// orphan_pages: [ a b c d e f g h ] 
155- ///                   │       │     │ 
156- ///                   │       │     └╴orphan_pages_len 
157- ///                   │       └╴reclaimed_orphans_end 
158- ///                   └╴reclaimed_orphans_start 
159- /// ``` 
160116#[ repr( C ) ]  
161117#[ derive( FromBytes ,  IntoBytes ,  Immutable ,  KnownLayout ,  ByteEq ,  Clone ,  Debug ) ]  
162118pub  struct  MetadataSlot  { 
@@ -170,13 +126,8 @@ pub struct MetadataSlot {
170126     page_count :  u32 , 
171127    /// Total size of `orphan_pages` (including reclaimed pages) 
172128     orphan_pages_len :  u32 , 
173-     /// Index of the first reclaimed page inside of `orphan_pages` 
174-      reclaimed_orphans_start :  u32 , 
175-     /// Number of reclaimed pages inside of `orphan_pages` 
176-      reclaimed_orphans_end :  u32 , 
177-     /// Unused data to allow this structure to be properly aligned. This padding is stored on disk 
178-      /// to improve runtime performance 
179-      padding :  u32 , 
129+     /// Number of reclaimed pages from the `orphan_pages` list 
130+      reclaimed_orphans_len :  u32 , 
180131} 
181132
182133impl  MetadataSlot  { 
@@ -251,24 +202,16 @@ impl MetadataSlot {
251202     #[ inline]  
252203    #[ must_use]  
253204    fn  reclaimed_range ( & self )  -> Range < usize >  { 
254-         self . reclaimed_orphans_start   as   usize .. self . reclaimed_orphans_end  as  usize 
205+         0 .. self . reclaimed_orphans_len  as  usize 
255206    } 
256207
257208    /// Returns the range of pages that are actually orphans (not reclaimed) in the `orphan_pages` 
258209     /// list. 
259-      /// 
260-      /// This returns 2 disjoint ranges because there may be some reclaimed pages in the middle. 
261210     #[ inline]  
262211    #[ must_use]  
263-     fn  actual_orphans_ranges ( & self )  -> ( Range < usize > ,  Range < usize > )  { 
264-         debug_assert ! ( 
265-             self . orphan_pages_len >= self . reclaimed_orphans_start + self . reclaimed_orphans_end
266-         ) ; 
267-         ( 
268-             0 ..( self . reclaimed_orphans_start  as  usize ) , 
269-             ( self . reclaimed_orphans_start  as  usize  + self . reclaimed_orphans_end  as  usize ) ..
270-                 ( self . orphan_pages_len  as  usize ) , 
271-         ) 
212+     fn  actual_orphans_range ( & self )  -> Range < usize >  { 
213+         debug_assert ! ( self . orphan_pages_len >= self . reclaimed_orphans_len) ; 
214+         self . reclaimed_orphans_len  as  usize ..self . orphan_pages_len  as  usize 
272215    } 
273216
274217    /// Computes the hash for this slot. 
@@ -310,11 +253,7 @@ impl HashedMetadataSlot {
310253        } 
311254        // Check that the number of reclaimed pages doesn't exceed the total number of orphan 
312255        // pages. 
313-         let  reclaimed_orphans_end = self 
314-             . reclaimed_orphans_start 
315-             . checked_add ( self . reclaimed_orphans_end ) 
316-             . ok_or ( CorruptedMetadataError ) ?; 
317-         if  self . orphan_pages_len  < reclaimed_orphans_end { 
256+         if  self . orphan_pages_len  < self . reclaimed_orphans_len  { 
318257            return  Err ( CorruptedMetadataError ) ; 
319258        } 
320259        // Check the hash. 
@@ -559,11 +498,43 @@ impl MetadataManager {
559498        Ok ( ( ) ) 
560499    } 
561500
501+     fn  compact_orphans ( & mut  self )  { 
502+         let  ( active,  dirty,  list)  = self . parts_mut ( ) ; 
503+         let  reclaimed_range = active. reclaimed_range ( ) ; 
504+         let  actual_orphans_range = dirty. actual_orphans_range ( ) ; 
505+ 
506+         // If orphan page list has observed N pushes and M pops, then first of all note that: 
507+         // 
508+         // - N == orphan_pages_len (the total list length, reclaimed pages included) 
509+         // - M == reclaimed_range.len() 
510+         // - N - M = actual_orphans_range.len() 
511+         // - N >= M, or equivalently N - M >= 0 
512+         // 
513+         // Here we move the orphans to the start of the list when M >= N - M, which implies N <= 
514+         // 2M. If that condition is satisfied, we will move N - M items. In total, if the condition 
515+         // is satisfied, we will have performed: 
516+         // 
517+         // - N pushes 
518+         // - M pops 
519+         // - N - M copies 
520+         // 
521+         // for a total of N + M + N - M = 2N operations, which means that adding items to the 
522+         // orphan page list still takes O(1) amortized time. 
523+         if  reclaimed_range. len ( )  >= actual_orphans_range. len ( )  { 
524+             dirty. orphan_pages_len  = actual_orphans_range. len ( )  as  u32 ; 
525+             dirty. reclaimed_orphans_len  = 0 ; 
526+             list. copy_within ( actual_orphans_range,  0 ) ; 
527+         } 
528+     } 
529+ 
562530    /// Saves the metadata to the storage device, and promotes the dirty slot to the active slot. 
563531     /// 
564532     /// After calling this method, a new dirty slot is produced with the same contents as the new 
565533     /// active slot, and an auto-incremented snapshot ID. 
566534     pub  fn  commit ( & mut  self )  -> io:: Result < ( ) >  { 
535+         // Compact the orphan page list if there's enough room 
536+         self . compact_orphans ( ) ; 
537+ 
567538        // First make sure the changes from the dirty slot are on disk 
568539        self . dirty_slot_mut ( ) . update_hash ( ) ; 
569540        debug_assert ! ( self . dirty_slot_mut( ) . verify_integrity( ) . is_ok( ) ) ; 
@@ -622,7 +593,7 @@ impl<'a> OrphanPages<'a> {
622593     #[ inline]  
623594    pub  fn  len ( & self )  -> usize  { 
624595        let  m = self . manager . dirty_slot ( ) ; 
625-         ( m. orphan_pages_len  as  usize )  - ( m. reclaimed_orphans_end  as  usize ) 
596+         ( m. orphan_pages_len  as  usize )  - ( m. reclaimed_orphans_len  as  usize ) 
626597    } 
627598
628599    /// Maximum number of orphan pages that this list can contain without resizing. 
@@ -640,8 +611,8 @@ impl<'a> OrphanPages<'a> {
640611    /// Returns an iterator that yields the IDs of orphan pages. 
641612     pub  fn  iter ( & self )  -> impl  FusedIterator < Item  = OrphanPage >  + use < ' _ >  { 
642613        let  list = self . manager . raw_orphan_pages ( ) ; 
643-         let  ( left ,  right )   = self . manager . dirty_slot ( ) . actual_orphans_ranges ( ) ; 
644-         list[ right ] . iter ( ) . copied ( ) . chain ( list [ left ] . iter ( ) . copied ( ) ) 
614+         let  range  = self . manager . dirty_slot ( ) . actual_orphans_range ( ) ; 
615+         list[ range ] . iter ( ) . copied ( ) 
645616    } 
646617
647618    /// Adds a page to the orphan page list, increasing the capacity of the list if necessary. 
@@ -656,43 +627,17 @@ impl<'a> OrphanPages<'a> {
656627
657628    /// Adds a page to the orphan page list if there is enough capacity. 
658629     pub  fn  push_within_capacity ( & mut  self ,  orphan :  OrphanPage )  -> Result < ( ) ,  OrphanPage >  { 
659-         // To make sure the previous snapshot is always valid, we cannot modify orphan pages that 
660-         // are referenced by the previous snapshot. We can only modify the reclaimed orphans 
661-         // slice, or the the additional orphans elements added at the end of the list (if any). 
662-         // 
663-         // Particular care should be taken because a `pop()` may be followed by a `push()`, and 
664-         // it's important that the `push()` does not overwrite data from the previous snapshot. 
665- 
666-         let  ( active,  dirty,  list)  = self . manager . parts_mut ( ) ; 
667- 
668-         // Check if we can write in the reclaimed slice. We can only write in the intersection 
669-         // between the active reclaimed range and the dirty reclaimed range, and only at the bounds 
670-         // of the dirty reclaimed range. 
671-         let  active_reclaimed_range = active. reclaimed_range ( ) ; 
672-         let  dirty_reclaimed_range = dirty. reclaimed_range ( ) ; 
673-         let  intersection = active_reclaimed_range. start . max ( dirty_reclaimed_range. start ) ..
674-             active_reclaimed_range. end . min ( dirty_reclaimed_range. end ) ; 
675- 
676-         if  !intersection. is_empty ( )  { 
677-             if  intersection. start  == dirty_reclaimed_range. start  { 
678-                 list[ dirty_reclaimed_range. start ]  = orphan; 
679-                 dirty. reclaimed_orphans_start  += 1 ; 
680-                 dirty. reclaimed_orphans_end  -= 1 ; 
681-                 return  Ok ( ( ) ) ; 
682-             }  else  if  intersection. end  == dirty_reclaimed_range. end  { 
683-                 list[ dirty_reclaimed_range. end  - 1 ]  = orphan; 
684-                 dirty. reclaimed_orphans_end  -= 1 ; 
685-                 return  Ok ( ( ) ) ; 
686-             } 
687-         } 
630+         let  ( _,  dirty,  list)  = self . manager . parts_mut ( ) ; 
688631
689-         // We need to write at the end of the list. We can only write past the active and dirty 
690-         // list. 
691-         let  ( _,  active_orphans_range)  = active. actual_orphans_ranges ( ) ; 
692-         let  ( _,  dirty_orphans_range)  = dirty. actual_orphans_ranges ( ) ; 
693-         let  index = active_orphans_range. end . max ( dirty_orphans_range. end ) ; 
694-         if  index < list. len ( )  { 
695-             list[ index]  = orphan; 
632+         let  range = dirty. actual_orphans_range ( ) ; 
633+         if  range. end  < list. len ( )  { 
634+             if  range. end  > 0  { 
635+                 // In debug mode, ensure that the sequence of orphan page snapshot IDs is 
636+                 // non-decreasing. This is because `pop()` makes this assumption, and if this 
637+                 // assumption is broken, then the orphan page list may grow indefinitely. 
638+                 debug_assert ! ( orphan. orphaned_at >= list[ range. end - 1 ] . orphaned_at) ; 
639+             } 
640+             list[ range. end ]  = orphan; 
696641            dirty. orphan_pages_len  += 1 ; 
697642            return  Ok ( ( ) ) ; 
698643        } 
@@ -710,31 +655,12 @@ impl<'a> OrphanPages<'a> {
710655     /// exists. 
711656     pub  fn  pop ( & mut  self ,  snapshot_threshold :  SnapshotId )  -> Option < OrphanPage >  { 
712657        let  ( _,  dirty,  list)  = self . manager . parts_mut ( ) ; 
713-         let  ( left ,  right )   = dirty. actual_orphans_ranges ( ) ; 
658+         let  range  = dirty. actual_orphans_range ( ) ; 
714659
715-         // The following code checks the `left` and `right` ranges for orphaned pages that have an 
716-         // `orphaned_at()` equal or below `snapshot_threshold`. 
717-         // 
718-         // Instead of scanning the whole `left` and `right` lists, the code only check the boundary 
719-         // elements. The assumption is that snapshot IDs are always increasing, never decreasing, 
720-         // and therefore each call to `push()` always adds pages with an increasing 
721-         // `orphaned_at()`. So if the first element has an `orphaned_at()` that is already too 
722-         // high, there's no point in checking the other elements, because they will also be above 
723-         // the threshold. 
724- 
725-         if  !right. is_empty ( )  { 
726-             let  orphan = list[ right. start ] ; 
660+         if  !range. is_empty ( )  { 
661+             let  orphan = list[ range. start ] ; 
727662            if  orphan. orphaned_at ( )  <= snapshot_threshold { 
728-                 dirty. reclaimed_orphans_end  += 1 ; 
729-                 return  Some ( orphan) ; 
730-             } 
731-         } 
732- 
733-         if  !left. is_empty ( )  { 
734-             let  orphan = list[ left. end  - 1 ] ; 
735-             if  orphan. orphaned_at ( )  <= snapshot_threshold { 
736-                 dirty. reclaimed_orphans_start  -= 1 ; 
737-                 dirty. reclaimed_orphans_end  += 1 ; 
663+                 dirty. reclaimed_orphans_len  += 1 ; 
738664                return  Some ( orphan) ; 
739665            } 
740666        } 
@@ -890,7 +816,7 @@ mod tests {
890816        } 
891817
892818        #[ test]  
893-         fn  push_pop ( )  { 
819+         fn  random_push_pop ( )  { 
894820            let  f = tempfile:: tempfile ( ) . expect ( "failed to open temporary file" ) ; 
895821            let  mut  manager =
896822                MetadataManager :: from_file ( f) . expect ( "failed to initialize metadata manager" ) ; 
@@ -911,6 +837,43 @@ mod tests {
911837            } 
912838        } 
913839
840+         #[ test]  
841+         fn  push_pop ( )  { 
842+             let  f = tempfile:: tempfile ( ) . expect ( "failed to open temporary file" ) ; 
843+             let  mut  manager =
844+                 MetadataManager :: from_file ( f) . expect ( "failed to initialize metadata manager" ) ; 
845+ 
846+             // Add 4 pages with increasing snapshots; the orphan page list will look like this: 
847+             // [1, 2, 3, 4] 
848+             manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 1 ) ,  1 ) ) . expect ( "push failed" ) ; 
849+             manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 2 ) ,  2 ) ) . expect ( "push failed" ) ; 
850+             manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 3 ) ,  3 ) ) . expect ( "push failed" ) ; 
851+             manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 4 ) ,  4 ) ) . expect ( "push failed" ) ; 
852+             manager. commit ( ) . expect ( "commit failed" ) ; 
853+ 
854+             // Pop 3 pages; orphan page list: [(claimed), (claimed), (claimed), 4] 
855+             manager. orphan_pages ( ) . pop ( 3 ) . expect ( "pop failed" ) ; 
856+             manager. orphan_pages ( ) . pop ( 3 ) . expect ( "pop failed" ) ; 
857+             manager. orphan_pages ( ) . pop ( 3 ) . expect ( "pop failed" ) ; 
858+             manager. commit ( ) . expect ( "commit failed" ) ; 
859+ 
860+             // Push 2 new pages, again with increasing snapshots; they are appended to the end: 
861+             // [(claimed), (claimed), (claimed), 4, 5, 6], which the commit compacts to [4, 5, 6] 
862+             manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 5 ) ,  5 ) ) . expect ( "push failed" ) ; 
863+             manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 6 ) ,  6 ) ) . expect ( "push failed" ) ; 
864+             manager. commit ( ) . expect ( "commit failed" ) ; 
865+ 
866+             // Pop 2 pages; orphan page list: [(claimed), (claimed), 6] 
867+             manager. orphan_pages ( ) . pop ( 5 ) . expect ( "pop failed" ) ; 
868+             manager. orphan_pages ( ) . pop ( 5 ) . expect ( "pop failed" ) ; 
869+             manager. commit ( ) . expect ( "commit failed" ) ; 
870+ 
871+             assert_eq ! ( 
872+                 manager. orphan_pages( ) . iter( ) . map( |orphan| orphan. page_id) . collect:: <Vec <_>>( ) , 
873+                 [ 6 ] 
874+             ) ; 
875+         } 
876+ 
914877        #[ test]  
915878        fn  crash_recovery ( )  { 
916879            let  f = tempfile:: tempfile ( ) . expect ( "failed to open temporary file" ) ; 
0 commit comments