Skip to content

Commit

Permalink
feat: add example that uses a CAR file as primary storage
Browse files Browse the repository at this point in the history
  • Loading branch information
vmx committed Nov 27, 2020
1 parent ee223fd commit f0fc55a
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ thiserror = "1.0.22"

[dev-dependencies]
tempfile = "3.1.0"
cid = "0.6.0"
95 changes: 95 additions & 0 deletions examples/fromcarfile/cariter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::convert::TryFrom;
use std::io::{self, Read};

/// Read and unsigen varint (LEB128) from a reader.
///
/// Code is based on the Rust compiler:
/// https://github.com/rust-lang/rust/blob/0beba9333754ead8febc5101fc5c35f7dcdfaadf/compiler/rustc_serialize/src/leb128.rs
pub fn read_u64_leb128<R: Read>(reader: &mut R) -> Result<(u64, usize), io::Error> {
let mut result = 0;
let mut shift = 0;
let mut position = 0;
let mut buf = [0];

loop {
reader.read_exact(&mut buf)?;
let byte = buf[0];
position += 1;
if (byte & 0x80) == 0 {
result |= (byte as u64) << shift;
return Ok((result, position));
} else {
result |= ((byte & 0x7F) as u64) << shift;
}
shift += 7;
}
}

/// An iterator over a car file.
#[derive(Debug)]
pub struct CarIter<R: Read> {
/// The data we are iterating over
reader: R,
/// Position within the reader
pos: u64,
}

impl<R: Read> CarIter<R> {
pub fn new(mut reader: R) -> Self {
// Ignore the header for now
let (_header, bytes_read) = read_data(&mut reader).unwrap();
CarIter {
reader,
pos: bytes_read,
}
}
}

/// Read some data prefixed with a varint.
pub fn read_data<R: Read>(reader: &mut R) -> Result<(Vec<u8>, u64), io::Error> {
let (size, bytes_read): (u64, usize) = read_u64_leb128(reader)?;
let mut data = Vec::with_capacity(usize::try_from(size).unwrap());
reader.take(size).read_to_end(&mut data)?;
Ok((data, u64::try_from(bytes_read).unwrap() + size))
}

/// Read a CID together with some data.
pub fn read_block(block: &[u8]) -> (Vec<u8>, Vec<u8>) {
// A block is a CID together with some data.
let (_version, version_offset) = read_u64_leb128(&mut &block[..]).unwrap();
let (_codec, codec_offset) = read_u64_leb128(&mut &block[version_offset..]).unwrap();
let (_multihash_code, multihash_code_offset) =
read_u64_leb128(&mut &block[version_offset + codec_offset..]).unwrap();
let (multihash_size, multihash_size_offset) =
read_u64_leb128(&mut &block[version_offset + codec_offset + multihash_code_offset..])
.unwrap();
let cid_size = version_offset
+ codec_offset
+ multihash_code_offset
+ multihash_size_offset
+ usize::try_from(multihash_size).unwrap();
let (cid, data) = block.split_at(cid_size);
(cid.to_vec(), data.to_vec())
}

impl<R: Read> Iterator for CarIter<R> {
type Item = (Vec<u8>, Vec<u8>, u64);

fn next(&mut self) -> Option<Self::Item> {
match read_data(&mut self.reader) {
Ok((block, bytes_read)) => {
let (cid, data) = read_block(&block);

// Get the current position in order to return it and update it for the next
// iteration.
let pos = self.pos;
self.pos += bytes_read;

Some((cid, data, pos))
}
// We might have hit the end of the file => stop iterating
Err(error) if error.kind() == std::io::ErrorKind::UnexpectedEof => None,
Err(error) => panic!(error),
}
}
}
76 changes: 76 additions & 0 deletions examples/fromcarfile/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
mod cariter;

use std::convert::TryFrom;
use std::env;
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};

use cid::Cid;
use storethehash::index::Index;
use storethehash::primary::{PrimaryError, PrimaryStorage};

use cariter::CarIter;

/// CAR file storage implementation.
///
/// The primary storage is a CAR file.
#[derive(Debug)]
struct CarFile(File);

impl CarFile {
pub fn new(file: File) -> Self {
Self(file)
}
}

impl PrimaryStorage for CarFile {
fn get_key(&mut self, pos: u64) -> Result<Vec<u8>, PrimaryError> {
let file_size = self.0.seek(SeekFrom::End(0))?;
if pos > file_size {
return Err(PrimaryError::OutOfBounds);
}

self.0.seek(SeekFrom::Start(pos))?;
let (block, _bytes_read) = cariter::read_data(&mut self.0)?;
let (cid_bytes, _data) = cariter::read_block(&block);
let cid =
Cid::try_from(&cid_bytes[..]).map_err(|error| PrimaryError::Other(Box::new(error)))?;
let digest = cid.hash().digest();
Ok(digest.to_vec())
}
}

fn insert_into_index<R: Read>(car_file: CarFile, car_iter: CarIter<R>) {
const BUCKETS_BITS: u8 = 24;
let temp_dir = tempfile::tempdir().unwrap();
let index_path = temp_dir.path().join("storethehash.index");
let mut index = Index::<_, BUCKETS_BITS>::open(index_path, car_file).unwrap();

let mut counter = 0;
for (cid_bytes, _, pos) in car_iter {
if counter % 100000 == 0 {
println!("{} keys inserted", counter);
}
let cid = Cid::try_from(&cid_bytes[..]).unwrap();
let digest = cid.hash().digest();
index.put(&digest, pos).unwrap();

counter += 1;
}
}

fn main() {
let car_path = env::args().skip(1).next();
match car_path {
Some(path) => {
let car_file_for_iter = File::open(&path).unwrap();
let car_file_for_iter_reader = BufReader::new(car_file_for_iter);
let car_iter = CarIter::new(car_file_for_iter_reader);

let car_file_for_index = File::open(&path).unwrap();
let primary_storage = CarFile::new(car_file_for_index);
insert_into_index(primary_storage, car_iter);
}
None => println!("usage: fromcarfile <path-to-car-file>"),
}
}
3 changes: 3 additions & 0 deletions src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub enum PrimaryError {
OutOfBounds,
#[error("IO error.")]
Io(#[from] std::io::Error),
// Catch-all for errors that could happen within the primary storage.
#[error(transparent)]
Other(Box<dyn std::error::Error>),
}

pub trait PrimaryStorage {
Expand Down

0 comments on commit f0fc55a

Please sign in to comment.