Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ arc-swap = "1.7.1"
backon = { version = "1.6.0", default-features = false, features = ["tokio-sleep"] }
bytes = "1.10"
bytesize = "2.1.0"
camino = "1.2.1"
clap = { version = "4.5.47", features = ["derive"] }
flatbuffers = "25.2"
foyer = "0.20.0"
Expand Down
87 changes: 73 additions & 14 deletions pond-fs/src/fuse.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
use pond::{ErrorKind, Fd, Ino, Volume};
use std::collections::HashMap;
use std::ffi::OsStr;
use std::hash::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::time::Duration;
use std::time::SystemTime;
use tracing::instrument;

type FilenameHash = i64;

const READDIR_BATCH: usize = 64;

pub struct Pond {
volume: Volume,
runtime: tokio::runtime::Runtime,
uid: u32,
gid: u32,
kernel_cache_timeout: Duration,
readdir_offsets: HashMap<(Ino, FilenameHash), String>,
}

impl Pond {
Expand All @@ -28,6 +37,7 @@ impl Pond {
uid,
gid,
kernel_cache_timeout,
readdir_offsets: Default::default(),
}
}
}
Expand Down Expand Up @@ -88,7 +98,7 @@ impl fuser::Filesystem for Pond {
match fs_try!(reply, self.volume.lookup(parent.into(), name)) {
Some(attr) => reply.entry(
&self.kernel_cache_timeout,
&fuse_attr(self.uid, self.gid, attr),
&fuse_attr(self.uid, self.gid, &attr),
0,
),
None => reply.error(libc::ENOENT),
Expand All @@ -106,7 +116,7 @@ impl fuser::Filesystem for Pond {
let attr = fs_try!(reply, self.volume.getattr(ino.into()));
reply.attr(
&self.kernel_cache_timeout,
&fuse_attr(self.uid, self.gid, attr),
&fuse_attr(self.uid, self.gid, &attr),
);
}

Expand All @@ -116,20 +126,62 @@ impl fuser::Filesystem for Pond {
_req: &fuser::Request<'_>,
ino: u64,
_fh: u64,
offset: i64,
offset: FilenameHash,
mut reply: fuser::ReplyDirectory,
) {
let iter = fs_try!(reply, self.volume.readdir(ino.into()));
let offset = fs_try!(reply, offset.try_into().map_err(|_| ErrorKind::InvalidData));
// translate the offset into the filename where the last readdir left off before its buffer
// was full. keep the cookie around so we can fall back to it if nothing new was returned.
let mut token: Option<String> = if offset == 0 {
None
} else {
let name = fs_try!(
reply,
// note that we aren't removing it here. this means that the map grows over the
// lifetime of the mount. this allows uses to call readdir with the same offset
// multiple times, whereas removing it would cause subsequent calls to fail
// completely. if this becomes a problem, we can revisit removing the entry here or
// having a TTL map.
self.readdir_offsets
.get(&(ino.into(), offset))
.ok_or_else(|| pond::Error::new(
pond::ErrorKind::InvalidData,
format!("bad offset passed to readdir: {offset}"),
))
);
Some(name.clone())
};

'outer: loop {
let chunk_iter = fs_try!(
reply,
self.volume
.readdir(ino.into(), token.clone(), READDIR_BATCH,)
);

let mut num_entries = 0;
for entry in chunk_iter {
num_entries += 1;
let name = entry.name();
let attr = entry.attr();

let full_buffer =
reply.add(attr.ino.into(), hash(name), fuse_kind(attr.kind), name);
if full_buffer {
break 'outer;
}

token = Some(name.to_string());
}

for (i, entry) in iter.enumerate().skip(offset) {
let attr = entry.attr();
let name = entry.name();
let is_full = reply.add(attr.ino.into(), (i + 1) as i64, fuse_kind(attr.kind), name);
if is_full {
// if this emits less than the READDIR_BATCH we asked for, we've reached EOF.
if num_entries < READDIR_BATCH {
break;
}
}

if let Some(name) = token {
self.readdir_offsets.insert((ino.into(), hash(&name)), name);
}
reply.ok();
}

Expand All @@ -147,7 +199,7 @@ impl fuser::Filesystem for Pond {
let attr = fs_try!(reply, self.volume.mkdir(parent.into(), name.to_string()));
reply.entry(
&self.kernel_cache_timeout,
&fuse_attr(self.uid, self.gid, attr),
&fuse_attr(self.uid, self.gid, &attr),
0,
);
}
Expand Down Expand Up @@ -213,11 +265,12 @@ impl fuser::Filesystem for Pond {
let name = fs_try!(reply, from_os_str(name));
let (attr, fd) = fs_try!(
reply,
self.volume.create(parent.into(), name.to_string(), excl)
self.runtime
.block_on(self.volume.create(parent.into(), name.to_string(), excl))
);
reply.created(
&self.kernel_cache_timeout,
&fuse_attr(self.uid, self.gid, attr),
&fuse_attr(self.uid, self.gid, &attr),
0,
fd.into(),
0,
Expand Down Expand Up @@ -344,7 +397,7 @@ impl fuser::Filesystem for Pond {
let attr = fs_try!(reply, self.volume.getattr(ino));
reply.attr(
&self.kernel_cache_timeout,
&fuse_attr(self.uid, self.gid, attr),
&fuse_attr(self.uid, self.gid, &attr),
);
}

Expand Down Expand Up @@ -451,3 +504,9 @@ fn getuid() -> u32 {
fn getgid() -> u32 {
unsafe { libc::getgid() }
}

fn hash(s: &str) -> FilenameHash {
let mut hasher = DefaultHasher::new();
s.hash(&mut hasher);
hasher.finish() as i64
}
2 changes: 1 addition & 1 deletion pond-fs/tests/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ fn test_commit(
// we have all the data from before the commit and nothing
// from after it.
let volume = runtime.block_on(client.load_volume(&None)).unwrap();
assert_eq!(volume.version(), &Version::from_static("v1"));
assert_eq!(volume.version(), Version::from_static("v1"));

let mount = spawn_mount(mount_dir, volume);
let expected = read_entries(expected_dir);
Expand Down
1 change: 1 addition & 0 deletions pond/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2024"
arc-swap.workspace = true
backon.workspace = true
bytes.workspace = true
camino.workspace = true
flatbuffers.workspace = true
foyer.workspace = true
foyer-memory.workspace = true
Expand Down
56 changes: 47 additions & 9 deletions pond/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod metrics;
mod storage;
mod volume;

use camino::Utf8PathBuf;
pub use client::Client;
pub use error::{Error, ErrorKind, Result};
pub use location::Location;
Expand Down Expand Up @@ -146,15 +147,14 @@ impl FileAttr {
}

#[derive(Debug, Clone)]
pub struct DirEntry<'a> {
pub struct DirEntryRef<'a> {
name: &'a str,
parents: Vec<&'a str>,
attr: &'a FileAttr,
locations: &'a [Location],
data: &'a metadata::EntryData,
}

impl<'a> DirEntry<'a> {
impl<'a> DirEntryRef<'a> {
pub fn name(&self) -> &str {
self.name
}
Expand All @@ -163,15 +163,12 @@ impl<'a> DirEntry<'a> {
self.attr
}

pub fn location(&self) -> Option<(&Location, ByteRange)> {
pub fn location(&self) -> Option<(Location, ByteRange)> {
match self.data {
metadata::EntryData::File {
location_idx,
location,
byte_range,
} => {
let location = &self.locations[*location_idx];
Some((location, *byte_range))
}
} => Some((location.clone(), *byte_range)),
_ => None,
}
}
Expand All @@ -185,6 +182,47 @@ impl<'a> DirEntry<'a> {
pub fn is_regular(&self) -> bool {
self.attr.ino.is_regular()
}

pub(crate) fn to_owned(&self) -> DirEntry {
let location = self.location().map(|(loc, range)| (loc.clone(), range));
let mut path: Utf8PathBuf = self.parents.iter().collect();
path.push(self.name());
DirEntry {
path,
attr: self.attr().clone(),
location,
}
}
}

/// Owned equivalent of [`DirEntryRef`] that does not borrow from the underlying volume.
#[derive(Debug, Clone)]
pub struct DirEntry {
path: Utf8PathBuf,
attr: FileAttr,
location: Option<(Location, ByteRange)>,
}

impl DirEntry {
pub fn name(&self) -> &str {
self.path.file_name().expect("BUG: path ends in '..'")
}

pub fn path(&self) -> &str {
self.path.as_str()
}

pub fn attr(&self) -> &FileAttr {
&self.attr
}

pub fn location(&self) -> Option<(&Location, ByteRange)> {
self.location.as_ref().map(|(loc, range)| (loc, *range))
}

pub fn is_regular(&self) -> bool {
self.attr.ino.is_regular()
}
}

// TODO: add checksums/etags here?
Expand Down
23 changes: 14 additions & 9 deletions pond/src/location.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
use std::{borrow::Cow, sync::Arc};

/// Location is an enum that acts as a pointer to a blob of bytes.
///
/// Location is a lightweight as each enum is basically just an Arc. Cheap to clone.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Location {
Staged { path: std::path::PathBuf },
Committed { key: Arc<str> },
Staged {
path: Arc<std::path::PathBuf>,
generation: u64,
},
Committed {
key: Arc<str>,
},
}

impl std::fmt::Display for Location {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Location::Staged { path } => write!(f, "{}", path.display()),
Location::Staged { path, .. } => write!(f, "{}", path.display()),
Location::Committed { key } => write!(f, "{key}"),
}
}
}

impl Location {
pub(crate) fn is_staged(&self) -> bool {
matches!(self, Location::Staged { .. })
}

pub(crate) fn committed<'a>(key: impl Into<Cow<'a, str>>) -> Self {
let key = match key.into() {
Cow::Borrowed(str) => Arc::from(str),
Expand All @@ -28,9 +32,10 @@ impl Location {
Location::Committed { key }
}

pub(crate) fn staged(path: impl AsRef<std::path::Path>) -> Self {
pub(crate) fn staged(path: impl AsRef<std::path::Path>, generation: u64) -> Self {
Location::Staged {
path: path.as_ref().to_path_buf(),
path: path.as_ref().to_path_buf().into(),
generation,
}
}
}
Loading