@@ -170,13 +170,8 @@ pub struct MetadataSlot {
170170 page_count : u32 ,
171171 /// Total size of `orphan_pages` (including reclaimed pages)
172172 orphan_pages_len : u32 ,
173- /// Index of the first reclaimed page inside of `orphan_pages`
174- reclaimed_orphans_start : u32 ,
175- /// Number of reclaimed pages inside of `orphan_pages`
176- reclaimed_orphans_end : u32 ,
177- /// Unused data to allow this structure to be properly aligned. This padding is stored on disk
178- /// to improve runtime performance
179- padding : u32 ,
173+ /// Number of reclaimed pages from the `orphan_pages` list
174+ reclaimed_orphans_len : u32 ,
180175}
181176
182177impl MetadataSlot {
@@ -251,24 +246,16 @@ impl MetadataSlot {
251246 #[ inline]
252247 #[ must_use]
253248 fn reclaimed_range ( & self ) -> Range < usize > {
254- self . reclaimed_orphans_start as usize .. self . reclaimed_orphans_end as usize
249+ 0 .. self . reclaimed_orphans_len as usize
255250 }
256251
257252 /// Returns the range of pages that are actually orphans (not reclaimed) in the `orphan_pages`
258253 /// list.
259- ///
260- /// This returns 2 disjoint ranges because there may be some reclaimed pages in the middle.
261254 #[ inline]
262255 #[ must_use]
263- fn actual_orphans_ranges ( & self ) -> ( Range < usize > , Range < usize > ) {
264- debug_assert ! (
265- self . orphan_pages_len >= self . reclaimed_orphans_start + self . reclaimed_orphans_end
266- ) ;
267- (
268- 0 ..( self . reclaimed_orphans_start as usize ) ,
269- ( self . reclaimed_orphans_start as usize + self . reclaimed_orphans_end as usize ) ..
270- ( self . orphan_pages_len as usize ) ,
271- )
256+ fn actual_orphans_range ( & self ) -> Range < usize > {
257+ debug_assert ! ( self . orphan_pages_len >= self . reclaimed_orphans_len) ;
258+ self . reclaimed_orphans_len as usize ..self . orphan_pages_len as usize
272259 }
273260
274261 /// Computes the hash for this slot.
@@ -310,11 +297,7 @@ impl HashedMetadataSlot {
310297 }
311298 // Check that the number of reclaimed pages doesn't exceed the total number of orphan
312299 // pages.
313- let reclaimed_orphans_end = self
314- . reclaimed_orphans_start
315- . checked_add ( self . reclaimed_orphans_end )
316- . ok_or ( CorruptedMetadataError ) ?;
317- if self . orphan_pages_len < reclaimed_orphans_end {
300+ if self . orphan_pages_len < self . reclaimed_orphans_len {
318301 return Err ( CorruptedMetadataError ) ;
319302 }
320303 // Check the hash.
@@ -559,11 +542,43 @@ impl MetadataManager {
559542 Ok ( ( ) )
560543 }
561544
545+ fn compact_orphans ( & mut self ) {
546+ let ( active, dirty, list) = self . parts_mut ( ) ;
547+ let reclaimed_range = active. reclaimed_range ( ) ;
548+ let actual_orphans_range = dirty. actual_orphans_range ( ) ;
549+
550+ // If orphan page list has observed N pushes and M pops, then first of all note that:
551+ //
552+ // - N == actual_orphans_range.len()
553+ // - M == reclaimed_range.len()
554+ // - N - M = actual_orphans_range.len()
555+ // - N >= M, or equivalently N - M >= 0
556+ //
557+ // Here we move the orphans to the start of the list when M >= N - M, which implies N <=
558+ // 2M. If that condition is satisfied, we will move N - M items. In total, if the condition
559+ // is satisfied, we will have performed:
560+ //
561+ // - N pushes
562+ // - M pops
563+ // - N - M copies
564+ //
565+ // for a total of N + M + N - M = 2N operations, which means that adding items to the
566+ // orphan page list still takes O(1) amortized time.
567+ if reclaimed_range. len ( ) >= actual_orphans_range. len ( ) {
568+ dirty. orphan_pages_len = actual_orphans_range. len ( ) as u32 ;
569+ dirty. reclaimed_orphans_len = 0 ;
570+ list. copy_within ( actual_orphans_range, 0 ) ;
571+ }
572+ }
573+
562574 /// Saves the metadata to the storage device, and promotes the dirty slot to the active slot.
563575 ///
564576 /// After calling this method, a new dirty slot is produced with the same contents as the new
565577 /// active slot, and an auto-incremented snapshot ID.
566578 pub fn commit ( & mut self ) -> io:: Result < ( ) > {
579+ // Compact the orphan page list if there's enough room
580+ self . compact_orphans ( ) ;
581+
567582 // First make sure the changes from the dirty slot are on disk
568583 self . dirty_slot_mut ( ) . update_hash ( ) ;
569584 debug_assert ! ( self . dirty_slot_mut( ) . verify_integrity( ) . is_ok( ) ) ;
@@ -622,7 +637,7 @@ impl<'a> OrphanPages<'a> {
622637 #[ inline]
623638 pub fn len ( & self ) -> usize {
624639 let m = self . manager . dirty_slot ( ) ;
625- ( m. orphan_pages_len as usize ) - ( m. reclaimed_orphans_end as usize )
640+ ( m. orphan_pages_len as usize ) - ( m. reclaimed_orphans_len as usize )
626641 }
627642
628643 /// Maximum number of orphan pages that this list can contain without resizing.
@@ -640,8 +655,8 @@ impl<'a> OrphanPages<'a> {
640655 /// Returns an iterator that yields the IDs of orphan pages.
641656 pub fn iter ( & self ) -> impl FusedIterator < Item = OrphanPage > + use < ' _ > {
642657 let list = self . manager . raw_orphan_pages ( ) ;
643- let ( left , right ) = self . manager . dirty_slot ( ) . actual_orphans_ranges ( ) ;
644- list[ right ] . iter ( ) . copied ( ) . chain ( list [ left ] . iter ( ) . copied ( ) )
658+ let range = self . manager . dirty_slot ( ) . actual_orphans_range ( ) ;
659+ list[ range ] . iter ( ) . copied ( )
645660 }
646661
647662 /// Adds a page to the orphan page list, increasing the capacity of the list if necessary.
@@ -656,43 +671,11 @@ impl<'a> OrphanPages<'a> {
656671
657672 /// Adds a page to the orphan page list if there is enough capacity.
658673 pub fn push_within_capacity ( & mut self , orphan : OrphanPage ) -> Result < ( ) , OrphanPage > {
659- // To make sure the previous snapshot is always valid, we cannot modify orphan pages that
660- // are referenced by the previous snapshot. We can only modify the reclaimed orphans
661- // slice, or the the additional orphans elements added at the end of the list (if any).
662- //
663- // Particular care should be taken because a `pop()` may be followed by a `push()`, and
664- // it's important that the `push()` does not overwrite data from the previous snapshot.
665-
666- let ( active, dirty, list) = self . manager . parts_mut ( ) ;
667-
668- // Check if we can write in the reclaimed slice. We can only write in the intersection
669- // between the active reclaimed range and the dirty reclaimed range, and only at the bounds
670- // of the dirty reclaimed range.
671- let active_reclaimed_range = active. reclaimed_range ( ) ;
672- let dirty_reclaimed_range = dirty. reclaimed_range ( ) ;
673- let intersection = active_reclaimed_range. start . max ( dirty_reclaimed_range. start ) ..
674- active_reclaimed_range. end . min ( dirty_reclaimed_range. end ) ;
675-
676- if !intersection. is_empty ( ) {
677- if intersection. start == dirty_reclaimed_range. start {
678- list[ dirty_reclaimed_range. start ] = orphan;
679- dirty. reclaimed_orphans_start += 1 ;
680- dirty. reclaimed_orphans_end -= 1 ;
681- return Ok ( ( ) ) ;
682- } else if intersection. end == dirty_reclaimed_range. end {
683- list[ dirty_reclaimed_range. end - 1 ] = orphan;
684- dirty. reclaimed_orphans_end -= 1 ;
685- return Ok ( ( ) ) ;
686- }
687- }
674+ let ( _, dirty, list) = self . manager . parts_mut ( ) ;
688675
689- // We need to write at the end of the list. We can only write past the active and dirty
690- // list.
691- let ( _, active_orphans_range) = active. actual_orphans_ranges ( ) ;
692- let ( _, dirty_orphans_range) = dirty. actual_orphans_ranges ( ) ;
693- let index = active_orphans_range. end . max ( dirty_orphans_range. end ) ;
694- if index < list. len ( ) {
695- list[ index] = orphan;
676+ let range = dirty. actual_orphans_range ( ) ;
677+ if range. end < list. len ( ) {
678+ list[ range. end ] = orphan;
696679 dirty. orphan_pages_len += 1 ;
697680 return Ok ( ( ) ) ;
698681 }
@@ -710,33 +693,24 @@ impl<'a> OrphanPages<'a> {
710693 /// exists.
711694 pub fn pop ( & mut self , snapshot_threshold : SnapshotId ) -> Option < OrphanPage > {
712695 let ( _, dirty, list) = self . manager . parts_mut ( ) ;
713- let ( left , right ) = dirty. actual_orphans_ranges ( ) ;
696+ let range = dirty. actual_orphans_range ( ) ;
714697
715- // The following code checks the `left` and `right` ranges for orphaned pages that have an
716- // `orphaned_at()` equal or below `snapshot_threshold`.
717- //
718- // Instead of scanning the whole `left` and `right` lists, the code only check the boundary
719- // elements. The assumption is that snapshot IDs are always increasing, never decreasing,
720- // and therefore each call to `push()` always adds pages with an increasing
721- // `orphaned_at()`. So if the first element has an `orphaned_at()` that is already too
722- // high, there's no point in checking the other elements, because they will also be above
723- // the threshold.
724-
725- if !right. is_empty ( ) {
726- let orphan = list[ right. start ] ;
698+ if !range. is_empty ( ) {
699+ let orphan = list[ range. start ] ;
727700 if orphan. orphaned_at ( ) <= snapshot_threshold {
728- dirty. reclaimed_orphans_end += 1 ;
701+ dirty. reclaimed_orphans_len += 1 ;
729702 return Some ( orphan) ;
730703 }
731704 }
732705
733- if !left. is_empty ( ) {
734- let orphan = list[ left. end - 1 ] ;
735- if orphan. orphaned_at ( ) <= snapshot_threshold {
736- dirty. reclaimed_orphans_start -= 1 ;
737- dirty. reclaimed_orphans_end += 1 ;
738- return Some ( orphan) ;
739- }
706+ #[ cfg( debug_assertions) ]
707+ {
708+ // TODO: remove
709+ let valid_candidates = list[ range]
710+ . iter ( )
711+ . filter ( |orphan| orphan. orphaned_at ( ) <= snapshot_threshold)
712+ . count ( ) ;
713+ debug_assert_eq ! ( valid_candidates, 0 ) ;
740714 }
741715
742716 None
@@ -911,6 +885,43 @@ mod tests {
911885 }
912886 }
913887
888+ #[ test]
889+ fn push_pop_2 ( ) {
890+ let f = tempfile:: tempfile ( ) . expect ( "failed to open temporary file" ) ;
891+ let mut manager =
892+ MetadataManager :: from_file ( f) . expect ( "failed to initialize metadata manager" ) ;
893+
894+ // Add 4 pages with increasing snapshots; the orphan page list will look like this:
895+ // [1, 2, 3, 4]
896+ manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 1 ) , 1 ) ) . expect ( "push failed" ) ;
897+ manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 2 ) , 2 ) ) . expect ( "push failed" ) ;
898+ manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 3 ) , 3 ) ) . expect ( "push failed" ) ;
899+ manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 4 ) , 4 ) ) . expect ( "push failed" ) ;
900+ manager. commit ( ) . expect ( "commit failed" ) ;
901+
902+ // Pop 3 pages; orphan page list: [(claimed), (claimed), (claimed), 4]
903+ manager. orphan_pages ( ) . pop ( 3 ) . expect ( "pop failed" ) ;
904+ manager. orphan_pages ( ) . pop ( 3 ) . expect ( "pop failed" ) ;
905+ manager. orphan_pages ( ) . pop ( 3 ) . expect ( "pop failed" ) ;
906+ manager. commit ( ) . expect ( "commit failed" ) ;
907+
908+ // Push 2 new pages, again with increasing snapshots; orphan page list: [5, 6,
909+ // (claimed), 4]
910+ manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 5 ) , 5 ) ) . expect ( "push failed" ) ;
911+ manager. orphan_pages ( ) . push ( OrphanPage :: new ( page_id ! ( 6 ) , 6 ) ) . expect ( "push failed" ) ;
912+ manager. commit ( ) . expect ( "commit failed" ) ;
913+
914+ // Pop 2 pages; orphan page list: [(claimed), 6, (claimed), (claimed)]
915+ manager. orphan_pages ( ) . pop ( 5 ) . expect ( "pop failed" ) ;
916+ manager. orphan_pages ( ) . pop ( 5 ) . expect ( "pop failed" ) ;
917+ manager. commit ( ) . expect ( "commit failed" ) ;
918+
919+ assert_eq ! (
920+ manager. orphan_pages( ) . iter( ) . map( |orphan| orphan. page_id) . collect:: <Vec <_>>( ) ,
921+ [ 6 ]
922+ ) ;
923+ }
924+
914925 #[ test]
915926 fn crash_recovery ( ) {
916927 let f = tempfile:: tempfile ( ) . expect ( "failed to open temporary file" ) ;
0 commit comments