Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions jetstreamer-firehose/src/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

use once_cell::sync::Lazy;
use reqwest::Url;
use std::env;
use std::{
env,
path::{Path, PathBuf},
};
use url::ParseError;

#[cfg(feature = "s3-backend")]
Expand Down Expand Up @@ -59,6 +62,13 @@ impl Location {
}
}

fn local(url: Url, base: PathBuf) -> Self {
Self {
url,
kind: LocationBackend::Local(base),
}
}

#[cfg(feature = "s3-backend")]
fn s3(url: Url, cfg: S3Location) -> Self {
Self {
Expand All @@ -77,6 +87,19 @@ impl Location {
matches!(self.kind, LocationBackend::Http)
}

/// Indicates whether this location uses the local filesystem backend.
pub const fn is_local(&self) -> bool {
matches!(self.kind, LocationBackend::Local(_))
}

/// Returns the base path for local filesystem locations.
pub fn as_local_path(&self) -> Option<&Path> {
match &self.kind {
LocationBackend::Local(path) => Some(path.as_path()),
_ => None,
}
}

/// Returns the S3-backed configuration if available.
#[cfg(feature = "s3-backend")]
pub fn as_s3(&self) -> Option<Arc<S3Location>> {
Expand All @@ -90,6 +113,7 @@ impl Location {
#[derive(Debug)]
enum LocationBackend {
Http,
Local(PathBuf),
#[cfg(feature = "s3-backend")]
S3(Arc<S3Location>),
}
Expand Down Expand Up @@ -136,7 +160,34 @@ fn resolve_location(kind: LocationKind) -> Result<Location, LocationError> {
}
}

let url = Url::parse(&raw).map_err(|err| LocationError::InvalidUrl(raw.clone(), err))?;
if raw.starts_with("file:") {
let url = Url::parse(&raw).map_err(|err| LocationError::InvalidUrl(raw.clone(), err))?;
let path = url
.to_file_path()
.map_err(|_| LocationError::InvalidFileUrl(raw.clone()))?;
return Ok(Location::local(url, path));
}

let url = match Url::parse(&raw) {
Ok(url) => url,
Err(err) => {
let maybe_path = Path::new(&raw);
if maybe_path.is_absolute() {
let url = Url::from_directory_path(maybe_path)
.map_err(|_| LocationError::InvalidUrl(raw.clone(), err))?;
return Ok(Location::local(url, maybe_path.to_path_buf()));
}
return Err(LocationError::InvalidUrl(raw.clone(), err));
}
};

if url.scheme() == "file" {
let path = url
.to_file_path()
.map_err(|_| LocationError::InvalidFileUrl(raw.clone()))?;
return Ok(Location::local(url, path));
}

Ok(Location::http(url))
}

Expand All @@ -161,6 +212,9 @@ pub enum LocationError {
/// S3 backend requested but crate built without support.
#[error("S3 backend requested but the crate was compiled without the `s3-backend` feature")]
S3FeatureDisabled,
/// File URL was provided but could not be converted into a path.
#[error("invalid file URL {0}")]
InvalidFileUrl(String),
}

#[cfg(feature = "s3-backend")]
Expand Down
117 changes: 116 additions & 1 deletion jetstreamer-firehose/src/epochs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use reqwest::Client;
use rseek::Seekable;
use serde::Deserialize;
use std::{fmt, io, pin::Pin};
use std::{fmt, io, path::PathBuf, pin::Pin};
use tokio::io::{AsyncRead, AsyncSeek, BufReader, ReadBuf, SeekFrom};

use crate::archive;
Expand Down Expand Up @@ -104,6 +104,12 @@ pub async fn epoch_exists(epoch: u64, client: &Client) -> bool {
let location = archive::car_location();
let path = format!("{epoch}/epoch-{epoch}.car");

if let Some(base_path) = location.as_local_path() {
let local_path = base_path.join(&path);
let zst_path = car_zst_path(&local_path);
return local_path.exists() || zst_path.exists();
}

if location.is_http() {
let url = location
.url()
Expand Down Expand Up @@ -139,6 +145,15 @@ pub async fn fetch_epoch_stream(epoch: u64, client: &Client) -> EpochStream {
let location = archive::car_location();
let path = format!("{epoch}/epoch-{epoch}.car");

if let Some(base_path) = location.as_local_path() {
let local_path = base_path.join(&path);
let reader = open_local_car(&local_path)
.await
.unwrap_or_else(|err| panic!("failed to open local CAR for epoch {epoch}: {err}"));
let reader = BufReader::with_capacity(8 * 1024 * 1024, reader);
return EpochStream::new(reader);
}

if location.is_http() {
let url = location
.url()
Expand Down Expand Up @@ -166,6 +181,106 @@ pub async fn fetch_epoch_stream(epoch: u64, client: &Client) -> EpochStream {
);
}

async fn open_local_car(path: &PathBuf) -> Result<LocalFile, std::io::Error> {
if path.exists() {
return LocalFile::from_path(path).await;
}

let zst_path = car_zst_path(path);
if zst_path.exists() {
return LocalFile::from_compressed(&zst_path).await;
}

Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("no CAR or CAR.ZST found at {path:?}"),
))
}

fn car_zst_path(path: &PathBuf) -> PathBuf {
let mut zst_path = path.clone();
zst_path.set_extension("car.zst");
zst_path
}

struct LocalFile {
file: tokio::fs::File,
len: u64,
#[allow(dead_code)]
temp: Option<tempfile::TempPath>,
}

impl LocalFile {
async fn from_path(path: &PathBuf) -> Result<Self, std::io::Error> {
let file = tokio::fs::File::open(path).await?;
let len = file.metadata().await?.len();
Ok(Self {
file,
len,
temp: None,
})
}

async fn from_compressed(path: &PathBuf) -> Result<Self, std::io::Error> {
let (file, len, temp) = tokio::task::spawn_blocking({
let path = path.clone();
move || {
let source = std::fs::File::open(&path)?;
let mut decoder = zstd::stream::Decoder::new(source)?;
let temp = tempfile::Builder::new()
.prefix("jetstreamer-epoch-")
.suffix(".car")
.tempfile()?;
std::io::copy(&mut decoder, temp.as_file())?;
let len = temp.as_file().metadata()?.len();
let temp_path = temp.into_temp_path();
let output = std::fs::File::open(&temp_path)?;
Ok::<_, std::io::Error>((output, len, temp_path))
}
})
.await
.unwrap()?;

let file = tokio::fs::File::from_std(file);
Ok(Self {
file,
len,
temp: Some(temp),
})
}
}

impl AsyncRead for LocalFile {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> std::task::Poll<io::Result<()>> {
Pin::new(&mut self.file).poll_read(cx, buf)
}
}

impl AsyncSeek for LocalFile {
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
let this = unsafe { self.get_unchecked_mut() };
Pin::new(&mut this.file).start_seek(position)
}

fn poll_complete(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<u64>> {
let this = unsafe { self.get_unchecked_mut() };
Pin::new(&mut this.file).poll_complete(cx)
}
}

impl Len for LocalFile {
fn len(&self) -> u64 {
self.len
}
}

#[cfg(feature = "s3-backend")]
type ReaderFuture =
Pin<Box<dyn Future<Output = io::Result<Pin<Box<dyn AsyncRead + Send>>>> + Send>>;
Expand Down
Loading