diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 6132731688f5..8bd262196411 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -1,7 +1,32 @@ // // Page Cache holds all the different page versions and WAL records // -// The Page Cache is currenusing RocksDB for storing wal records and full page images, keyed by the RelFileNode, blocknumber, and the LSN. +// Currently, the page cache uses RocksDB to store WAL wal records and +// full page images, keyed by the RelFileNode, blocknumber, and the +// LSN. +// +// On async vs blocking: +// +// All the functions that can block for any significant length use +// Tokio async, and are marked as 'async'. That currently includes all +// the "Get" functions that get a page image or a relation size. The +// "Put" functions that add base images or WAL records to the cache +// cannot block. +// +// However, we have a funny definition of blocking: waiting on a Mutex +// that protects the in-memory data structures is not considered as +// blocking, as those are short waits, and there is no deadlock +// risk. It is *not* OK to do I/O or wait on other threads while +// holding a Mutex, however. +// +// Another wart is that we currently consider the RocksDB operations +// to be non-blocking, and we do those while holding a lock, and +// without async. That's fantasy, as RocksDB will do I/O, possibly a +// lot of it. But that's not a correctness issue, since the RocksDB +// calls will not call back to any of the other functions in page +// server. RocksDB is just a stopgap solution, to be replaced with +// something else, so it doesn't seem worth it to wrangle those calls +// into async model. // use crate::restore_local_repo::restore_timeline; @@ -343,139 +368,15 @@ impl WALRecord { } } -// Public interface functions - impl PageCache { - async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { - let mut minbuf = BytesMut::new(); - let mut maxbuf = BytesMut::new(); - let cf = self - .db - .cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME) - .unwrap(); - loop { - thread::sleep(conf.gc_period); - let last_lsn = self.get_last_valid_lsn(); - if last_lsn > conf.gc_horizon { - let horizon = last_lsn - conf.gc_horizon; - let mut maxkey = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: u32::MAX, - dbnode: u32::MAX, - relnode: u32::MAX, - forknum: u8::MAX, - }, - blknum: u32::MAX, - }, - lsn: u64::MAX, - }; - let now = Instant::now(); - let mut reconstructed = 0u64; - let mut truncated = 0u64; - loop { - maxbuf.clear(); - maxkey.pack(&mut maxbuf); - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &maxbuf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, v)) = iter.next() { - minbuf.clear(); - minbuf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut minbuf); - minbuf.clear(); - minbuf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut minbuf); - - // Construct boundaries for old records cleanup - maxkey.tag = key.tag; - let last_lsn = key.lsn; - maxkey.lsn = min(horizon, last_lsn); // do not remove last version - - let mut minkey = maxkey.clone(); - minkey.lsn = 0; + // Public GET interface functions - // reconstruct most recent page version - if let Some(rec) = content.wal_record { - trace!("Reconstruct most recent page {:?}", key); - // force reconstruction of most recent page version - self.walredo_mgr.request_redo(key.tag, rec.lsn).await?; - reconstructed += 1; - } - - maxbuf.clear(); - maxkey.pack(&mut maxbuf); - - if last_lsn > horizon { - // locate most recent record before horizon - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &maxbuf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, v)) = iter.next() { - minbuf.clear(); - minbuf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut minbuf); - if let Some(rec) = content.wal_record { - minbuf.clear(); - minbuf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut minbuf); - - trace!("Reconstruct horizon page {:?}", key); - self.walredo_mgr.request_redo(key.tag, rec.lsn).await?; - truncated += 1; - } - } - } - // remove records prior to horizon - minbuf.clear(); - minkey.pack(&mut minbuf); - trace!("Delete records in range {:?}..{:?}", minkey, maxkey); - self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?; - - maxkey = minkey; - } else { - break; - } - } - info!("Garbage collection completed in {:?}: {} pages reconstructed, {} version histories truncated", now.elapsed(), reconstructed, truncated); - } - } - } - - async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result { - let mut lsn = req_lsn; - //When invalid LSN is requested, it means "don't wait, return latest version of the page" - //This is necessary for bootstrap. - if lsn == 0 { - lsn = self.last_valid_lsn.load(Ordering::Acquire); - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - self.last_valid_lsn.load(Ordering::Acquire), - lsn - ); - } - self.seqwait_lsn - .wait_for_timeout(lsn, TIMEOUT) - .await - .with_context(|| { - format!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff - ) - })?; - - Ok(lsn) - } - - // - // GetPage@LSN - // - // Returns an 8k page image - // + /// + /// GetPage@LSN + /// + /// Returns an 8k page image + /// pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result { self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); @@ -539,13 +440,56 @@ impl PageCache { Ok(page_img) } - // - // Collect all the WAL records that are needed to reconstruct a page - // image for the given cache entry. - // - // Returns an old page image (if any), and a vector of WAL records to apply - // over it. - // + /// + /// Get size of relation at given LSN. + /// + pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + self.wait_lsn(lsn).await?; + return self.relsize_get_nowait(rel, lsn); + } + + /// + /// Does relation exist at given LSN? + /// + pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { + let lsn = self.wait_lsn(req_lsn).await?; + + let key = CacheKey { + tag: BufferTag { + rel: *rel, + blknum: u32::MAX, + }, + lsn, + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, _v)) = iter.next() { + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == *rel { + debug!("Relation {:?} exists at {}", rel, lsn); + return Ok(true); + } + } + debug!("Relation {:?} doesn't exist at {}", rel, lsn); + Ok(false) + } + + // Other public functions, for updating the page cache. + // These are used by the WAL receiver and WAL redo. + + /// + /// Collect all the WAL records that are needed to reconstruct a page + /// image for the given cache entry. + /// + /// Returns an old page image (if any), and a vector of WAL records to apply + /// over it. + /// pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option, Vec) { let minkey = CacheKey { tag: BufferTag { @@ -605,9 +549,9 @@ impl PageCache { (base_img, records) } - // - // Adds a WAL record to the page cache - // + /// + /// Adds a WAL record to the page cache + /// pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { let lsn = rec.lsn; let key = CacheKey { tag, lsn }; @@ -630,13 +574,17 @@ impl PageCache { self.num_wal_records.fetch_add(1, Ordering::Relaxed); } - // - // Adds a relation-wide WAL record (like truncate) to the page cache, - // associating it with all pages started with specified block number - // - pub async fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { + /// + /// Adds a relation-wide WAL record (like truncate) to the page cache, + /// associating it with all pages started with specified block number + /// + pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { let mut key = CacheKey { tag, lsn: rec.lsn }; - let old_rel_size = self.relsize_get(&tag.rel, u64::MAX).await?; + + // What was the size of the relation before this record? + let last_lsn = self.last_valid_lsn.load(Ordering::Acquire); + let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; + let content = CacheEntryContent { page_image: None, wal_record: Some(rec), @@ -661,9 +609,9 @@ impl PageCache { Ok(()) } - // - // Memorize a full image of a page version - // + /// + /// Memorize a full image of a page version + /// pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { let key = CacheKey { tag, lsn }; let content = CacheEntryContent { @@ -685,7 +633,57 @@ impl PageCache { self.num_page_images.fetch_add(1, Ordering::Relaxed); } - // + pub fn create_database( + &self, + lsn: u64, + db_id: Oid, + tablespace_id: Oid, + src_db_id: Oid, + src_tablespace_id: Oid, + ) -> anyhow::Result<()> { + let mut buf = BytesMut::new(); + let key = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: src_tablespace_id, + dbnode: src_db_id, + relnode: 0, + forknum: 0u8, + }, + blknum: 0, + }, + lsn: 0, + }; + key.pack(&mut buf); + let iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Forward, + )); + let mut n = 0; + for (k, v) in iter { + buf.clear(); + buf.extend_from_slice(&k); + let mut key = CacheKey::unpack(&mut buf); + if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { + break; + } + key.tag.rel.spcnode = tablespace_id; + key.tag.rel.dbnode = db_id; + key.lsn = lsn; + buf.clear(); + key.pack(&mut buf); + + self.db.put(&buf[..], v)?; + n += 1; + } + info!( + "Create database {}/{}, copy {} entries", + tablespace_id, db_id, n + ); + Ok(()) + } + + /// Remember that WAL has been received and added to the page cache up to the given LSN pub fn advance_last_valid_lsn(&self, lsn: u64) { let mut shared = self.shared.lock().unwrap(); @@ -707,9 +705,11 @@ impl PageCache { } } - // - // NOTE: this updates last_valid_lsn as well. - // + /// + /// Remember the (end of) last valid WAL record remembered in the page cache. + /// + /// NOTE: this updates last_valid_lsn as well. + /// pub fn advance_last_record_lsn(&self, lsn: u64) { let mut shared = self.shared.lock().unwrap(); @@ -725,7 +725,11 @@ impl PageCache { self.last_record_lsn.store(lsn, Ordering::Relaxed); } - // + /// + /// Remember the beginning of valid WAL. + /// + /// TODO: This should be called by garbage collection, so that if an older + /// page is requested, we will return an error to the requestor. pub fn _advance_first_valid_lsn(&self, lsn: u64) { let mut shared = self.shared.lock().unwrap(); @@ -762,11 +766,31 @@ impl PageCache { shared.last_record_lsn } - pub async fn relsize_get(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { - let mut lsn = req_lsn; - if lsn != u64::MAX { - lsn = self.wait_lsn(lsn).await?; + // + // Get statistics to be displayed in the user interface. + // + pub fn get_stats(&self) -> PageCacheStats { + PageCacheStats { + num_entries: self.num_entries.load(Ordering::Relaxed), + num_page_images: self.num_page_images.load(Ordering::Relaxed), + num_wal_records: self.num_wal_records.load(Ordering::Relaxed), + num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), + first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), + last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), + last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), } + } + + // Internal functions + + // + // Internal function to get relation size at given LSN. + // + // The caller must ensure that WAL has been received up to 'lsn'. + // + fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + + assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); let mut key = CacheKey { tag: BufferTag { @@ -812,98 +836,141 @@ impl PageCache { Ok(0) } - pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { - let lsn = self.wait_lsn(req_lsn).await?; + async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { - let key = CacheKey { - tag: BufferTag { - rel: *rel, - blknum: u32::MAX, - }, - lsn, - }; - let mut buf = BytesMut::new(); - key.pack(&mut buf); - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, _v)) = iter.next() { - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - if tag.rel == *rel { - debug!("Relation {:?} exists at {}", rel, lsn); - return Ok(true); + let mut minbuf = BytesMut::new(); + let mut maxbuf = BytesMut::new(); + let cf = self + .db + .cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME) + .unwrap(); + loop { + thread::sleep(conf.gc_period); + let last_lsn = self.get_last_valid_lsn(); + if last_lsn > conf.gc_horizon { + let horizon = last_lsn - conf.gc_horizon; + let mut maxkey = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: u32::MAX, + dbnode: u32::MAX, + relnode: u32::MAX, + forknum: u8::MAX, + }, + blknum: u32::MAX, + }, + lsn: u64::MAX, + }; + let now = Instant::now(); + let mut reconstructed = 0u64; + let mut truncated = 0u64; + loop { + maxbuf.clear(); + maxkey.pack(&mut maxbuf); + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &maxbuf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, v)) = iter.next() { + minbuf.clear(); + minbuf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut minbuf); + minbuf.clear(); + minbuf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut minbuf); + + // Construct boundaries for old records cleanup + maxkey.tag = key.tag; + let last_lsn = key.lsn; + maxkey.lsn = min(horizon, last_lsn); // do not remove last version + + let mut minkey = maxkey.clone(); + minkey.lsn = 0; + + // reconstruct most recent page version + if let Some(rec) = content.wal_record { + trace!("Reconstruct most recent page {:?}", key); + // force reconstruction of most recent page version + self.walredo_mgr.request_redo(key.tag, rec.lsn).await?; + reconstructed += 1; + } + + maxbuf.clear(); + maxkey.pack(&mut maxbuf); + + if last_lsn > horizon { + // locate most recent record before horizon + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &maxbuf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, v)) = iter.next() { + minbuf.clear(); + minbuf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut minbuf); + if let Some(rec) = content.wal_record { + minbuf.clear(); + minbuf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut minbuf); + + trace!("Reconstruct horizon page {:?}", key); + self.walredo_mgr.request_redo(key.tag, rec.lsn).await?; + truncated += 1; + } + } + } + // remove records prior to horizon + minbuf.clear(); + minkey.pack(&mut minbuf); + trace!("Delete records in range {:?}..{:?}", minkey, maxkey); + self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?; + + maxkey = minkey; + } else { + break; + } + } + info!("Garbage collection completed in {:?}: {} pages reconstructed, {} version histories truncated", now.elapsed(), reconstructed, truncated); } } - debug!("Relation {:?} doesn't exist at {}", rel, lsn); - Ok(false) } - pub fn get_stats(&self) -> PageCacheStats { - PageCacheStats { - num_entries: self.num_entries.load(Ordering::Relaxed), - num_page_images: self.num_page_images.load(Ordering::Relaxed), - num_wal_records: self.num_wal_records.load(Ordering::Relaxed), - num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), - last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), - last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), + // + // Wait until WAL has been received up to the given LSN. + // + async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result { + let mut lsn = req_lsn; + //When invalid LSN is requested, it means "don't wait, return latest version of the page" + //This is necessary for bootstrap. + if lsn == 0 { + lsn = self.last_valid_lsn.load(Ordering::Acquire); + trace!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + self.last_valid_lsn.load(Ordering::Acquire), + lsn + ); } - } - pub fn create_database( - &self, - lsn: u64, - db_id: Oid, - tablespace_id: Oid, - src_db_id: Oid, - src_tablespace_id: Oid, - ) -> anyhow::Result<()> { - let mut buf = BytesMut::new(); - let key = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: src_tablespace_id, - dbnode: src_db_id, - relnode: 0, - forknum: 0u8, - }, - blknum: 0, - }, - lsn: 0, - }; - key.pack(&mut buf); - let iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Forward, - )); - let mut n = 0; - for (k, v) in iter { - buf.clear(); - buf.extend_from_slice(&k); - let mut key = CacheKey::unpack(&mut buf); - if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { - break; - } - key.tag.rel.spcnode = tablespace_id; - key.tag.rel.dbnode = db_id; - key.lsn = lsn; - buf.clear(); - key.pack(&mut buf); + self.seqwait_lsn + .wait_for_timeout(lsn, TIMEOUT) + .await + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", + lsn >> 32, + lsn & 0xffff_ffff + ) + })?; - self.db.put(&buf[..], v)?; - n += 1; - } - info!( - "Create database {}/{}, copy {} entries", - tablespace_id, db_id, n - ); - Ok(()) + Ok(lsn) } } +// +// Get statistics to be displayed in the user interface. +// +// This combines the stats from all PageCache instances +// pub fn get_stats() -> PageCacheStats { let pcaches = PAGECACHES.lock().unwrap(); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 65c543c64e2a..6671c13d2563 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -266,7 +266,7 @@ async fn walreceiver_main( rec: recdata.clone(), main_data_offset: decoded.main_data_offset as u32, }; - pcache.put_rel_wal_record(tag, rec).await?; + pcache.put_rel_wal_record(tag, rec)?; } } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)