Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

use crate::path::Path;
use crate::{
Attributes, Extensions, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions,
PutPayloadMut, TagSet, WriteMultipart,
Attributes, Extensions, ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions,
PutOptions, PutPayloadMut, TagSet, WriteMultipart,
};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
Expand All @@ -37,7 +37,7 @@ pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;

/// An async-buffered reader compatible with the tokio IO traits
///
/// Internally this maintains a buffer of the requested size, and uses [`ObjectStore::get_range`]
/// Internally this maintains a buffer of the requested size, and uses [`ObjectStoreExt::get_range`]
/// to populate its internal buffer once depleted. This buffer is cleared on seek.
///
/// Whilst simple, this interface will typically be outperformed by the native [`ObjectStore`]
Expand All @@ -46,7 +46,7 @@ pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
/// round-trips is critical to throughput.
///
/// Systems looking to sequentially scan a file should instead consider using [`ObjectStoreExt::get`],
/// or [`ObjectStore::get_opts`], or [`ObjectStore::get_range`] to read a particular range.
/// or [`ObjectStore::get_opts`], or [`ObjectStoreExt::get_range`] to read a particular range.
///
/// Systems looking to read multiple ranges of a file should instead consider using
/// [`ObjectStore::get_ranges`], which will optimise the vectored IO.
Expand Down
5 changes: 1 addition & 4 deletions src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,10 @@ impl ObjectStore for ChunkedStore {
})
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
self.inner.get_range(location, range).await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.inner.get_ranges(location, ranges).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.inner.head(location).await
}
Expand Down
86 changes: 42 additions & 44 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,46 +823,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// ```
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;

/// Return the bytes that are stored at the specified location
/// in the given byte range.
///
/// See [`GetRange::Bounded`] for more details on how `range` gets interpreted.
///
/// To retrieve a range of bytes from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
///
/// ## Examples
///
/// This example uses a basic local filesystem object store to get a byte range from an object.
///
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
/// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
/// async fn get_range_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
/// let location = Path::from("example.txt");
/// let content = b"Hello, Object Store!";
///
/// // Put the object into the store
/// store
/// .put(&location, content.as_ref().into())
/// .await
/// .expect("Failed to put object");
///
/// // Get the object from the store
/// let bytes = store
/// .get_range(&location, 0..5)
/// .await
/// .expect("Failed to get object");
/// println!("Retrieved range [0-5]: {}", String::from_utf8_lossy(&bytes));
/// }
/// ```
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let options = GetOptions::new().with_range(Some(range));
self.get_opts(location, options).await?.bytes().await
}

/// Return the bytes that are stored at the specified location
/// in the given byte ranges
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
Expand Down Expand Up @@ -1138,10 +1098,6 @@ macro_rules! as_ref_impl {
self.as_ref().get_opts(location, options).await
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
self.as_ref().get_range(location, range).await
}

async fn get_ranges(
&self,
location: &Path,
Expand Down Expand Up @@ -1257,6 +1213,43 @@ pub trait ObjectStoreExt: ObjectStore {
/// }
/// ```
fn get(&self, location: &Path) -> impl Future<Output = Result<GetResult>>;

/// Return the bytes that are stored at the specified location
/// in the given byte range.
///
/// See [`GetRange::Bounded`] for more details on how `range` gets interpreted.
///
/// To retrieve a range of bytes from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
///
/// ## Examples
///
/// This example uses a basic local filesystem object store to get a byte range from an object.
///
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
/// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
/// async fn get_range_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
/// let location = Path::from("example.txt");
/// let content = b"Hello, Object Store!";
///
/// // Put the object into the store
/// store
/// .put(&location, content.as_ref().into())
/// .await
/// .expect("Failed to put object");
///
/// // Get the object from the store
/// let bytes = store
/// .get_range(&location, 0..5)
/// .await
/// .expect("Failed to get object");
/// println!("Retrieved range [0-5]: {}", String::from_utf8_lossy(&bytes));
/// }
/// ```
fn get_range(&self, location: &Path, range: Range<u64>) -> impl Future<Output = Result<Bytes>>;
}

impl<T> ObjectStoreExt for T
Expand All @@ -1276,6 +1269,11 @@ where
async fn get(&self, location: &Path) -> Result<GetResult> {
self.get_opts(location, GetOptions::default()).await
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let options = GetOptions::new().with_range(Some(range));
self.get_opts(location, options).await?.bytes().await
}
}

/// Result of a list call that includes objects, prefixes (directories) and a
Expand Down
5 changes: 0 additions & 5 deletions src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
Ok(permit_get_result(r, permit))
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_range(location, range).await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_ranges(location, ranges).await
Expand Down
9 changes: 0 additions & 9 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,15 +429,6 @@ impl ObjectStore for LocalFileSystem {
.await
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let path = self.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
let (mut file, _) = open_file(&path)?;
read_range(&mut file, &path, range)
})
.await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let path = self.path_to_filesystem(location)?;
let ranges = ranges.to_vec();
Expand Down
5 changes: 0 additions & 5 deletions src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.put_multipart_opts(&full_path, opts).await
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let full_path = self.full_path(location);
self.inner.get_range(&full_path, range).await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get_opts(&full_path, options).await
Expand Down
11 changes: 0 additions & 11 deletions src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
Ok(throttle_get(result, wait_get_per_byte))
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let config = self.config();

let sleep_duration =
config.wait_get_per_call + config.wait_get_per_byte * (range.end - range.start) as u32;

sleep(sleep_duration).await;

self.inner.get_range(location, range).await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let config = self.config();

Expand Down