Skip to content

Commit

Permalink
fix: use buffered writer
Browse files Browse the repository at this point in the history
Buffered writes should improve the performance.
  • Loading branch information
vmx committed Jan 12, 2021
1 parent f447b81 commit d6d43ed
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
17 changes: 12 additions & 5 deletions primary/cid/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
//! [Car files]: https://github.com/ipld/specs/blob/d8ae7e9d78e4efe7e21ec2bae427d79b5af95bcd/block-layer/content-addressable-archives.md#format-description
//! [LEB128]: https://en.wikipedia.org/wiki/LEB128
use std::cell::RefCell;
use std::convert::TryFrom;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;

use cid::Cid;
Expand All @@ -19,7 +20,10 @@ use wasabi_leb128::{ParseLeb128Error, ReadLeb128, WriteLeb128};

/// A primary storage that is CID aware.
#[derive(Debug)]
pub struct CidPrimary(File);
pub struct CidPrimary {
reader: File,
writer: RefCell<BufWriter<File>>,
}

impl CidPrimary {
pub fn open<P>(path: P) -> Result<Self, PrimaryError>
Expand All @@ -33,13 +37,16 @@ impl CidPrimary {
.append(true)
.open(path)?;
file.seek(SeekFrom::End(0))?;
Ok(Self(file))
Ok(Self {
reader: file.try_clone()?,
writer: RefCell::new(BufWriter::new(file)),
})
}
}

impl PrimaryStorage for CidPrimary {
fn get(&self, pos: u64) -> Result<(Vec<u8>, Vec<u8>), PrimaryError> {
let mut file = &self.0;
let mut file = &self.reader;
let file_size = file.seek(SeekFrom::End(0))?;
if pos > file_size {
return Err(PrimaryError::OutOfBounds);
Expand All @@ -51,7 +58,7 @@ impl PrimaryStorage for CidPrimary {
}

fn put(&self, key: &[u8], value: &[u8]) -> Result<u64, PrimaryError> {
let mut file = &self.0;
let mut file = self.writer.borrow_mut();
let file_size = file.seek(SeekFrom::End(0))?;

let size = key.len() + value.len();
Expand Down
35 changes: 19 additions & 16 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::cell::RefCell;
use std::cmp;
use std::convert::{TryFrom, TryInto};
use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;

use log::{debug, warn};
Expand Down Expand Up @@ -77,7 +77,8 @@ impl From<&[u8]> for Header {
#[derive(Debug)]
pub struct Index<P: PrimaryStorage, const N: u8> {
buckets: RefCell<Buckets<N>>,
file: File,
reader: File,
writer: RefCell<BufWriter<File>>,
pub primary: P,
}

Expand Down Expand Up @@ -158,7 +159,8 @@ impl<P: PrimaryStorage, const N: u8> Index<P, N> {

Ok(Self {
buckets: RefCell::new(buckets),
file: index_file,
reader: index_file.try_clone()?,
writer: RefCell::new(BufWriter::new(index_file)),
primary,
})
}
Expand All @@ -183,8 +185,9 @@ impl<P: PrimaryStorage, const N: u8> Index<P, N> {
// only full bytes are trimmed off.
let index_key = strip_bucket_prefix(&key, N);

// Reading and seekign needs mutable file accesss.
let mut file = &self.file;
// Reading and seeking needs mutable file accesss.
let mut writer = self.writer.borrow_mut();
let mut reader = &self.reader;

// No records stored in that bucket yet
let new_data = if index_offset == 0 {
Expand All @@ -196,13 +199,13 @@ impl<P: PrimaryStorage, const N: u8> Index<P, N> {
// Read the record list from disk and insert the new key
else {
let mut recordlist_size_buffer = [0; 4];
file.seek(SeekFrom::Start(index_offset))?;
file.read_exact(&mut recordlist_size_buffer)?;
reader.seek(SeekFrom::Start(index_offset))?;
reader.read_exact(&mut recordlist_size_buffer)?;
let recordlist_size = usize::try_from(u32::from_le_bytes(recordlist_size_buffer))
.expect(">=32-bit platform needed");

let mut data = vec![0u8; recordlist_size];
file.read_exact(&mut data)?;
reader.read_exact(&mut data)?;

let records = RecordList::new(&data);
let (pos, prev_record) = records.find_key_position(index_key);
Expand Down Expand Up @@ -281,7 +284,7 @@ impl<P: PrimaryStorage, const N: u8> Index<P, N> {
}
};

let recordlist_pos = file
let recordlist_pos = writer
.seek(SeekFrom::End(0))
.expect("It's always possible to seek to the end of the file.");

Expand All @@ -291,9 +294,9 @@ impl<P: PrimaryStorage, const N: u8> Index<P, N> {
let new_data_size: [u8; 4] = u32::try_from(new_data.len() + BUCKET_PREFIX_SIZE)
.expect("A record list cannot be bigger than 2^32.")
.to_le_bytes();
file.write_all(&new_data_size)?;
file.write_all(&bucket.to_le_bytes())?;
file.write_all(&new_data)?;
writer.write_all(&new_data_size)?;
writer.write_all(&bucket.to_le_bytes())?;
writer.write_all(&new_data)?;
// Fsyncs are expensive
//self.file.sync_data()?;

Expand Down Expand Up @@ -330,14 +333,14 @@ impl<P: PrimaryStorage, const N: u8> Index<P, N> {
// storage.
else {
let mut recordlist_size_buffer = [0; 4];
let mut file = &self.file;
file.seek(SeekFrom::Start(index_offset))?;
file.read_exact(&mut recordlist_size_buffer)?;
let mut reader = &self.reader;
reader.seek(SeekFrom::Start(index_offset))?;
reader.read_exact(&mut recordlist_size_buffer)?;
let recordlist_size = usize::try_from(u32::from_le_bytes(recordlist_size_buffer))
.expect(">=32-bit platform needed");

let mut data = vec![0u8; recordlist_size];
file.read_exact(&mut data)?;
reader.read_exact(&mut data)?;

let records = RecordList::new(&data);
let file_offset = records.get(index_key);
Expand Down

0 comments on commit d6d43ed

Please sign in to comment.