Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions docs/developer-guide/internals/session.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ or other shared resources.

The session is built on two primitives from the `vortex-session` crate:

- **`VortexSession`** -- a cloneable, thread-safe map from Rust `TypeId` to a boxed value. Any
type that is `Send + Sync + Debug + 'static` can be stored as a session variable.
- **`VortexSession`** -- a cloneable, thread-safe map from Rust `TypeId` to a shared
(`Arc`-wrapped) value. Any type that is `Clone + Send + Sync + Debug + 'static` can be stored
as a session variable.
- **`Registry<T>`** -- a concurrent map from string IDs to values of type `T`, used by each
component to look up registered plugins at runtime.

Because `VortexSession` is backed by an `Arc<DashMap>`, cloning is cheap and all clones share
the same state. This makes it safe to hand the session to multiple threads, tasks, or I/O
operations without coordination.
Because `VortexSession` is backed by an `ArcSwap`, cloning is cheap and all clones share the
same state, with lock-free reads and copy-on-write writes. This makes it safe to hand the
session to multiple threads, tasks, or I/O operations without coordination.

## Component Registries

Expand Down Expand Up @@ -95,14 +96,13 @@ all built-in components and encodings:
let session = VortexSession::default();
```

For tests or specialized use-cases, sessions can be assembled from individual components using
the `.with::<T>()` builder:
For tests or specialized use-cases, sessions can be assembled from individual components by
starting from an empty session and chaining the `.with::<T>()` helpers:

```rust
let session = VortexSession::builder()
let session = VortexSession::empty()
.with::<ArraySession>()
.with::<LayoutSession>()
.with::<ScalarFnSession>()
.with::<RuntimeSession>()
.build();
.with::<RuntimeSession>();
```
5 changes: 2 additions & 3 deletions encodings/fastlanes/src/rle/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ use crate::RLE;
use crate::rle::RLEArrayExt;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Slice.id(), RLE, SliceExecuteAdaptor(RLE));
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Slice.id(), RLE, SliceExecuteAdaptor(RLE));
}

impl SliceKernel for RLE {
Expand Down
5 changes: 2 additions & 3 deletions encodings/zigzag/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use vortex_session::VortexSession;
use crate::ZigZag;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Dict.id(), ZigZag, TakeExecuteAdaptor(ZigZag));
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Dict.id(), ZigZag, TakeExecuteAdaptor(ZigZag));
}
2 changes: 1 addition & 1 deletion fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mod native_runtime {
if vortex_cuda::cuda_available() {
use vortex_cuda::CudaSessionExt;
session = session.with::<vortex_cuda::CudaSession>();
vortex_cuda::initialize_cuda(session.cuda_session());
vortex_cuda::initialize_cuda(&session.cuda_session());
}
session
});
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/aggregate_fn/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl<V: AggregateFnVTable> DynAccumulator for Accumulator<V> {
mod tests {
use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_session::SessionExt;
use vortex_session::VortexSession;

use crate::ArrayRef;
Expand Down
5 changes: 2 additions & 3 deletions vortex-array/src/aggregate_fn/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,9 @@ mod tests {

#[test]
fn unknown_aggregate_fn_id_allow_unknown() {
let session = VortexSession::builder()
let session = VortexSession::empty()
.with::<AggregateFnSession>()
.allow_unknown()
.build();
.allow_unknown();

let proto = pb::AggregateFn {
id: "vortex.test.foreign_aggregate".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion vortex-array/src/aggregate_fn/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::any::Any;
use std::sync::Arc;

use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;

use crate::aggregate_fn::AggregateFnId;
Expand Down Expand Up @@ -221,7 +222,7 @@ impl AggregateFnSession {
/// Extension trait for accessing aggregate function session data.
pub trait AggregateFnSessionExt: SessionExt {
/// Returns the aggregate function session data.
fn aggregate_fns(&self) -> &AggregateFnSession {
fn aggregate_fns(&self) -> SessionGuard<'_, AggregateFnSession> {
self.get::<AggregateFnSession>()
}
}
Expand Down
5 changes: 2 additions & 3 deletions vortex-array/src/arrays/filter/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ use crate::optimizer::kernels::ArrayKernelsExt;
use crate::optimizer::rules::ArrayParentReduceRule;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Dict.id(), Filter, TakeExecuteAdaptor(Filter));
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Dict.id(), Filter, TakeExecuteAdaptor(Filter));
}

pub trait FilterReduce: VTable {
Expand Down
3 changes: 2 additions & 1 deletion vortex-array/src/arrays/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ pub use varbinview::VarBinViewArray;
pub mod variant;
pub use variant::Variant;
pub use variant::VariantArray;
use vortex_session::VortexSession;

pub(crate) fn initialize(session: &vortex_session::VortexSession) {
pub(crate) fn initialize(session: &VortexSession) {
bool::initialize(session);
chunked::initialize(session);
decimal::initialize(session);
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/arrays/struct_/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ use crate::scalar_fn::ScalarFnVTable;
use crate::scalar_fn::fns::cast::Cast;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Cast.id(), Struct, StructCastKernel);
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Cast.id(), Struct, StructCastKernel);
}

#[derive(Debug)]
Expand Down Expand Up @@ -215,7 +214,8 @@ mod tests {
.unwrap();
let parent_id = cast.encoding_id();
let session = VortexSession::empty().with_some(KernelSession::empty());
session.kernels().register_execute_parent(
let kernels = session.kernels();
kernels.register_execute_parent(
parent_id,
child_id,
&[null_struct_cast_execute_parent as ExecuteParentFn],
Expand Down
5 changes: 2 additions & 3 deletions vortex-array/src/arrays/struct_/vtable/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::scalar_fn::fns::zip::Zip;
use crate::scalar_fn::fns::zip::ZipExecuteAdaptor;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Zip.id(), Struct, ZipExecuteAdaptor(Struct));
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Zip.id(), Struct, ZipExecuteAdaptor(Struct));
}
5 changes: 2 additions & 3 deletions vortex-array/src/arrays/variant/vtable/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ use crate::scalar_fn::fns::variant_get::VariantPath;
use crate::scalar_fn::fns::variant_get::VariantPathElement;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(VariantGet.id(), Variant, VariantGetKernel);
let kernels = session.kernels();
kernels.register_execute_parent_kernel(VariantGet.id(), Variant, VariantGetKernel);
}

#[derive(Default, Debug)]
Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/arrow/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;
use vortex_session::registry::Id;

Expand Down Expand Up @@ -609,11 +610,11 @@ impl SessionVar for ArrowSession {
/// Extension trait for accessing the [`ArrowSession`] on a Vortex session.
pub trait ArrowSessionExt: SessionExt {
/// Get the Arrow session.
fn arrow(&self) -> &ArrowSession;
fn arrow(&self) -> SessionGuard<'_, ArrowSession>;
}

impl<S: SessionExt> ArrowSessionExt for S {
fn arrow(&self) -> &ArrowSession {
fn arrow(&self) -> SessionGuard<'_, ArrowSession> {
self.get::<ArrowSession>()
}
}
Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/dtype/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::any::Any;
use std::sync::Arc;

use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;
use vortex_session::registry::Registry;

Expand Down Expand Up @@ -68,11 +69,11 @@ impl DTypeSession {
/// Extension trait for accessing the DType session.
pub trait DTypeSessionExt: SessionExt {
/// Get the DType session.
fn dtypes(&self) -> &DTypeSession;
fn dtypes(&self) -> SessionGuard<'_, DTypeSession>;
}

impl<S: SessionExt> DTypeSessionExt for S {
fn dtypes(&self) -> &DTypeSession {
fn dtypes(&self) -> SessionGuard<'_, DTypeSession> {
self.get::<DTypeSession>()
}
}
8 changes: 3 additions & 5 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,7 @@ impl ExecutionCtx {
/// registered after this context is created are not visible to it; create a new
/// [`ExecutionCtx`] after registration to use newly registered kernels.
pub fn new(session: VortexSession) -> Self {
let execute_parent_kernels = session
.kernels_opt()
.map(|kernels| kernels.execute_parent_snapshot())
.unwrap_or_default();
let execute_parent_kernels = session.kernels().execute_parent_snapshot();
Self {
session,
execute_parent_kernels,
Expand Down Expand Up @@ -888,7 +885,8 @@ mod tests {
.contains_key(&key)
);

session.kernels().register_execute_parent(
let kernels = session.kernels();
kernels.register_execute_parent(
Bool.id(),
Primitive.id(),
&[noop_execute_parent as ExecuteParentFn],
Expand Down
7 changes: 3 additions & 4 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ pub use mask_future::*;
pub use metadata::*;
pub use smallvec;
pub use vortex_array_macros::array_slots;
use vortex_session::SessionExt;
use vortex_session::VortexSession;
use vortex_session::registry::Context;

use crate::aggregate_fn::session::AggregateFnSession;
use crate::arrow::ArrowSession;
use crate::dtype::session::DTypeSession;
use crate::memory::MemorySession;
use crate::optimizer::kernels::ArrayKernelsExt;
use crate::optimizer::kernels::KernelSession;
use crate::scalar_fn::session::ScalarFnSession;
use crate::session::ArraySession;
Expand Down Expand Up @@ -157,7 +157,7 @@ pub mod flatbuffers {
/// If the session contains a [`KernelSession`], this registers into its registry. Sessions that use
/// [`KernelSession::default`] already receive these built-in kernels.
pub fn initialize(session: &VortexSession) {
if session.kernels_opt().is_some() {
if session.get_opt::<KernelSession>().is_some() {
arrays::initialize(session);
}
}
Expand All @@ -170,7 +170,7 @@ pub fn initialize(session: &VortexSession) {
/// additional encodings or kernels into it without affecting any other session. This does not
/// register file, layout, or runtime state — those live in higher-level crates.
pub fn array_session() -> VortexSession {
VortexSession::builder()
VortexSession::empty()
.with::<ArraySession>()
.with::<KernelSession>()
.with::<DTypeSession>()
Expand All @@ -179,7 +179,6 @@ pub fn array_session() -> VortexSession {
.with::<AggregateFnSession>()
.with::<ArrowSession>()
.with::<MemorySession>()
.build()
}

// TODO(ngates): canonicalize doesn't currently take a session, therefore we cannot invoke execute
Expand Down
12 changes: 7 additions & 5 deletions vortex-array/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vortex_error::VortexResult;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;
use vortex_session::VortexSession;

Expand Down Expand Up @@ -213,7 +214,7 @@ impl SessionVar for MemorySession {
/// Extension trait for accessing session-scoped memory configuration.
pub trait MemorySessionExt: SessionExt {
/// Returns the memory session for this execution/session context.
fn memory(&self) -> &MemorySession {
fn memory(&self) -> SessionGuard<'_, MemorySession> {
self.get::<MemorySession>()
}

Expand All @@ -222,11 +223,12 @@ pub trait MemorySessionExt: SessionExt {
self.memory().allocator()
}

/// Returns a new session configured to use `allocator` as its host allocator.
/// Configures the session to use `allocator` as its host allocator, mutating it in place and
/// returning it for chaining.
fn with_allocator(self, allocator: HostAllocatorRef) -> VortexSession {
let mut builder = self.session().to_builder();
builder.get_mut::<MemorySession>().set_allocator(allocator);
builder.build()
let session = self.session();
session.get_mut::<MemorySession>().set_allocator(allocator);
session
}
}

Expand Down
39 changes: 26 additions & 13 deletions vortex-array/src/optimizer/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ use std::any::Any;
use std::borrow::Borrow;
use std::fmt::Debug;
use std::hash::BuildHasher;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::LazyLock;

use vortex_error::VortexResult;
use vortex_error::vortex_panic;
use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;
use vortex_session::VortexSession;
use vortex_session::registry::Id;
Expand Down Expand Up @@ -316,6 +317,17 @@ impl KernelSession {
}
}

/// Derefs to the held [`ArrayKernels`] registry, so a [`KernelSession`] (or a
/// [`SessionGuard<KernelSession>`](SessionGuard) read from a session) can be used wherever an
/// `&ArrayKernels` is expected.
impl Deref for KernelSession {
type Target = ArrayKernels;

fn deref(&self) -> &ArrayKernels {
&self.kernels
}
}

impl Default for KernelSession {
fn default() -> Self {
// `ArrayKernels::default` installs the built-in parent-reduce kernels. The execute-parent
Expand Down Expand Up @@ -343,17 +355,16 @@ impl SessionVar for KernelSession {

/// Extension trait for accessing the optimizer kernel registry from a [`VortexSession`].
pub trait ArrayKernelsExt: SessionExt {
/// Returns the active [`ArrayKernels`] registry if the session contains a [`KernelSession`].
fn kernels_opt(&self) -> Option<&ArrayKernels> {
self.get_opt::<KernelSession>().map(KernelSession::kernels)
}

/// Returns the active [`ArrayKernels`] registry.
/// Returns the session's [`KernelSession`], inserting a default one (with the built-in
/// kernels) if it does not exist.
///
/// Panics if the session does not contain a [`KernelSession`].
fn kernels(&self) -> &ArrayKernels {
self.kernels_opt()
.unwrap_or_else(|| vortex_panic!("Session does not contain a KernelSession"))
/// The returned [`SessionGuard`] borrows the session snapshot it was read from (so the registry
/// stays alive even if the session is concurrently mutated) and derefs through [`KernelSession`]
/// to the [`ArrayKernels`] registry, so it can be used wherever an `&ArrayKernels` is expected.
/// The registry shares its storage with the session, so kernels registered through it remain
/// visible to the session.
fn kernels(&self) -> SessionGuard<'_, KernelSession> {
self.get::<KernelSession>()
}
}

Expand Down Expand Up @@ -389,9 +400,11 @@ mod tests {
}

#[test]
fn kernels_opt_is_none_without_kernel_session() {
fn kernels_inserts_default_kernel_session() {
let session = VortexSession::empty();

assert!(session.kernels_opt().is_none());
// `kernels()` uses `get`, so it inserts a default `KernelSession` (with the built-in
// kernels) rather than returning `None`.
assert!(session.kernels().has_execute_parent(Binary.id(), Bool.id()));
}
}
Loading
Loading