diff --git a/Cargo.toml b/Cargo.toml index 85ff43db..cf935632 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,9 @@ path = "src/lib.rs" default = [] lz4 = ["dep:lz4_flex"] miniz = ["dep:miniz_oxide"] +zstd = ["dep:zstd"] bloom = [] -all = ["bloom", "lz4", "miniz"] +all = ["bloom", "lz4", "miniz", "zstd"] [dependencies] byteorder = "1.5.0" @@ -41,6 +42,7 @@ tempfile = "3.12.0" value-log = "1.1.0" varint-rs = "2.2.0" xxhash-rust = { version = "0.8.12", features = ["xxh3"] } +zstd = { version = "0.13.2", optional = true, features = ["experimental"] } [dev-dependencies] criterion = { version = "0.5.1", features = ["html_reports"] } @@ -73,13 +75,13 @@ required-features = ["bloom"] name = "block" harness = false path = "benches/block.rs" -required-features = ["lz4", "miniz"] +required-features = ["lz4", "miniz", "zstd"] [[bench]] name = "tree" harness = false path = "benches/tree.rs" -required-features = ["lz4", "miniz"] +required-features = ["lz4", "miniz", "zstd"] [[bench]] name = "level_manifest" diff --git a/benches/block.rs b/benches/block.rs index 88a22c67..8ea488e8 100644 --- a/benches/block.rs +++ b/benches/block.rs @@ -148,7 +148,13 @@ fn load_value_block_from_disk(c: &mut Criterion) { for comp_type in [ //CompressionType::None, CompressionType::Lz4, - //CompressionType::Miniz(3), + CompressionType::Miniz(3), + CompressionType::Miniz(6), + CompressionType::Zstd(-3), + CompressionType::Zstd(-1), + CompressionType::Zstd(1), + CompressionType::Zstd(3), + CompressionType::Zstd(12), ] { for block_size in [1, 4, 8, 16, 32, 64, 128] { let block_size = block_size * 1_024; diff --git a/benches/tree.rs b/benches/tree.rs index 1929db2b..3cef9588 100644 --- a/benches/tree.rs +++ b/benches/tree.rs @@ -181,7 +181,7 @@ fn tree_get_pairs(c: &mut Criterion) { { let folder = tempfile::tempdir().unwrap(); let tree = Config::new(folder) - .block_size(1_024) + .data_block_size(1_024) .block_cache(Arc::new(BlockCache::with_capacity_bytes(0))) .open() .unwrap(); @@ -219,7 +219,7 @@ fn tree_get_pairs(c: &mut Criterion) { { let folder = tempfile::tempdir().unwrap(); let tree = Config::new(folder) - .block_size(1_024) + .data_block_size(1_024) .block_cache(Arc::new(BlockCache::with_capacity_bytes(0))) .open() .unwrap(); @@ -262,7 +262,7 @@ fn disk_point_read(c: &mut Criterion) { let folder = tempdir().unwrap(); let tree = Config::new(folder) - .block_size(1_024) + .data_block_size(1_024) .block_cache(Arc::new(BlockCache::with_capacity_bytes(0))) .open() .unwrap(); @@ -300,7 +300,7 @@ fn disjoint_tree_minmax(c: &mut Criterion) { let folder = tempfile::tempdir().unwrap(); let tree = Config::new(folder) - .block_size(1_024) + .data_block_size(1_024) .block_cache(Arc::new(BlockCache::with_capacity_bytes(0))) .open() .unwrap(); diff --git a/src/blob_tree/compression.rs b/src/blob_tree/compression.rs index 5efdf037..8e11ea71 100644 --- a/src/blob_tree/compression.rs +++ b/src/blob_tree/compression.rs @@ -24,6 +24,9 @@ impl Compressor for MyCompressor { #[cfg(feature = "miniz")] CompressionType::Miniz(lvl) => miniz_oxide::deflate::compress_to_vec(bytes, lvl), + + #[cfg(feature = "zstd")] + CompressionType::Zstd(level) => zstd::bulk::compress(bytes, level)?, }) } @@ -39,6 +42,14 @@ impl Compressor for MyCompressor { #[cfg(feature = "miniz")] CompressionType::Miniz(_) => miniz_oxide::inflate::decompress_to_vec(bytes) .map_err(|_| value_log::Error::Decompress), + + #[cfg(feature = "zstd")] + CompressionType::Zstd(_) => zstd::bulk::decompress( + bytes, + // TODO: assuming 4GB output size max + u32::MAX as usize, + ) + .map_err(|_| value_log::Error::Decompress), } } } diff --git a/src/segment/block/mod.rs b/src/segment/block/mod.rs index eb5fa388..5d25a9bf 100644 --- a/src/segment/block/mod.rs +++ b/src/segment/block/mod.rs @@ -59,6 +59,12 @@ impl Block { miniz_oxide::inflate::decompress_to_vec(&bytes) .map_err(|_| crate::Error::Decompress(header.compression))? } + #[cfg(feature = "zstd")] + super::meta::CompressionType::Zstd(_) => { + // TODO: assuming 4GB output size max + zstd::bulk::decompress(&bytes, u32::MAX as usize) + .map_err(|_| crate::Error::Decompress(header.compression))? + } }; let mut bytes = Cursor::new(bytes); @@ -131,6 +137,9 @@ impl Block { #[cfg(feature = "miniz")] CompressionType::Miniz(level) => miniz_oxide::deflate::compress_to_vec(&buf, level), + + #[cfg(feature = "zstd")] + CompressionType::Zstd(level) => zstd::bulk::compress(&buf, level)?, }) } } diff --git a/src/segment/meta/compression.rs b/src/segment/meta/compression.rs index 590debb5..7365a75d 100644 --- a/src/segment/meta/compression.rs +++ b/src/segment/meta/compression.rs @@ -33,6 +33,17 @@ pub enum CompressionType { /// - 10 may save even more space than 9, but the speed trade off may not be worth it #[cfg(feature = "miniz")] Miniz(u8), + + /// zstd Compression + /// + /// Compression level (-128-22) can be adjusted. + /// + /// - -128~ -1 is fast compression level + /// - A level of `0` uses zstd's default (currently `3`). + /// - 1~19 normal compression level, higher is slower (1 is fastest, 3 is default, 12 is as fast as gzip level 6) + /// - 20~22 ultra compression level, increase memory on both compression and decompression + #[cfg(feature = "zstd")] + Zstd(i32), } impl Encode for CompressionType { @@ -56,6 +67,17 @@ impl Encode for CompressionType { writer.write_u8(2)?; writer.write_u8(*level)?; } + + #[cfg(feature = "zstd")] + Self::Zstd(level) => { + assert!( + *level >= -128 && *level <= 22, + "invalid zstd compression level" + ); + writer.write_u8(3)?; + // TODO: this is dependent on endianness + writer.write_u8(*level as u8)?; + } }; Ok(()) @@ -87,6 +109,19 @@ impl Decode for CompressionType { Ok(Self::Miniz(level)) } + #[cfg(feature = "zstd")] + 3 => { + // TODO: this is dependent on endianness + let level = reader.read_u8()? as i8 as i32; + + assert!( + level >= -128 && level <= 22, + "invalid zstd compression level" + ); + + Ok(Self::Zstd(level)) + } + tag => Err(DecodeError::InvalidTag(("CompressionType", tag))), } } @@ -94,19 +129,18 @@ impl Decode for CompressionType { impl std::fmt::Display for CompressionType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}", - match self { - Self::None => "no compression", + match self { + Self::None => write!(f, "no compression"), - #[cfg(feature = "lz4")] - Self::Lz4 => "lz4", + #[cfg(feature = "lz4")] + Self::Lz4 => write!(f, "lz4"), - #[cfg(feature = "miniz")] - Self::Miniz(_) => "miniz", - } - ) + #[cfg(feature = "miniz")] + Self::Miniz(_) => write!(f, "miniz"), + + #[cfg(feature = "zstd")] + Self::Zstd(level) => write!(f, "zstd({})", level), + } } } @@ -118,6 +152,7 @@ mod tests { fn compression_serialize_none() -> crate::Result<()> { let serialized = CompressionType::None.encode_into_vec()?; assert_eq!(2, serialized.len()); + assert_eq!(vec![0u8, 0u8], serialized); Ok(()) } @@ -126,9 +161,10 @@ mod tests { use super::*; #[test_log::test] - fn compression_serialize_none() -> crate::Result<()> { + fn compression_serialize_lz4() -> crate::Result<()> { let serialized = CompressionType::Lz4.encode_into_vec()?; assert_eq!(2, serialized.len()); + assert_eq!(vec![1u8, 0u8], serialized); Ok(()) } } @@ -138,10 +174,26 @@ mod tests { use super::*; #[test_log::test] - fn compression_serialize_none() -> crate::Result<()> { + fn compression_serialize_miniz() -> crate::Result<()> { for lvl in 0..10 { let serialized = CompressionType::Miniz(lvl).encode_into_vec()?; assert_eq!(2, serialized.len()); + assert_eq!(vec![2u8, lvl], serialized); + } + Ok(()) + } + } + + #[cfg(feature = "zstd")] + mod zstd { + use super::*; + + #[test_log::test] + fn compression_serialize_zstd() -> crate::Result<()> { + for lvl in -128..22 { + let serialized = CompressionType::Zstd(lvl).encode_into_vec()?; + assert_eq!(2, serialized.len()); + assert_eq!(vec![3u8, lvl as u8], serialized); } Ok(()) } diff --git a/tests/blob_simple.rs b/tests/blob_simple.rs index d4962a2e..7a268bb9 100644 --- a/tests/blob_simple.rs +++ b/tests/blob_simple.rs @@ -132,3 +132,44 @@ fn blob_tree_simple_compressed_2() -> lsm_tree::Result<()> { Ok(()) } + +#[cfg(feature = "zstd")] +#[test] +fn blob_tree_simple_compressed_zstd() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let path = folder.path(); + + let tree = lsm_tree::Config::new(path) + .compression(lsm_tree::CompressionType::Zstd(3)) + .open_as_blob_tree()?; + + let big_value = b"neptune!".repeat(128_000); + + assert!(tree.get("big")?.is_none()); + tree.insert("big", &big_value, 0); + tree.insert("smol", "small value", 0); + + let value = tree.get("big")?.expect("should exist"); + assert_eq!(&*value, big_value); + + tree.flush_active_memtable(0)?; + + let value = tree.get("big")?.expect("should exist"); + assert_eq!(&*value, big_value); + + let value = tree.get("smol")?.expect("should exist"); + assert_eq!(&*value, b"small value"); + + let new_big_value = b"winter!".repeat(128_000); + tree.insert("big", &new_big_value, 1); + + let value = tree.get("big")?.expect("should exist"); + assert_eq!(&*value, new_big_value); + + tree.flush_active_memtable(0)?; + + let value = tree.get("big")?.expect("should exist"); + assert_eq!(&*value, new_big_value); + + Ok(()) +}