Support native StoreKey in FilesystemStore
KGrewal1 committed Nov 22, 2024
1 parent 22707d7 commit 4ebc432
Showing 2 changed files with 98 additions and 69 deletions.
134 changes: 80 additions & 54 deletions nativelink-store/src/filesystem_store.rs
@@ -49,6 +49,9 @@ const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
// Default block size of all major filesystems is 4KB
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;

pub const STR_PREFIX: &str = "s-";
pub const DIGEST_PREFIX: &str = "d-";

#[derive(Debug, MetricsComponent)]
pub struct SharedContext {
// Used in testing to know how many active drop() spawns are running.
@@ -70,37 +73,31 @@ enum PathType {
Custom(OsString),
}

// Note: We don't store the full path of the file because it would cause
// a lot of needless memory bloat. There's a high chance we'll end up with a
// lot of small files, so to prevent storing duplicate data, we store an Arc
// to the path of the directory where the file is stored and the packed digest.
// Resulting in usize + sizeof(DigestInfo).
type FileNameDigest = DigestInfo;
pub struct EncodedFilePath {
shared_context: Arc<SharedContext>,
path_type: PathType,
digest: FileNameDigest,
key: StoreKey<'static>,
}

impl EncodedFilePath {
#[inline]
fn get_file_path(&self) -> Cow<'_, OsStr> {
get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.digest)
get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
}
}

#[inline]
fn get_file_path_raw<'a>(
path_type: &'a PathType,
shared_context: &SharedContext,
digest: &DigestInfo,
key: &StoreKey<'a>,
) -> Cow<'a, OsStr> {
let folder = match path_type {
PathType::Content => &shared_context.content_path,
PathType::Temp => &shared_context.temp_path,
PathType::Custom(path) => return Cow::Borrowed(path),
};
Cow::Owned(to_full_path_from_digest(folder, digest))
Cow::Owned(to_full_path_from_key(folder, key))
}

impl Drop for EncodedFilePath {
@@ -132,8 +129,13 @@ impl Drop for EncodedFilePath {
}

#[inline]
fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
format!("{folder}/{digest}").into()
fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
// Prefix the file name according to the key variant.
match key {
StoreKey::Str(str) => format!("{folder}/{STR_PREFIX}{str}"),
StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_PREFIX}{digest_info}"),
}
.into()
}
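
The prefix scheme above means string keys and digest keys can never collide on disk. A minimal, self-contained sketch of the mapping (the `Key` enum and `file_name` helper are hypothetical stand-ins for `StoreKey` and `to_full_path_from_key`; `DigestInfo` displays as `<hash>-<size>`, per `digest_from_filename` below):

const STR_PREFIX: &str = "s-";
const DIGEST_PREFIX: &str = "d-";

// Hypothetical stand-in for StoreKey.
enum Key<'a> {
    Str(&'a str),
    Digest(&'a str), // rendered as "<hash>-<size>"
}

fn file_name(key: &Key) -> String {
    match key {
        Key::Str(s) => format!("{STR_PREFIX}{s}"),
        Key::Digest(d) => format!("{DIGEST_PREFIX}{d}"),
    }
}

fn main() {
    // The two namespaces stay disjoint because the prefixes differ.
    assert_eq!(file_name(&Key::Str("build-log")), "s-build-log");
    assert_eq!(file_name(&Key::Digest("deadbeef-123")), "d-deadbeef-123");
}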

pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
@@ -300,7 +302,7 @@ impl Debug for FileEntryImpl {
}
}

fn make_temp_digest(digest: &mut DigestInfo) {
fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
let mut hash = *digest.packed_hash();
hash[24..].clone_from_slice(
@@ -309,6 +311,11 @@ fn make_temp_digest(digest: &mut DigestInfo) {
.to_le_bytes(),
);
digest.set_packed_hash(*hash);
digest
}

fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
}
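
`make_temp_key` derives a digest-flavored temp key from the original key, and `make_temp_digest` makes it unique by overwriting the last 8 bytes of the 32-byte packed hash with a global counter, so concurrent deletes never collide on a temp path. A standalone sketch of the same byte manipulation (a plain `[u8; 32]` stands in for `DigestInfo`'s packed hash):

use std::sync::atomic::{AtomicU64, Ordering};

static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);

fn make_temp_hash(mut hash: [u8; 32]) -> [u8; 32] {
    // Bytes 24..32 become a little-endian counter value, so two racing
    // deletes of the same digest still get distinct temp names.
    hash[24..].copy_from_slice(
        &DELETE_FILE_COUNTER
            .fetch_add(1, Ordering::Relaxed)
            .to_le_bytes(),
    );
    hash
}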

impl LenEntry for FileEntryImpl {
@@ -362,16 +369,15 @@ impl LenEntry for FileEntryImpl {
return;
}
let from_path = encoded_file_path.get_file_path();
let mut new_digest = encoded_file_path.digest;
make_temp_digest(&mut new_digest);
let new_key = make_temp_key(&encoded_file_path.key);

let to_path =
to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest);
to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);

if let Err(err) = fs::rename(&from_path, &to_path).await {
event!(
Level::WARN,
digest = ?encoded_file_path.digest,
key = ?encoded_file_path.key,
?from_path,
?to_path,
?err,
@@ -380,61 +386,74 @@
} else {
event!(
Level::INFO,
digest = ?encoded_file_path.digest,
key = ?encoded_file_path.key,
?from_path,
?to_path,
"Renamed file",
);
encoded_file_path.path_type = PathType::Temp;
encoded_file_path.digest = new_digest;
encoded_file_path.key = new_key;
}
}
}
}

#[inline]
pub fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
let (hash, size) = file_name.split_once('-').err_tip(|| "")?;
let size = size.parse::<i64>()?;
DigestInfo::try_new(hash, size)
}

pub fn key_from_filename(mut file_name: &str) -> Result<StoreKey<'_>, Error> {
if let Some(file_name) = file_name.strip_prefix(STR_PREFIX) {
return Ok(StoreKey::new_str(file_name));
}

// Remove the digest prefix if it exists. Permit unprefixed hashes for backwards compatibility.
if let Some(name) = file_name.strip_prefix(DIGEST_PREFIX) {
file_name = name;
}

digest_from_filename(file_name).map(StoreKey::Digest)
}
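
A round-trip sketch of the parsing rules above (hypothetical demo function; assumes a 64-hex-character hash as expected by `DigestInfo::try_new`, and that `StoreKey` implements `PartialEq`):

fn demo() -> Result<(), Error> {
    // String keys keep everything after the "s-" prefix verbatim.
    assert_eq!(key_from_filename("s-my-action")?, StoreKey::new_str("my-action"));

    // New-style digest names carry the "d-" prefix, but unprefixed names
    // written before this change parse to the same digest key.
    let hash = "a".repeat(64);
    let prefixed = format!("d-{hash}-42");
    let unprefixed = format!("{hash}-42");
    assert_eq!(key_from_filename(&prefixed)?, key_from_filename(&unprefixed)?);
    Ok(())
}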

/// The number of files to read the metadata for at the same time when running
/// add_files_to_cache.
const SIMULTANEOUS_METADATA_READS: usize = 200;

async fn add_files_to_cache<Fe: FileEntry>(
evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
evicting_map: &EvictingMap<StoreKey<'static>, Arc<Fe>, SystemTime>,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
block_size: u64,
) -> Result<(), Error> {
async fn process_entry<Fe: FileEntry>(
evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
evicting_map: &EvictingMap<StoreKey<'static>, Arc<Fe>, SystemTime>,
file_name: &str,
atime: SystemTime,
data_size: u64,
block_size: u64,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
) -> Result<(), Error> {
let digest = digest_from_filename(file_name)?;
let key = key_from_filename(file_name)?;

let file_entry = Fe::create(
data_size,
block_size,
RwLock::new(EncodedFilePath {
shared_context: shared_context.clone(),
path_type: PathType::Content,
digest,
key: key.borrow().into_owned(),
}),
);
let time_since_anchor = anchor_time
.duration_since(atime)
.map_err(|_| make_input_err!("File access time newer than now"))?;
evicting_map
.insert_with_time(
digest,
key.into_owned(),
Arc::new(file_entry),
time_since_anchor.as_secs() as i32,
)
@@ -525,7 +544,7 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
#[metric]
shared_context: Arc<SharedContext>,
#[metric(group = "evicting_map")]
evicting_map: Arc<EvictingMap<DigestInfo, Arc<Fe>, SystemTime>>,
evicting_map: Arc<EvictingMap<StoreKey<'static>, Arc<Fe>, SystemTime>>,
#[metric(help = "Block size of the configured filesystem")]
block_size: u64,
#[metric(help = "Size of the configured read buffer size")]
@@ -595,16 +614,16 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {

pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
self.evicting_map
.get(digest)
.get(&digest.into())
.await
.ok_or_else(|| make_err!(Code::NotFound, "{digest} not found in filesystem store"))
.ok_or_else(|| make_err!(Code::NotFound, "{} not found in filesystem store", digest))
}

async fn update_file<'a>(
self: Pin<&'a Self>,
mut entry: Fe,
mut resumeable_temp_file: fs::ResumeableFileSlot,
final_digest: DigestInfo,
final_digest: StoreKey<'static>,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
let mut data_size = 0;
@@ -652,7 +671,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
self.emplace_file(final_digest, Arc::new(entry)).await
}

async fn emplace_file(&self, digest: DigestInfo, entry: Arc<Fe>) -> Result<(), Error> {
async fn emplace_file(&self, key: StoreKey<'_>, entry: Arc<Fe>) -> Result<(), Error> {
// This sequence of events is quite tricky to understand due to the number of triggers
// that fire, the asynchrony involved, and the locking. So here is a breakdown of what happens:
// 1. Here will hold a write lock on any file operations of this FileEntry.
@@ -673,17 +692,22 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
let evicting_map = self.evicting_map.clone();
let rename_fn = self.rename_fn;

// We need to extend the lifetime to 'static for the background spawn.
let key = key.borrow().into_owned();

// We need to guarantee that this will get to the end even if the parent future is dropped.
// See: https://github.com/TraceMachina/nativelink/issues/495
background_spawn!("filesystem_store_emplace_file", async move {
let mut encoded_file_path = entry.get_encoded_file_path().write().await;
let final_path = get_file_path_raw(
&PathType::Content,
encoded_file_path.shared_context.as_ref(),
&digest,
&key,
);

evicting_map.insert(digest, entry.clone()).await;
evicting_map
.insert(key.borrow().into_owned(), entry.clone())
.await;

let from_path = encoded_file_path.get_file_path();
// Internally tokio spawns fs commands onto a blocking thread anyways.
@@ -710,12 +734,12 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
// It is possible that the item in our map is no longer the item we inserted,
// so we need to conditionally remove it only if the pointers are the same.
evicting_map
.remove_if(&digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
.remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
.await;
return Err(err);
}
encoded_file_path.path_type = PathType::Content;
encoded_file_path.digest = digest;
encoded_file_path.key = key.borrow().into_owned();
Ok(())
})
.await
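
The `key.borrow().into_owned()` call before the spawn exists because `background_spawn!` moves its future onto a task that can outlive the caller, so any borrowed `StoreKey<'_>` must first be converted to a `StoreKey<'static>`. The same pattern, sketched with plain `tokio::spawn` and a hypothetical `Cow`-backed key type:

use std::borrow::Cow;

// Hypothetical stand-in for StoreKey<'a>: it may borrow its string form.
struct Key<'a>(Cow<'a, str>);

impl Key<'_> {
    // Clones any borrowed data so the result is 'static.
    fn into_owned(self) -> Key<'static> {
        Key(Cow::Owned(self.0.into_owned()))
    }
}

async fn emplace(key: Key<'_>) {
    let key = key.into_owned(); // extend the lifetime before spawning
    tokio::spawn(async move {
        let _ = key; // the task may outlive the caller's borrow
    });
}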
@@ -734,22 +758,22 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
// existence_cache. We need to convert the digests to owned values to be able to
// insert them into the cache. In theory it should be able to elide this conversion
// but it seems to be a bit tricky to get right.
let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_digest()).collect();
let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_owned()).collect();
self.evicting_map
.sizes_for_keys(&keys, results, false /* peek */)
.await;
// We need to do a special pass to ensure our zero files exist.
// If our results failed and the result was a zero file, we need to
// create the file by spec.
for (digest, result) in keys.iter().zip(results.iter_mut()) {
if result.is_some() || !is_zero_digest(digest) {
for (key, result) in keys.iter().zip(results.iter_mut()) {
if result.is_some() || !is_zero_digest(key.borrow()) {
continue;
}
let (mut tx, rx) = make_buf_channel_pair();
let send_eof_result = tx.send_eof();
self.update(digest.into(), rx, UploadSizeInfo::ExactSize(0))
self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
.await
.err_tip(|| format!("Failed to create zero file for key {digest}"))
.err_tip(|| format!("Failed to create zero file for key {}", key.as_str()))
.merge(
send_eof_result
.err_tip(|| "Failed to send zero file EOF in filesystem store has"),
@@ -766,21 +790,18 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
reader: DropCloserReadHalf,
_upload_size: UploadSizeInfo,
) -> Result<(), Error> {
let digest = key.into_digest();
let mut temp_digest = digest;
make_temp_digest(&mut temp_digest);

let temp_key = make_temp_key(&key);
let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
self.block_size,
EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Temp,
digest: temp_digest,
key: temp_key,
},
)
.await?;

self.update_file(entry, temp_file, digest, reader)
self.update_file(entry, temp_file, key.borrow().into_owned(), reader)
.await
.err_tip(|| format!("While processing with temp file {temp_full_path:?}"))
}
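
`update` writes into the temp directory under the unique temp key; only later does `emplace_file` rename the file into the content path, so readers never observe a partially written entry. The general write-then-rename pattern, sketched with plain `tokio::fs` (hypothetical helper, not nativelink's API; `rename` is atomic only within a single filesystem, hence the "different volume" error tip in `upload_file_to_store` below):

use tokio::{fs, io::AsyncWriteExt};

async fn atomic_write(temp: &str, final_path: &str, data: &[u8]) -> std::io::Result<()> {
    let mut file = fs::File::create(temp).await?;
    file.write_all(data).await?;
    file.sync_all().await?; // make the bytes durable before publishing
    fs::rename(temp, final_path).await // atomic within one filesystem
}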
@@ -795,7 +816,6 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
mut file: fs::ResumeableFileSlot,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot>, Error> {
let digest = key.into_digest();
let path = file.get_path().as_os_str().to_os_string();
let file_size = match upload_size {
UploadSizeInfo::ExactSize(size) => size,
@@ -818,13 +838,13 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
RwLock::new(EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Custom(path),
digest,
key: key.borrow().into_owned(),
}),
);
// We are done with the file; if we hold a reference to it here, it could
// result in a deadlock if `emplace_file()` also needs file descriptors.
drop(file);
self.emplace_file(digest, Arc::new(entry))
self.emplace_file(key, Arc::new(entry))
.await
.err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
return Ok(None);
@@ -837,9 +857,8 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
offset: u64,
length: Option<u64>,
) -> Result<(), Error> {
let digest = key.into_digest();
if is_zero_digest(digest) {
self.has(digest.into())
if is_zero_digest(key.borrow()) {
self.has(key.borrow())
.await
.err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
writer
Expand All @@ -848,9 +867,16 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
return Ok(());
}

let entry =
self.evicting_map.get(&digest).await.ok_or_else(|| {
make_err!(Code::NotFound, "{digest} not found in filesystem store")
let entry = self
.evicting_map
.get(&key.borrow().into_owned())
.await
.ok_or_else(|| {
make_err!(
Code::NotFound,
"{} not found in filesystem store here",
key.as_str()
)
})?;
let read_limit = length.unwrap_or(u64::MAX);
let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?;