Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions docs/developer-guide/internals/session.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ or other shared resources.

The session is built on two primitives from the `vortex-session` crate:

- **`VortexSession`** -- a cloneable, thread-safe map from Rust `TypeId` to a boxed value. Any
type that is `Send + Sync + Debug + 'static` can be stored as a session variable.
- **`VortexSession`** -- a cloneable, thread-safe map from Rust `TypeId` to a shared
(`Arc`-wrapped) value. Any type that is `Clone + Send + Sync + Debug + 'static` can be stored
as a session variable.
- **`Registry<T>`** -- a concurrent map from string IDs to values of type `T`, used by each
component to look up registered plugins at runtime.

Because `VortexSession` is backed by an `Arc<DashMap>`, cloning is cheap and all clones share
the same state. This makes it safe to hand the session to multiple threads, tasks, or I/O
operations without coordination.
Because `VortexSession` is backed by an `ArcSwap`, cloning is cheap and all clones share the
same state, with lock-free reads and copy-on-write writes. This makes it safe to hand the
session to multiple threads, tasks, or I/O operations without coordination.

## Component Registries

Expand Down Expand Up @@ -95,14 +96,13 @@ all built-in components and encodings:
let session = VortexSession::default();
```

For tests or specialized use-cases, sessions can be assembled from individual components using
the `.with::<T>()` builder:
For tests or specialized use-cases, sessions can be assembled from individual components by
starting from an empty session and chaining the `.with::<T>()` helpers:

```rust
let session = VortexSession::builder()
let session = VortexSession::empty()
.with::<ArraySession>()
.with::<LayoutSession>()
.with::<ScalarFnSession>()
.with::<RuntimeSession>()
.build();
.with::<RuntimeSession>();
```
5 changes: 2 additions & 3 deletions encodings/fastlanes/src/rle/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ use crate::RLE;
use crate::rle::RLEArrayExt;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Slice.id(), RLE, SliceExecuteAdaptor(RLE));
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Slice.id(), RLE, SliceExecuteAdaptor(RLE));
}

impl SliceKernel for RLE {
Expand Down
5 changes: 2 additions & 3 deletions encodings/zigzag/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use vortex_session::VortexSession;
use crate::ZigZag;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Dict.id(), ZigZag, TakeExecuteAdaptor(ZigZag));
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Dict.id(), ZigZag, TakeExecuteAdaptor(ZigZag));
}
1 change: 1 addition & 0 deletions vortex-array/src/aggregate_fn/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl<V: AggregateFnVTable> DynAccumulator for Accumulator<V> {
mod tests {
use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_session::SessionExt;
use vortex_session::VortexSession;

use crate::ArrayRef;
Expand Down
5 changes: 2 additions & 3 deletions vortex-array/src/aggregate_fn/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,9 @@ mod tests {

#[test]
fn unknown_aggregate_fn_id_allow_unknown() {
let session = VortexSession::builder()
let session = VortexSession::empty()
.with::<AggregateFnSession>()
.allow_unknown()
.build();
.allow_unknown();

let proto = pb::AggregateFn {
id: "vortex.test.foreign_aggregate".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion vortex-array/src/aggregate_fn/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::any::Any;
use std::sync::Arc;

use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;

use crate::aggregate_fn::AggregateFnId;
Expand Down Expand Up @@ -221,7 +222,7 @@ impl AggregateFnSession {
/// Extension trait for accessing aggregate function session data.
pub trait AggregateFnSessionExt: SessionExt {
/// Returns the aggregate function session data.
fn aggregate_fns(&self) -> &AggregateFnSession {
fn aggregate_fns(&self) -> SessionGuard<'_, AggregateFnSession> {
self.get::<AggregateFnSession>()
}
}
Expand Down
5 changes: 2 additions & 3 deletions vortex-array/src/arrays/filter/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ use crate::optimizer::kernels::ArrayKernelsExt;
use crate::optimizer::rules::ArrayParentReduceRule;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Dict.id(), Filter, TakeExecuteAdaptor(Filter));
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Dict.id(), Filter, TakeExecuteAdaptor(Filter));
}

pub trait FilterReduce: VTable {
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/arrays/struct_/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ use crate::scalar_fn::ScalarFnVTable;
use crate::scalar_fn::fns::cast::Cast;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Cast.id(), Struct, StructCastKernel);
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Cast.id(), Struct, StructCastKernel);
}

#[derive(Debug)]
Expand Down Expand Up @@ -215,7 +214,8 @@ mod tests {
.unwrap();
let parent_id = cast.encoding_id();
let session = VortexSession::empty().with_some(KernelSession::empty());
session.kernels().register_execute_parent(
let kernels = session.kernels();
kernels.register_execute_parent(
parent_id,
child_id,
&[null_struct_cast_execute_parent as ExecuteParentFn],
Expand Down
5 changes: 2 additions & 3 deletions vortex-array/src/arrays/struct_/vtable/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::scalar_fn::fns::zip::Zip;
use crate::scalar_fn::fns::zip::ZipExecuteAdaptor;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(Zip.id(), Struct, ZipExecuteAdaptor(Struct));
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Zip.id(), Struct, ZipExecuteAdaptor(Struct));
}
5 changes: 2 additions & 3 deletions vortex-array/src/arrays/variant/vtable/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ use crate::scalar_fn::fns::variant_get::VariantPath;
use crate::scalar_fn::fns::variant_get::VariantPathElement;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_kernel(VariantGet.id(), Variant, VariantGetKernel);
let kernels = session.kernels();
kernels.register_execute_parent_kernel(VariantGet.id(), Variant, VariantGetKernel);
}

#[derive(Default, Debug)]
Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/arrow/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;
use vortex_session::registry::Id;

Expand Down Expand Up @@ -609,11 +610,11 @@ impl SessionVar for ArrowSession {
/// Extension trait for accessing the [`ArrowSession`] on a Vortex session.
pub trait ArrowSessionExt: SessionExt {
/// Get the Arrow session.
fn arrow(&self) -> &ArrowSession;
fn arrow(&self) -> SessionGuard<'_, ArrowSession>;
}

impl<S: SessionExt> ArrowSessionExt for S {
fn arrow(&self) -> &ArrowSession {
fn arrow(&self) -> SessionGuard<'_, ArrowSession> {
self.get::<ArrowSession>()
}
}
Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/dtype/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::any::Any;
use std::sync::Arc;

use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;
use vortex_session::registry::Registry;

Expand Down Expand Up @@ -68,11 +69,11 @@ impl DTypeSession {
/// Extension trait for accessing the DType session.
pub trait DTypeSessionExt: SessionExt {
/// Get the DType session.
fn dtypes(&self) -> &DTypeSession;
fn dtypes(&self) -> SessionGuard<'_, DTypeSession>;
}

impl<S: SessionExt> DTypeSessionExt for S {
fn dtypes(&self) -> &DTypeSession {
fn dtypes(&self) -> SessionGuard<'_, DTypeSession> {
self.get::<DTypeSession>()
}
}
8 changes: 3 additions & 5 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,7 @@ impl ExecutionCtx {
/// registered after this context is created are not visible to it; create a new
/// [`ExecutionCtx`] after registration to use newly registered kernels.
pub fn new(session: VortexSession) -> Self {
let execute_parent_kernels = session
.kernels_opt()
.map(|kernels| kernels.execute_parent_snapshot())
.unwrap_or_default();
let execute_parent_kernels = session.kernels().execute_parent_snapshot();
Self {
session,
execute_parent_kernels,
Expand Down Expand Up @@ -888,7 +885,8 @@ mod tests {
.contains_key(&key)
);

session.kernels().register_execute_parent(
let kernels = session.kernels();
kernels.register_execute_parent(
Bool.id(),
Primitive.id(),
&[noop_execute_parent as ExecuteParentFn],
Expand Down
7 changes: 3 additions & 4 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ pub use mask_future::*;
pub use metadata::*;
pub use smallvec;
pub use vortex_array_macros::array_slots;
use vortex_session::SessionExt;
use vortex_session::VortexSession;
use vortex_session::registry::Context;

use crate::aggregate_fn::session::AggregateFnSession;
use crate::arrow::ArrowSession;
use crate::dtype::session::DTypeSession;
use crate::memory::MemorySession;
use crate::optimizer::kernels::ArrayKernelsExt;
use crate::optimizer::kernels::KernelSession;
use crate::scalar_fn::session::ScalarFnSession;
use crate::session::ArraySession;
Expand Down Expand Up @@ -157,7 +157,7 @@ pub mod flatbuffers {
/// If the session contains a [`KernelSession`], this registers into its registry. Sessions that use
/// [`KernelSession::default`] already receive these built-in kernels.
pub fn initialize(session: &VortexSession) {
if session.kernels_opt().is_some() {
if session.get_opt::<KernelSession>().is_some() {
arrays::initialize(session);
}
}
Expand All @@ -170,7 +170,7 @@ pub fn initialize(session: &VortexSession) {
/// additional encodings or kernels into it without affecting any other session. This does not
/// register file, layout, or runtime state — those live in higher-level crates.
pub fn array_session() -> VortexSession {
VortexSession::builder()
VortexSession::empty()
.with::<ArraySession>()
.with::<KernelSession>()
.with::<DTypeSession>()
Expand All @@ -179,7 +179,6 @@ pub fn array_session() -> VortexSession {
.with::<AggregateFnSession>()
.with::<ArrowSession>()
.with::<MemorySession>()
.build()
}

// TODO(ngates): canonicalize doesn't currently take a session, therefore we cannot invoke execute
Expand Down
12 changes: 7 additions & 5 deletions vortex-array/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vortex_error::VortexResult;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;
use vortex_session::VortexSession;

Expand Down Expand Up @@ -213,7 +214,7 @@ impl SessionVar for MemorySession {
/// Extension trait for accessing session-scoped memory configuration.
pub trait MemorySessionExt: SessionExt {
/// Returns the memory session for this execution/session context.
fn memory(&self) -> &MemorySession {
fn memory(&self) -> SessionGuard<'_, MemorySession> {
self.get::<MemorySession>()
}

Expand All @@ -222,11 +223,12 @@ pub trait MemorySessionExt: SessionExt {
self.memory().allocator()
}

/// Returns a new session configured to use `allocator` as its host allocator.
/// Configures the session to use `allocator` as its host allocator, mutating it in place and
/// returning it for chaining.
fn with_allocator(self, allocator: HostAllocatorRef) -> VortexSession {
let mut builder = self.session().to_builder();
builder.get_mut::<MemorySession>().set_allocator(allocator);
builder.build()
let session = self.session();
session.get_mut::<MemorySession>().set_allocator(allocator);
session
}
}

Expand Down
39 changes: 26 additions & 13 deletions vortex-array/src/optimizer/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ use std::any::Any;
use std::borrow::Borrow;
use std::fmt::Debug;
use std::hash::BuildHasher;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::LazyLock;

use vortex_error::VortexResult;
use vortex_error::vortex_panic;
use vortex_session::SessionExt;
use vortex_session::SessionGuard;
use vortex_session::SessionVar;
use vortex_session::VortexSession;
use vortex_session::registry::Id;
Expand Down Expand Up @@ -316,6 +317,17 @@ impl KernelSession {
}
}

/// Derefs to the held [`ArrayKernels`] registry, so a [`KernelSession`] (or a
/// [`SessionGuard<KernelSession>`](SessionGuard) read from a session) can be used wherever an
/// `&ArrayKernels` is expected.
impl Deref for KernelSession {
type Target = ArrayKernels;

fn deref(&self) -> &ArrayKernels {
&self.kernels
}
}

impl Default for KernelSession {
fn default() -> Self {
// `ArrayKernels::default` installs the built-in parent-reduce kernels. The execute-parent
Expand Down Expand Up @@ -343,17 +355,16 @@ impl SessionVar for KernelSession {

/// Extension trait for accessing the optimizer kernel registry from a [`VortexSession`].
pub trait ArrayKernelsExt: SessionExt {
/// Returns the active [`ArrayKernels`] registry if the session contains a [`KernelSession`].
fn kernels_opt(&self) -> Option<&ArrayKernels> {
self.get_opt::<KernelSession>().map(KernelSession::kernels)
}

/// Returns the active [`ArrayKernels`] registry.
/// Returns the session's [`KernelSession`], inserting a default one (with the built-in
/// kernels) if it does not exist.
///
/// Panics if the session does not contain a [`KernelSession`].
fn kernels(&self) -> &ArrayKernels {
self.kernels_opt()
.unwrap_or_else(|| vortex_panic!("Session does not contain a KernelSession"))
/// The returned [`SessionGuard`] borrows the session snapshot it was read from (so the registry
/// stays alive even if the session is concurrently mutated) and derefs through [`KernelSession`]
/// to the [`ArrayKernels`] registry, so it can be used wherever an `&ArrayKernels` is expected.
/// The registry shares its storage with the session, so kernels registered through it remain
/// visible to the session.
fn kernels(&self) -> SessionGuard<'_, KernelSession> {
self.get::<KernelSession>()
}
}

Expand Down Expand Up @@ -389,9 +400,11 @@ mod tests {
}

#[test]
fn kernels_opt_is_none_without_kernel_session() {
fn kernels_inserts_default_kernel_session() {
let session = VortexSession::empty();

assert!(session.kernels_opt().is_none());
// `kernels()` uses `get`, so it inserts a default `KernelSession` (with the built-in
// kernels) rather than returning `None`.
assert!(session.kernels().has_execute_parent(Binary.id(), Bool.id()));
}
}
2 changes: 1 addition & 1 deletion vortex-array/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ fn try_optimize(
) -> VortexResult<Option<ArrayRef>> {
let mut current_array = array.clone();
let mut any_optimizations = false;
let array_ref = session.and_then(|s| s.kernels_opt());
let array_ref = session.map(|s| s.kernels());

// Apply reduction rules to the current array until no more rules apply.
let mut loop_counter = 0;
Expand Down
Loading
Loading