Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
50623b5
Proof of concept
koerberm Nov 23, 2021
f1cda0f
Merge branch 'master' into executor
koerberm Dec 1, 2021
ad65b85
Adapted to new executor api
koerberm Dec 10, 2021
e03b9fd
Merge branch 'master' into executor
koerberm Dec 10, 2021
b622945
First draft of executor integration
koerberm Dec 10, 2021
f85fedc
Merge branch 'master' into executor
koerberm Jan 10, 2022
ec09bc3
removed clap dependency;
koerberm Jan 10, 2022
1ebc788
Clarified error message when returning a stream proxied by the executor;
koerberm Jan 10, 2022
30a6c58
Made executor API available in PRO only
koerberm Jan 12, 2022
7cd7fd0
Removed executor/task manager from non-pro version;
koerberm Jan 14, 2022
d5d159b
Fixed pro import;
koerberm Jan 14, 2022
7efd9fe
Introduced ReceiverStream<T> to the replay channel;
koerberm Jan 14, 2022
e887daa
Added test for ReceiverStream<T>
koerberm Jan 14, 2022
7894ba5
Update operators/src/pro/executor/mod.rs
koerberm Jan 17, 2022
714d3d5
reverted commit suggestion;
koerberm Jan 17, 2022
11819ad
Made unreachable error explicit;
koerberm Jan 17, 2022
6be3632
Completed test with dropped consumer;
koerberm Jan 17, 2022
eb1e3e9
Clippy & Typo;
koerberm Jan 19, 2022
b3cce36
Added docs
koerberm Jan 19, 2022
f9b207e
Made replay channel queue size configurable;
koerberm Jan 19, 2022
c23bad5
Merge remote-tracking branch 'origin/master' into executor
koerberm Jan 26, 2022
937b37d
tokio time feature now in regular dependencies;
koerberm Jan 26, 2022
1bc31f9
Merge branch 'spatial-resolution-fix' into executor
koerberm Jan 26, 2022
fe648d3
Draft for adding the QueryRectangle to the executor key;
koerberm Jan 26, 2022
3c89f70
Revert "Draft for adding the QueryRectangle to the executor key;"
koerberm Jan 28, 2022
377a1ad
Second draft for adding the QueryRectangle to the Executor Key;
koerberm Jan 28, 2022
0d4a097
Merge branch 'master' into executor
koerberm Jan 28, 2022
d45ebaa
Executor: First draft for advanced result sharing (recovered previous…
koerberm Feb 9, 2022
f34062b
Merge branch 'master' into executor
koerberm Feb 9, 2022
a6dc6c6
Added ExecutorDescriptions for Vector data;
koerberm Feb 9, 2022
23e1c4d
Fixed SpatialPartition2D::contains: Ensuring that equal instances con…
koerberm Feb 14, 2022
82d40b2
Implementer Intersects<BoundingBox2D> for all GeometryRef types;
koerberm Feb 14, 2022
851cea0
Implemented Intersects<BoundingBox2D> and Intersects<TimeInterval> fo…
koerberm Feb 14, 2022
d24134f
Added Debug trait to ExecutorTaskDescription, refined logging and docs;
koerberm Feb 14, 2022
f57d65d
New trait 'OneshotQueryProcessor' to turn a QueryProcessor into a 'st…
koerberm Feb 14, 2022
115a267
Added new png-rendering which directly consumes a stream of tiles;
koerberm Feb 14, 2022
016b0d5
Implemented ExecutorTaskDescriptions for Plots, Vector- and Raster st…
koerberm Feb 14, 2022
b4a6c0d
Added executors for all data types to the TaskManager;
koerberm Feb 14, 2022
865840e
Implemented Pro-Handlers for wms and wfs that utilize the correspondi…
koerberm Feb 14, 2022
0ecee9a
Fixed intersects implementations of primitives and added tests;
koerberm Feb 14, 2022
9534040
Added tests for OneshotQueryProcessor implementations;
koerberm Feb 14, 2022
4a40f0b
Added tests for executor task descriptions
koerberm Feb 14, 2022
c28f6f3
clippy;
koerberm Feb 14, 2022
04d1607
Merge branch 'master' into executor
koerberm Feb 14, 2022
6efff61
Snafu
koerberm Feb 14, 2022
448c9c2
Renamed can_join -> is_contained_in in ExecutorTaskDescription
koerberm Feb 28, 2022
693a17d
Introduced constant for the Executor task queue size;
koerberm Feb 28, 2022
6bd7fad
Clarified that a submitted future always returns a result;
koerberm Feb 28, 2022
4b83dea
Merge branch 'master' into executor
koerberm Feb 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions operators/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ uuid = { version = "0.8", features = ["serde", "v4", "v5"] }

[dev-dependencies]
geo-rand = { git = "https://github.com/lelongg/geo-rand", tag = "v0.3.0" }
tokio-util = "0.6"
rand = "0.8"
tempfile = "3.1"

Expand Down
11 changes: 11 additions & 0 deletions operators/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ pub enum Error {
Statistics {
source: crate::util::statistics::StatisticsError,
},

#[snafu(display("Executor error: {}", source))]
Executor {
source: crate::executor::error::ExecutorError,
},
}

impl From<geoengine_datatypes::error::Error> for Error {
Expand Down Expand Up @@ -353,3 +358,9 @@ impl From<crate::util::statistics::StatisticsError> for Error {
Error::Statistics { source }
}
}

impl From<crate::executor::error::ExecutorError> for Error {
fn from(source: crate::executor::error::ExecutorError) -> Self {
Self::Executor { source }
}
}
39 changes: 39 additions & 0 deletions operators/src/executor/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use snafu::Snafu;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::error::RecvError;
use tokio::task::JoinError;

pub type Result<T> = std::result::Result<T, ExecutorError>;

#[derive(Debug, Clone, Snafu)]
pub enum ExecutorError {
Submission { message: String },
Panic,
Cancelled,
}

impl From<JoinError> for ExecutorError {
fn from(src: JoinError) -> Self {
if src.is_cancelled() {
ExecutorError::Cancelled
} else {
ExecutorError::Panic
}
}
}

impl<T> From<SendError<T>> for ExecutorError {
fn from(e: SendError<T>) -> Self {
Self::Submission {
message: e.to_string(),
}
}
}

impl From<RecvError> for ExecutorError {
fn from(e: RecvError) -> Self {
Self::Submission {
message: e.to_string(),
}
}
}
106 changes: 106 additions & 0 deletions operators/src/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use error::Result;
use futures::{Future, Stream, StreamExt};
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;

pub mod error;

pub struct Executor<Key, T>
where
Key: Hash + Clone + Eq + Send + 'static,
T: Sync + Send + 'static,
{
_pk: PhantomData<Key>,
_pv: PhantomData<T>,
}

impl<Key, T> Executor<Key, T>
where
Key: Hash + Clone + Eq + Send + 'static,
T: Sync + Send + 'static,
{
/// Creates a new `Executor` instance, ready to serve computations. The buffer
/// size determines how much elements are at most kept in memory per computation.
pub fn new() -> Executor<Key, T> {
Executor {
_pk: Default::default(),
_pv: Default::default(),
}
}

/// Submits a streaming computation to this executor. In contrast
/// to `Executor.submit_stream`, this method returns a Stream of
/// `Arc<T>` that allows to use the executor with non-cloneable
/// results.
///
/// #Errors
/// This call fails, if the `Executor` was already closed.
pub async fn submit_stream_ref<F>(
&self,
_key: &Key,
stream: F,
) -> Result<impl Stream<Item = Arc<T>>>
where
F: Stream<Item = T> + Send + 'static,
{
Ok(stream.map(|x| Arc::new(x)))
}

/// Submits a single-result computation to this executor. In contrast
/// to `Executor.submit`, this method returns an
/// `Arc<T>` that allows to use the executor with non-cloneable
/// results.
///
/// #Errors
/// This call fails, if the `Executor` was already closed.
pub async fn submit_ref<F>(&self, _key: &Key, f: F) -> Result<Arc<T>>
where
F: Future<Output = T> + Send + 'static,
{
Ok(Arc::new(f.await))
}

pub async fn close(self) -> Result<()> {
Ok(())
}
}

impl<Key, T> Executor<Key, T>
where
Key: Hash + Clone + Eq + Send + 'static,
T: Clone + Sync + Send + 'static,
{
/// Submits a streaming computation to this executor. This method
/// returns a stream providing the results of the original (given) stream.
///
/// #Errors
/// This call fails, if the `Executor` was already closed.
pub async fn submit_stream<F>(&self, _key: &Key, stream: F) -> Result<impl Stream<Item = T>>
where
F: Stream<Item = T> + Send + 'static,
{
Ok(stream)
}

/// Submits a single-result computation to this executor.
///
/// #Errors
/// This call fails, if the `Executor` was already closed.
pub async fn submit<F>(&self, _key: &Key, f: F) -> Result<T>
where
F: Future<Output = T> + Send + 'static,
{
Ok(f.await)
}
}

impl<Key, T> Default for Executor<Key, T>
where
Key: Hash + Clone + Eq + Send + 'static,
T: Sync + Send + 'static,
{
fn default() -> Self {
Self::new()
}
}
1 change: 1 addition & 0 deletions operators/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod adapters;
#[macro_use]
pub mod engine;
pub mod error;
pub mod executor;
pub mod mock;
pub mod opencl;
pub mod plot;
Expand Down
Loading