Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that EvictingMap is threadsafe #1564

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 43 additions & 21 deletions nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
}

#[derive(MetricsComponent)]
struct State<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug> {
struct State<K: Ord + Hash + Eq + Clone + Debug + Send, T: LenEntry + Debug + Send> {
lru: LruCache<K, EvictionItem<T>>,
btree: Option<BTreeSet<K>>,
#[metric(help = "Total size of all items in the store")]
Expand All @@ -116,12 +116,14 @@ struct State<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug> {
lifetime_inserted_bytes: Counter,
}

impl<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug + Sync> State<K, T> {
impl<K: Ord + Hash + Eq + Clone + Debug + Send + Sync, T: LenEntry + Debug + Sync + Send>
State<K, T>
{
/// Removes an item from the cache.
async fn remove<Q>(&mut self, key: &Q, eviction_item: &EvictionItem<T>, replaced: bool)
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
if let Some(btree) = &mut self.btree {
btree.remove(key.borrow());
Expand Down Expand Up @@ -153,7 +155,11 @@ impl<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug + Sync> State<K, T>
}

#[derive(MetricsComponent)]
pub struct EvictingMap<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug, I: InstantWrapper> {
pub struct EvictingMap<
K: Ord + Hash + Eq + Clone + Debug + Send,
T: LenEntry + Debug + Send,
I: InstantWrapper,
> {
#[metric]
state: Mutex<State<K, T>>,
anchor_time: I,
Expand All @@ -169,7 +175,7 @@ pub struct EvictingMap<K: Ord + Hash + Eq + Clone + Debug, T: LenEntry + Debug,

impl<K, T, I> EvictingMap<K, T, I>
where
K: Ord + Hash + Eq + Clone + Debug,
K: Ord + Hash + Eq + Clone + Debug + Send + Sync,
T: LenEntry + Debug + Clone + Send + Sync,
I: InstantWrapper,
{
Expand Down Expand Up @@ -210,11 +216,11 @@ where
/// and return the number of items that were processed.
/// The `handler` function should return `true` to continue processing the next item
/// or `false` to stop processing.
pub async fn range<F, Q>(&self, prefix_range: impl RangeBounds<Q>, mut handler: F) -> u64
pub async fn range<F, Q>(&self, prefix_range: impl RangeBounds<Q> + Send, mut handler: F) -> u64
where
F: FnMut(&K, &T) -> bool,
F: FnMut(&K, &T) -> bool + Send,
K: Borrow<Q> + Ord,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut state = self.state.lock().await;
let btree = if let Some(ref btree) = state.btree {
Expand Down Expand Up @@ -302,7 +308,7 @@ where
pub async fn size_for_key<Q>(&self, key: &Q) -> Option<u64>
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut results = [None];
self.sizes_for_keys([key], &mut results[..], false).await;
Expand All @@ -317,15 +323,18 @@ where
/// LRU cache. Note: peek may still evict, but won't promote.
pub async fn sizes_for_keys<It, Q, R>(&self, keys: It, results: &mut [Option<u64>], peek: bool)
where
It: IntoIterator<Item = R>,
It: IntoIterator<Item = R> + Send,
// Note: It's not enough to have the inserts themselves be Send. The
// returned iterator should be Send as well.
<It as IntoIterator>::IntoIter: Send,
// This may look strange, but what we are doing is saying:
// * `K` must be able to borrow `Q`
// * `R` (the input stream item type) must also be able to borrow `Q`
// Note: That K and R do not need to be the same type, they just both need
// to be able to borrow a `Q`.
K: Borrow<Q>,
R: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
R: Borrow<Q> + Send,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut state = self.state.lock().await;

Expand Down Expand Up @@ -369,7 +378,7 @@ where
pub async fn get<Q>(&self, key: &Q) -> Option<T>
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut state = self.state.lock().await;
self.evict_items(&mut *state).await;
Expand Down Expand Up @@ -404,7 +413,13 @@ where

/// Same as `insert()`, but optimized for multiple inserts.
/// Returns the replaced items if any.
pub async fn insert_many(&self, inserts: impl IntoIterator<Item = (K, T)>) -> Vec<T> {
pub async fn insert_many<It>(&self, inserts: It) -> Vec<T>
where
It: IntoIterator<Item = (K, T)> + Send,
// Note: It's not enough to have the inserts themselves be Send. The
// returned iterator should be Send as well.
<It as IntoIterator>::IntoIter: Send,
{
let mut inserts = inserts.into_iter().peekable();
// Shortcut for cases where there are no inserts, so we don't need to lock.
if inserts.peek().is_none() {
Expand All @@ -415,12 +430,18 @@ where
.await
}

async fn inner_insert_many(
async fn inner_insert_many<It>(
&self,
state: &mut State<K, T>,
inserts: impl IntoIterator<Item = (K, T)>,
inserts: It,
seconds_since_anchor: i32,
) -> Vec<T> {
) -> Vec<T>
where
It: IntoIterator<Item = (K, T)> + Send,
// Note: It's not enough to have the inserts themselves be Send. The
// returned iterator should be Send as well.
<It as IntoIterator>::IntoIter: Send,
{
let mut replaced_items = Vec::new();
for (key, data) in inserts {
let new_item_size = data.len();
Expand All @@ -442,7 +463,7 @@ where
pub async fn remove<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
let mut state = self.state.lock().await;
self.inner_remove(&mut state, key).await
Expand All @@ -451,7 +472,7 @@ where
async fn inner_remove<Q>(&self, state: &mut State<K, T>, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
{
self.evict_items(state).await;
if let Some(entry) = state.lru.pop(key.borrow()) {
Expand All @@ -463,10 +484,11 @@ where

/// Same as `remove()`, but allows for a conditional to be applied to the
/// entry before removal in an atomic fashion.
pub async fn remove_if<Q, F: FnOnce(&T) -> bool>(&self, key: &Q, cond: F) -> bool
pub async fn remove_if<Q, F>(&self, key: &Q, cond: F) -> bool
where
K: Borrow<Q>,
Q: Ord + Hash + Eq + Debug,
Q: Ord + Hash + Eq + Debug + Sync,
F: FnOnce(&T) -> bool + Send,
{
let mut state = self.state.lock().await;
if let Some(entry) = state.lru.get(key.borrow()) {
Expand Down
2 changes: 1 addition & 1 deletion nativelink-util/tests/evicting_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ async fn remove_evicts_on_time() -> Result<(), Error> {
async fn range_multiple_items_test() -> Result<(), Error> {
async fn get_map_range(
evicting_map: &EvictingMap<String, BytesWrapper, MockInstantWrapped>,
range: impl std::ops::RangeBounds<String>,
range: impl std::ops::RangeBounds<String> + Send,
) -> Vec<(String, Bytes)> {
let mut found_values = Vec::new();
evicting_map
Expand Down
Loading