
Commit

fix master conflict
Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace committed Oct 22, 2020
1 parent 63f18e5 commit 28803f4
Showing 4 changed files with 29 additions and 27 deletions.
src/cache_evict.rs (8 changes: 5 additions & 3 deletions)
@@ -64,7 +64,7 @@ impl CacheSubmitor {
         self.block_on_full = false;
     }
 
-    pub fn get_cache_tracker(&mut self, file_num: u64, offset: u64) -> Option<Arc<AtomicUsize>> {
+    pub fn get_cache_tracker(&mut self, file_num: u64, offset: u64) -> Option<CacheTracker> {
         if self.cache_limit == 0 {
             return None;
         }
@@ -104,13 +104,15 @@ impl CacheSubmitor {
             }
         }
 
-        Some(self.size_tracker.clone())
+        Some(CacheTracker::new(
+            self.global_stats.clone(),
+            self.size_tracker.clone(),
+        ))
     }
 
     pub fn fill_chunk(&mut self, size: usize) {
         self.chunk_size += size;
         self.size_tracker.fetch_add(size, Ordering::Release);
-        self.global_stats.add_mem_change(size);
     }
 
     fn reset(&mut self, file_num: u64, offset: u64) {
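
The change above stops handing out the raw Arc<AtomicUsize> size counter and instead returns a CacheTracker built from both the global statistics and the per-chunk size tracker. The CacheTracker definition itself is not part of this diff; the following is a minimal sketch of what it plausibly looks like, inferred only from how it is constructed here and used in log_batch.rs below. The GlobalStats placeholder, the sub_mem_change method, and the Drop behavior are assumptions, not code from the repository.

// Hypothetical sketch only; not part of this commit.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Stand-in for the engine's global statistics; only the methods the diff
// relies on are sketched.
#[derive(Default)]
pub struct GlobalStats {
    mem_change: AtomicUsize,
}

impl GlobalStats {
    pub fn add_mem_change(&self, bytes: usize) {
        self.mem_change.fetch_add(bytes, Ordering::Relaxed);
    }
    pub fn sub_mem_change(&self, bytes: usize) {
        self.mem_change.fetch_sub(bytes, Ordering::Relaxed);
    }
}

// One tracker is created per written chunk; each clone accounts for the
// bytes of a single entry and gives them back when it is dropped.
#[derive(Clone)]
pub struct CacheTracker {
    pub global_stats: Arc<GlobalStats>,
    pub chunk_size: Arc<AtomicUsize>,
    pub sub_on_drop: usize,
}

impl CacheTracker {
    pub fn new(global_stats: Arc<GlobalStats>, chunk_size: Arc<AtomicUsize>) -> Self {
        CacheTracker { global_stats, chunk_size, sub_on_drop: 0 }
    }
}

impl Drop for CacheTracker {
    fn drop(&mut self) {
        // Release this tracker's share of the cache accounting.
        self.chunk_size.fetch_sub(self.sub_on_drop, Ordering::Release);
        self.global_stats.sub_mem_change(self.sub_on_drop);
    }
}
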
src/engine.rs (9 changes: 3 additions & 6 deletions)
@@ -195,15 +195,14 @@ where
         log_batch: &mut LogBatch<E, W>,
         sync: bool,
     ) -> BoxFuture<'static, Result<usize>> {
-        let mut entries_size = 0;
         let now = Instant::now();
-        if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) {
+        if let Some(content) = log_batch.encode_to_bytes() {
             let (sender, r) = future_channel::oneshot::channel();
             let bytes = content.len();
             let task = WriteTask {
                 content,
                 sync,
-                entries_size,
+                entries_size: log_batch.entries_size(),
                 sender,
             };
             if let Err(_) = self.wal_sender.send(LogMsg::Write(task)) {
@@ -378,14 +377,12 @@ where
                 if let Some(tracker) =
                     cache_submitor.get_cache_tracker(file_num, offset)
                 {
-                    let mut encoded_size = 0;
                     for item in log_batch.items.iter_mut() {
                         if let LogItemContent::Entries(entries) = &mut item.content {
                             entries.attach_cache_tracker(tracker.clone());
-                            encoded_size += entries.encoded_size;
                         }
                     }
-                    cache_submitor.fill_chunk(encoded_size);
+                    cache_submitor.fill_chunk(log_batch.entries_size());
                 }
             }
             self.apply_to_memtable(&mut log_batch, queue, file_num);
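
Both hunks above consume the size that LogBatch now tracks internally: encode_to_bytes() no longer takes a &mut usize out-parameter, and the WAL thread feeds cache_submitor.fill_chunk with log_batch.entries_size() instead of re-summing the per-entry encoded sizes. A self-contained toy illustrating that bookkeeping pattern; the types here are simplified stand-ins, not the raft-engine ones.

// Toy sketch: the batch records how many entry bytes it encoded and exposes
// that number afterwards, so callers no longer thread a counter through.
struct ToyBatch {
    entries: Vec<Vec<u8>>,
    entries_size: usize,
}

impl ToyBatch {
    fn encode_to_bytes(&mut self) -> Option<Vec<u8>> {
        if self.entries.is_empty() {
            return None;
        }
        let mut buf = Vec::new();
        for e in &self.entries {
            buf.extend_from_slice(e);
            self.entries_size += e.len(); // bookkeeping stays inside the batch
        }
        Some(buf)
    }

    fn entries_size(&self) -> usize {
        self.entries_size
    }
}

fn main() {
    let mut batch = ToyBatch {
        entries: vec![b"abc".to_vec(), b"de".to_vec()],
        entries_size: 0,
    };
    let bytes = batch.encode_to_bytes().unwrap();
    assert_eq!(bytes.len(), 5);
    // Same number the WAL thread would pass to fill_chunk.
    assert_eq!(batch.entries_size(), 5);
}
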
src/log_batch.rs (33 changes: 19 additions & 14 deletions)
@@ -139,7 +139,7 @@ impl<E: Message + PartialEq> PartialEq for Entries<E> {
 
 impl<E: Message> Entries<E> {
     pub fn new(entries: Vec<E>, entries_index: Option<Vec<EntryIndex>>) -> Entries<E> {
-        let entries_index = 
+        let entries_index =
             entries_index.unwrap_or_else(|| vec![EntryIndex::default(); entries.len()]);
         Entries {
             entries,
@@ -202,7 +202,7 @@ impl<E: Message> Entries<E> {
             // This offset doesn't count the header.
             self.entries_index[i].offset = vec.len() as u64;
             self.entries_index[i].len = content.len() as u64;
-            *entries_size += entries_index[i].len as usize;
+            *entries_size += self.entries_index[i].len as usize;
         }
 
         vec.extend_from_slice(&content);
@@ -232,12 +232,12 @@ impl<E: Message> Entries<E> {
         }
     }
 
-    pub fn attach_cache_tracker(&mut self, chunk_size: Arc<AtomicUsize>) {
+    pub fn attach_cache_tracker(&mut self, tracker: CacheTracker) {
         for idx in self.entries_index.iter_mut() {
-            idx.cache_tracker = Some(CacheTracker {
-                chunk_size: chunk_size.clone(),
-                sub_on_drop: idx.len as usize,
-            });
+            let mut tkr = tracker.clone();
+            tkr.global_stats.add_mem_change(idx.len as usize);
+            tkr.sub_on_drop = idx.len as usize;
+            idx.cache_tracker = Some(tkr);
         }
     }
 
@@ -466,7 +466,7 @@ where
     W: EntryExt<E>,
 {
     pub items: Vec<LogItem<E>>,
-    entries_size: RefCell<usize>,
+    entries_size: usize,
     _phantom: PhantomData<W>,
 }
 
@@ -478,7 +478,7 @@
     fn default() -> Self {
         Self {
             items: Vec::with_capacity(16),
-            entries_size: RefCell::new(0),
+            entries_size: 0,
             _phantom: PhantomData,
         }
     }
@@ -496,7 +496,7 @@
     pub fn with_capacity(cap: usize) -> Self {
         Self {
             items: Vec::with_capacity(cap),
-            entries_size: RefCell::new(0),
+            entries_size: 0,
             _phantom: PhantomData,
         }
     }
@@ -576,7 +576,7 @@
                 file_num,
                 base_offset,
                 content_offset,
-                &mut log_batch.entries_size.borrow_mut(),
+                &mut log_batch.entries_size,
             )?;
             log_batch.items.push(item);
             items_count -= 1;
@@ -603,12 +603,17 @@
         let mut vec = Vec::with_capacity(4096);
         vec.encode_u64(0).unwrap();
         vec.encode_var_u64(self.items.len() as u64).unwrap();
-        for item in &self.items {
-            item.encode_to::<W>(&mut vec, &mut *self.entries_size.borrow_mut())
+        for item in self.items.iter_mut() {
+            item.encode_to::<W>(&mut vec, &mut self.entries_size)
                 .unwrap();
         }
 
-        let compression_type = CompressionType::None;
+        let compression_type = if vec.len() > COMPRESSION_SIZE {
+            vec = lz4::encode_block(&vec[HEADER_LEN..], HEADER_LEN, 4);
+            CompressionType::Lz4
+        } else {
+            CompressionType::None
+        };
 
         let checksum = crc32(&vec[8..]);
         vec.encode_u32_le(checksum).unwrap();
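
Besides moving entries_size from a RefCell<usize> to a plain usize (so encode_to_bytes now needs &mut self), the last hunk gates LZ4 compression on the encoded size. A small stand-alone sketch of that size-gated pattern follows; COMPRESSION_THRESHOLD and the compressor below are stand-ins for the crate's COMPRESSION_SIZE constant and lz4::encode_block helper, which this commit uses but does not define.

// Toy sketch of size-gated compression: small batches stay uncompressed,
// large ones are compressed and the header records which codec was used.
const COMPRESSION_THRESHOLD: usize = 4096; // stand-in for COMPRESSION_SIZE

#[derive(Debug, PartialEq)]
enum CompressionType {
    None,
    Lz4,
}

// Placeholder compressor; the real code calls an LZ4 block encoder.
fn fake_lz4_compress(data: &[u8]) -> Vec<u8> {
    data.to_vec()
}

fn maybe_compress(payload: Vec<u8>) -> (CompressionType, Vec<u8>) {
    if payload.len() > COMPRESSION_THRESHOLD {
        (CompressionType::Lz4, fake_lz4_compress(&payload))
    } else {
        (CompressionType::None, payload)
    }
}

fn main() {
    assert_eq!(maybe_compress(vec![0u8; 16]).0, CompressionType::None);
    assert_eq!(maybe_compress(vec![0u8; 8192]).0, CompressionType::Lz4);
}
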
src/wal.rs (6 changes: 2 additions & 4 deletions)
@@ -1,9 +1,7 @@
 use futures::channel::oneshot::Sender;
-use std::sync::atomic::AtomicUsize;
 use std::sync::mpsc::Receiver;
-use std::sync::Arc;
 
-use crate::cache_evict::CacheSubmitor;
+use crate::cache_evict::{CacheSubmitor, CacheTracker};
 use crate::errors::Result;
 use crate::pipe_log::{GenericPipeLog, LogQueue};
 use crate::util::Statistic;
@@ -14,7 +12,7 @@ pub struct WriteTask {
     pub content: Vec<u8>,
     pub entries_size: usize,
     pub sync: bool,
-    pub sender: Sender<(u64, u64, Option<Arc<AtomicUsize>>)>,
+    pub sender: Sender<(u64, u64, Option<CacheTracker>)>,
 }
 
 pub enum LogMsg {
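
The sender now carries an Option<CacheTracker> back to the write path instead of a bare Arc<AtomicUsize>, so the caller receives a tracker that already knows about the global statistics. A rough sketch of how the two ends of that channel fit together; it uses std::sync::mpsc and a trimmed-down CacheTracker purely for illustration, whereas the real code uses futures::channel::oneshot and the type from cache_evict.rs.

use std::sync::mpsc;

// Trimmed-down stand-in for crate::cache_evict::CacheTracker.
#[derive(Debug)]
struct CacheTracker {
    sub_on_drop: usize,
}

fn main() {
    // Same tuple shape as WriteTask::sender: (file_num, offset, tracker).
    let (tx, rx) = mpsc::channel::<(u64, u64, Option<CacheTracker>)>();

    // WAL thread side: report where the batch landed plus a cache tracker.
    tx.send((42, 128, Some(CacheTracker { sub_on_drop: 0 }))).unwrap();

    // Write path side: wait for the WAL thread, then attach the tracker
    // to the in-memory entry indexes (see attach_cache_tracker above).
    let (file_num, offset, tracker) = rx.recv().unwrap();
    println!("written to file {} at offset {}, tracker = {:?}", file_num, offset, tracker);
}
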
