diff --git a/Cargo.toml b/Cargo.toml index 63b89ef..e35c75a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ thiserror = "1.0.22" [dev-dependencies] tempfile = "3.1.0" +cid = "0.6.0" diff --git a/examples/fromcarfile/cariter.rs b/examples/fromcarfile/cariter.rs new file mode 100644 index 0000000..296fe3b --- /dev/null +++ b/examples/fromcarfile/cariter.rs @@ -0,0 +1,95 @@ +use std::convert::TryFrom; +use std::io::{self, Read}; + +/// Read and unsigen varint (LEB128) from a reader. +/// +/// Code is based on the Rust compiler: +/// https://github.com/rust-lang/rust/blob/0beba9333754ead8febc5101fc5c35f7dcdfaadf/compiler/rustc_serialize/src/leb128.rs +pub fn read_u64_leb128(reader: &mut R) -> Result<(u64, usize), io::Error> { + let mut result = 0; + let mut shift = 0; + let mut position = 0; + let mut buf = [0]; + + loop { + reader.read_exact(&mut buf)?; + let byte = buf[0]; + position += 1; + if (byte & 0x80) == 0 { + result |= (byte as u64) << shift; + return Ok((result, position)); + } else { + result |= ((byte & 0x7F) as u64) << shift; + } + shift += 7; + } +} + +/// An iterator over a car file. +#[derive(Debug)] +pub struct CarIter { + /// The data we are iterating over + reader: R, + /// Position within the reader + pos: u64, +} + +impl CarIter { + pub fn new(mut reader: R) -> Self { + // Ignore the header for now + let (_header, bytes_read) = read_data(&mut reader).unwrap(); + CarIter { + reader, + pos: bytes_read, + } + } +} + +/// Read some data prefixed with a varint. +pub fn read_data(reader: &mut R) -> Result<(Vec, u64), io::Error> { + let (size, bytes_read): (u64, usize) = read_u64_leb128(reader)?; + let mut data = Vec::with_capacity(usize::try_from(size).unwrap()); + reader.take(size).read_to_end(&mut data)?; + Ok((data, u64::try_from(bytes_read).unwrap() + size)) +} + +/// Read a CID together with some data. +pub fn read_block(block: &[u8]) -> (Vec, Vec) { + // A block is a CID together with some data. + let (_version, version_offset) = read_u64_leb128(&mut &block[..]).unwrap(); + let (_codec, codec_offset) = read_u64_leb128(&mut &block[version_offset..]).unwrap(); + let (_multihash_code, multihash_code_offset) = + read_u64_leb128(&mut &block[version_offset + codec_offset..]).unwrap(); + let (multihash_size, multihash_size_offset) = + read_u64_leb128(&mut &block[version_offset + codec_offset + multihash_code_offset..]) + .unwrap(); + let cid_size = version_offset + + codec_offset + + multihash_code_offset + + multihash_size_offset + + usize::try_from(multihash_size).unwrap(); + let (cid, data) = block.split_at(cid_size); + (cid.to_vec(), data.to_vec()) +} + +impl Iterator for CarIter { + type Item = (Vec, Vec, u64); + + fn next(&mut self) -> Option { + match read_data(&mut self.reader) { + Ok((block, bytes_read)) => { + let (cid, data) = read_block(&block); + + // Get the current position in order to return it and update it for the next + // iteration. + let pos = self.pos; + self.pos += bytes_read; + + Some((cid, data, pos)) + } + // We might have hit the end of the file => stop iterating + Err(error) if error.kind() == std::io::ErrorKind::UnexpectedEof => None, + Err(error) => panic!(error), + } + } +} diff --git a/examples/fromcarfile/main.rs b/examples/fromcarfile/main.rs new file mode 100644 index 0000000..8fd148d --- /dev/null +++ b/examples/fromcarfile/main.rs @@ -0,0 +1,76 @@ +mod cariter; + +use std::convert::TryFrom; +use std::env; +use std::fs::File; +use std::io::{BufReader, Read, Seek, SeekFrom}; + +use cid::Cid; +use storethehash::index::Index; +use storethehash::primary::{PrimaryError, PrimaryStorage}; + +use cariter::CarIter; + +/// CAR file storage implementation. +/// +/// The primary storage is a CAR file. +#[derive(Debug)] +struct CarFile(File); + +impl CarFile { + pub fn new(file: File) -> Self { + Self(file) + } +} + +impl PrimaryStorage for CarFile { + fn get_key(&mut self, pos: u64) -> Result, PrimaryError> { + let file_size = self.0.seek(SeekFrom::End(0))?; + if pos > file_size { + return Err(PrimaryError::OutOfBounds); + } + + self.0.seek(SeekFrom::Start(pos))?; + let (block, _bytes_read) = cariter::read_data(&mut self.0)?; + let (cid_bytes, _data) = cariter::read_block(&block); + let cid = + Cid::try_from(&cid_bytes[..]).map_err(|error| PrimaryError::Other(Box::new(error)))?; + let digest = cid.hash().digest(); + Ok(digest.to_vec()) + } +} + +fn insert_into_index(car_file: CarFile, car_iter: CarIter) { + const BUCKETS_BITS: u8 = 24; + let temp_dir = tempfile::tempdir().unwrap(); + let index_path = temp_dir.path().join("storethehash.index"); + let mut index = Index::<_, BUCKETS_BITS>::open(index_path, car_file).unwrap(); + + let mut counter = 0; + for (cid_bytes, _, pos) in car_iter { + if counter % 100000 == 0 { + println!("{} keys inserted", counter); + } + let cid = Cid::try_from(&cid_bytes[..]).unwrap(); + let digest = cid.hash().digest(); + index.put(&digest, pos).unwrap(); + + counter += 1; + } +} + +fn main() { + let car_path = env::args().skip(1).next(); + match car_path { + Some(path) => { + let car_file_for_iter = File::open(&path).unwrap(); + let car_file_for_iter_reader = BufReader::new(car_file_for_iter); + let car_iter = CarIter::new(car_file_for_iter_reader); + + let car_file_for_index = File::open(&path).unwrap(); + let primary_storage = CarFile::new(car_file_for_index); + insert_into_index(primary_storage, car_iter); + } + None => println!("usage: fromcarfile "), + } +} diff --git a/src/primary.rs b/src/primary.rs index 0f6d7c7..6642cb9 100644 --- a/src/primary.rs +++ b/src/primary.rs @@ -11,6 +11,9 @@ pub enum PrimaryError { OutOfBounds, #[error("IO error.")] Io(#[from] std::io::Error), + // Catch-all for errors that could happen within the primary storage. + #[error(transparent)] + Other(Box), } pub trait PrimaryStorage {