From 86c74051b5a546254870f9b2cddf00732cc04832 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 19 Feb 2025 16:15:54 +0100 Subject: [PATCH] chore: update bounds on core traits (#46) --- examples/ping_pong/src/main.rs | 6 +- examples/ping_pong/src/operators.rs | 11 +-- examples/ping_pong/src/service_ping.rs | 13 ++- examples/ping_pong/src/service_pong.rs | 9 +- overwatch-derive/src/lib.rs | 9 +- overwatch-rs/src/lib.rs | 22 ++++- overwatch-rs/src/overwatch/commands.rs | 11 +-- overwatch-rs/src/overwatch/handle.rs | 25 +++-- overwatch-rs/src/overwatch/life_cycle.rs | 3 +- overwatch-rs/src/overwatch/mod.rs | 57 ++++++----- overwatch-rs/src/services/handle.rs | 96 +++++++++--------- overwatch-rs/src/services/life_cycle.rs | 3 +- overwatch-rs/src/services/mod.rs | 14 ++- overwatch-rs/src/services/relay.rs | 59 ++++++----- overwatch-rs/src/services/settings.rs | 30 +++--- overwatch-rs/src/services/state.rs | 121 +++++++++++------------ overwatch-rs/src/services/status.rs | 25 ++--- overwatch-rs/src/utils/const_checks.rs | 1 + overwatch-rs/src/utils/runtime.rs | 1 + overwatch-rs/tests/cancelable_service.rs | 10 +- overwatch-rs/tests/generics.rs | 46 +++------ overwatch-rs/tests/print_service.rs | 17 ++-- overwatch-rs/tests/sequence.rs | 26 ++--- overwatch-rs/tests/settings_update.rs | 17 ++-- overwatch-rs/tests/state_handling.rs | 11 ++- overwatch-rs/tests/try_load.rs | 9 +- 26 files changed, 333 insertions(+), 319 deletions(-) diff --git a/examples/ping_pong/src/main.rs b/examples/ping_pong/src/main.rs index a61a143..a5e018b 100644 --- a/examples/ping_pong/src/main.rs +++ b/examples/ping_pong/src/main.rs @@ -1,7 +1,7 @@ // Crate use overwatch_derive::Services; use overwatch_rs::overwatch::OverwatchRunner; -use overwatch_rs::services::handle::ServiceHandle; +use overwatch_rs::OpaqueServiceHandle; // Internal use crate::service_ping::PingService; use crate::service_pong::PongService; @@ -16,8 +16,8 @@ mod states; #[derive(Services)] struct PingPong { - ping: ServiceHandle, - pong: ServiceHandle, + ping: OpaqueServiceHandle, + pong: OpaqueServiceHandle, } const PING_STATE_SAVE_PATH: &str = const_format::formatcp!( diff --git a/examples/ping_pong/src/operators.rs b/examples/ping_pong/src/operators.rs index 7240969..9228377 100644 --- a/examples/ping_pong/src/operators.rs +++ b/examples/ping_pong/src/operators.rs @@ -1,9 +1,9 @@ // STD use std::fmt::Debug; // Crates -use overwatch_rs::services::state::{ServiceState, StateOperator}; +use overwatch_rs::services::state::StateOperator; // Internal -use crate::states::PingState; +use crate::{settings::PingSettings, states::PingState}; #[derive(Debug, Clone)] pub struct StateSaveOperator { @@ -13,17 +13,16 @@ pub struct StateSaveOperator { #[async_trait::async_trait] impl StateOperator for StateSaveOperator { type StateInput = PingState; + type Settings = PingSettings; type LoadError = std::io::Error; - fn try_load( - settings: &::Settings, - ) -> Result, Self::LoadError> { + fn try_load(settings: &Self::Settings) -> Result, Self::LoadError> { let state_string = std::fs::read_to_string(&settings.state_save_path)?; serde_json::from_str(&state_string) .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error)) } - fn from_settings(settings: ::Settings) -> Self { + fn from_settings(settings: Self::Settings) -> Self { Self { save_path: settings.state_save_path, } diff --git a/examples/ping_pong/src/service_ping.rs b/examples/ping_pong/src/service_ping.rs index d581b93..da8d88a 100644 --- a/examples/ping_pong/src/service_ping.rs +++ b/examples/ping_pong/src/service_ping.rs @@ -1,7 +1,6 @@ // Crates -use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; -use overwatch_rs::DynError; +use overwatch_rs::{DynError, OpaqueServiceStateHandle}; use std::time::Duration; use tokio::time::sleep; // Internal @@ -12,7 +11,7 @@ use crate::settings::PingSettings; use crate::states::PingState; pub struct PingService { - service_state_handle: ServiceStateHandle, + service_state_handle: OpaqueServiceStateHandle, initial_state: ::State, } @@ -27,7 +26,7 @@ impl ServiceData for PingService { #[async_trait::async_trait] impl ServiceCore for PingService { fn init( - service_state_handle: ServiceStateHandle, + service_state_handle: OpaqueServiceStateHandle, initial_state: Self::State, ) -> Result { Ok(Self { @@ -53,7 +52,7 @@ impl ServiceCore for PingService { loop { tokio::select! { - _ = sleep(Duration::from_secs(1)) => { + () = sleep(Duration::from_secs(1)) => { println!("Sending Ping"); pong_outbound_relay.send(PongMessage::Ping).await.unwrap(); } @@ -64,14 +63,14 @@ impl ServiceCore for PingService { service_state_handle.state_updater.update( Self::State { pong_count } ); - println!("Received Pong. Total: {}", pong_count); + println!("Received Pong. Total: {pong_count}"); } } } true = async { pong_count >= 30 } => { - println!("Received {} Pongs. Exiting...", pong_count); + println!("Received {pong_count} Pongs. Exiting..."); break; } } diff --git a/examples/ping_pong/src/service_pong.rs b/examples/ping_pong/src/service_pong.rs index 76cca89..1d437ca 100644 --- a/examples/ping_pong/src/service_pong.rs +++ b/examples/ping_pong/src/service_pong.rs @@ -1,27 +1,26 @@ // Crates use crate::messages::{PingMessage, PongMessage}; use crate::service_ping::PingService; -use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; -use overwatch_rs::DynError; +use overwatch_rs::{DynError, OpaqueServiceStateHandle}; pub struct PongService { - service_state_handle: ServiceStateHandle, + service_state_handle: OpaqueServiceStateHandle, } impl ServiceData for PongService { const SERVICE_ID: ServiceId = "pong"; type Settings = (); type State = NoState; - type StateOperator = NoOperator; + type StateOperator = NoOperator; type Message = PongMessage; } #[async_trait::async_trait] impl ServiceCore for PongService { fn init( - service_state_handle: ServiceStateHandle, + service_state_handle: OpaqueServiceStateHandle, _initial_state: Self::State, ) -> Result { Ok(Self { diff --git a/overwatch-derive/src/lib.rs b/overwatch-derive/src/lib.rs index 1f65bec..7c43e31 100644 --- a/overwatch-derive/src/lib.rs +++ b/overwatch-derive/src/lib.rs @@ -177,8 +177,8 @@ fn generate_new_impl(fields: &Punctuated) -> proc_macro2::TokenStr quote! { #field_identifier: { let manager = - ::overwatch_rs::services::handle::ServiceHandle::<#service_type>::new( - #settings_field_identifier, overwatch_handle.clone(), + ::overwatch_rs::OpaqueServiceHandle::<#service_type>::new::<<#service_type as ::overwatch_rs::services::ServiceData>::StateOperator>( + #settings_field_identifier, overwatch_handle.clone(), <#service_type as ::overwatch_rs::services::ServiceData>::SERVICE_RELAY_BUFFER_SIZE )?; manager } @@ -203,8 +203,9 @@ fn generate_new_impl(fields: &Punctuated) -> proc_macro2::TokenStr fn generate_start_all_impl(fields: &Punctuated) -> proc_macro2::TokenStream { let call_start = fields.iter().map(|field| { let field_identifier = field.ident.as_ref().expect("A struct attribute identifier"); + let type_id = utils::extract_type_from(&field.ty); quote! { - self.#field_identifier.service_runner().run()? + self.#field_identifier.service_runner::<<#type_id as ::overwatch_rs::services::ServiceData>::StateOperator>().run::<#type_id>()? } }); @@ -223,7 +224,7 @@ fn generate_start_impl(fields: &Punctuated) -> proc_macro2::TokenS let type_id = utils::extract_type_from(&field.ty); quote! { <#type_id as ::overwatch_rs::services::ServiceData>::SERVICE_ID => { - self.#field_identifier.service_runner().run()?; + self.#field_identifier.service_runner::<<#type_id as ::overwatch_rs::services::ServiceData>::StateOperator>().run::<#type_id>()?; ::std::result::Result::Ok(()) } } diff --git a/overwatch-rs/src/lib.rs b/overwatch-rs/src/lib.rs index debaf9a..ac62615 100644 --- a/overwatch-rs/src/lib.rs +++ b/overwatch-rs/src/lib.rs @@ -1,5 +1,5 @@ -//! Overwatch is a framework to easily construct applications that requires of several independent -//! parts that needs communication between them. +//! Overwatch is a framework to easily construct applications that are composed of several independent +//! parts requiring communication between them. //! Everything is self contained and it matches somewhat the advantages of microservices. //! //! ## Design Goals @@ -13,22 +13,34 @@ //! - It is easier to isolate problems //! - Minimal sharing when unavoidable //! -//! - Debuggeability +//! - Debuggability //! - Easy to track workflow //! - Easy to test //! - Easy to measure -//! - Asynchronous Communication +//! - Asynchronous communication //! //! ## Main components //! -//! - Overwatch: the main messenger relay component (internal communications). It is also be responsible of managing other components lifecycle and handling configuration updates. +//! - Overwatch: the main messenger relay component (internal communications). It is also responsible for managing other components lifecycle and handling configuration updates. //! - Services (handled by the *overwatch*) +use crate::services::ServiceData; + pub mod overwatch; pub mod services; pub mod utils; pub type DynError = Box; +pub type OpaqueServiceHandle = crate::services::handle::ServiceHandle< + ::Message, + ::Settings, + ::State, +>; +pub type OpaqueServiceStateHandle = crate::services::handle::ServiceStateHandle< + ::Message, + ::Settings, + ::State, +>; #[cfg(feature = "derive")] pub use overwatch_derive::*; diff --git a/overwatch-rs/src/overwatch/commands.rs b/overwatch-rs/src/overwatch/commands.rs index 625a8e4..7a05614 100644 --- a/overwatch-rs/src/overwatch/commands.rs +++ b/overwatch-rs/src/overwatch/commands.rs @@ -11,16 +11,16 @@ use crate::services::status::StatusWatcher; use crate::services::ServiceId; #[derive(Debug)] -pub(crate) struct ReplyChannel(pub(crate) oneshot::Sender); +pub(crate) struct ReplyChannel(pub(crate) oneshot::Sender); -impl From> for ReplyChannel { - fn from(sender: oneshot::Sender) -> Self { +impl From> for ReplyChannel { + fn from(sender: oneshot::Sender) -> Self { Self(sender) } } -impl ReplyChannel { - pub async fn reply(self, message: M) -> Result<(), M> { +impl ReplyChannel { + pub fn reply(self, message: Message) -> Result<(), Message> { self.0.send(message) } } @@ -40,7 +40,6 @@ pub struct StatusCommand { } /// Command for managing [`ServiceCore`](crate::services::ServiceCore) lifecycle -#[allow(unused)] #[derive(Debug)] pub struct ServiceLifeCycleCommand { pub service_id: ServiceId, diff --git a/overwatch-rs/src/overwatch/handle.rs b/overwatch-rs/src/overwatch/handle.rs index 6853168..60975ca 100644 --- a/overwatch-rs/src/overwatch/handle.rs +++ b/overwatch-rs/src/overwatch/handle.rs @@ -1,4 +1,5 @@ // std +use std::fmt::Debug; // crates use crate::overwatch::commands::{ @@ -20,12 +21,12 @@ use crate::services::status::StatusWatcher; /// It handles communications to the main Overwatch runner. #[derive(Clone, Debug)] pub struct OverwatchHandle { - #[allow(unused)] runtime_handle: Handle, sender: Sender, } impl OverwatchHandle { + #[must_use] pub fn new(runtime_handle: Handle, sender: Sender) -> Self { Self { runtime_handle, @@ -33,27 +34,32 @@ impl OverwatchHandle { } } + #[must_use] /// Request for a relay - pub fn relay(&self) -> Relay { + pub fn relay(&self) -> Relay + where + Service: ServiceData, + Service::Message: 'static, + { Relay::new(self.clone()) } // Request a status watcher for a service - pub async fn status_watcher(&self) -> StatusWatcher { - info!("Requesting status watcher for {}", S::SERVICE_ID); + pub async fn status_watcher(&self) -> StatusWatcher { + info!("Requesting status watcher for {}", Service::SERVICE_ID); let (sender, receiver) = tokio::sync::oneshot::channel(); let watcher_request = self .sender .send(OverwatchCommand::Status(StatusCommand { - service_id: S::SERVICE_ID, + service_id: Service::SERVICE_ID, reply_channel: ReplyChannel::from(sender), })) .await; match watcher_request { - Ok(_) => receiver.await.unwrap_or_else(|_| { + Ok(()) => receiver.await.unwrap_or_else(|_| { panic!( "Service {} watcher should always be available", - S::SERVICE_ID + Service::SERVICE_ID ) }), Err(_) => { @@ -103,7 +109,7 @@ impl OverwatchHandle { #[cfg_attr(feature = "instrumentation", instrument(skip(self)))] pub async fn update_settings(&self, settings: S::Settings) where - S::Settings: Send, + S::Settings: Send + Debug + 'static, { if let Err(e) = self .sender @@ -112,10 +118,11 @@ impl OverwatchHandle { )))) .await { - error!(error=?e, "Error updating settings") + error!(error=?e, "Error updating settings"); } } + #[must_use] pub fn runtime(&self) -> &Handle { &self.runtime_handle } diff --git a/overwatch-rs/src/overwatch/life_cycle.rs b/overwatch-rs/src/overwatch/life_cycle.rs index 00d0240..da7758c 100644 --- a/overwatch-rs/src/overwatch/life_cycle.rs +++ b/overwatch-rs/src/overwatch/life_cycle.rs @@ -17,9 +17,10 @@ pub struct ServicesLifeCycleHandle { } impl ServicesLifeCycleHandle { + #[must_use] pub fn empty() -> Self { Self { - handlers: Default::default(), + handlers: HashMap::default(), } } diff --git a/overwatch-rs/src/overwatch/mod.rs b/overwatch-rs/src/overwatch/mod.rs index 946f313..90366e5 100644 --- a/overwatch-rs/src/overwatch/mod.rs +++ b/overwatch-rs/src/overwatch/mod.rs @@ -9,7 +9,6 @@ use std::future::Future; // crates -use async_trait::async_trait; use thiserror::Error; use tokio::runtime::{Handle, Runtime}; use tokio::sync::mpsc::Receiver; @@ -65,11 +64,10 @@ pub type AnySettings = Box; /// An overwatch run anything that implements this trait /// An implementor of this trait would have to handle the inner [`ServiceCore`](crate::services::ServiceCore) -#[async_trait] pub trait Services: Sized { /// Inner [`ServiceCore::Settings`](crate::services::ServiceCore) grouping type. /// Normally this will be a settings object that group all the inner services settings. - type Settings: Debug + 'static; // 'static is required for cast to `AnySetting` + type Settings; /// Spawn a new instance of the Services object /// It returns a `(ServiceId, Runtime)` where Runtime is the `tokio::runtime::Runtime` attached for each @@ -103,27 +101,28 @@ pub trait Services: Sized { /// it is usually one-shot. It contains what it is needed just to be run as a main loop /// and a system to be able to stop it running. Meaning that it i responsible of the Overwatch /// application lifecycle. -pub struct OverwatchRunner { - services: S, - #[allow(unused)] +pub struct OverwatchRunner { + services: Services, + #[expect(unused)] handle: OverwatchHandle, finish_signal_sender: oneshot::Sender<()>, + commands_receiver: Receiver, } /// Overwatch thread identifier /// it is used when creating the `tokio::runtime::Runtime` that Overwatch uses internally pub const OVERWATCH_THREAD_NAME: &str = "Overwatch"; -impl OverwatchRunner +impl OverwatchRunner where - S: Services + Send + 'static, + ServicesImpl: Services + Send + 'static, { /// Start the Overwatch runner process /// It creates the `tokio::runtime::Runtime`, initialize the [`Services`] and start listening for /// Overwatch related tasks. /// Returns the [`Overwatch`] instance that handles this runner. pub fn run( - settings: S::Settings, + settings: ServicesImpl::Settings, runtime: Option, ) -> std::result::Result { let runtime = runtime.unwrap_or_else(default_multithread_runtime); @@ -131,14 +130,15 @@ where let (finish_signal_sender, finish_runner_signal) = tokio::sync::oneshot::channel(); let (commands_sender, commands_receiver) = tokio::sync::mpsc::channel(16); let handle = OverwatchHandle::new(runtime.handle().clone(), commands_sender); - let services = S::new(settings, handle.clone())?; + let services = ServicesImpl::new(settings, handle.clone())?; let runner = OverwatchRunner { services, handle: handle.clone(), finish_signal_sender, + commands_receiver, }; - runtime.spawn(async move { runner.run_(commands_receiver).await }); + runtime.spawn(async move { runner.run_().await }); Ok(Overwatch { runtime, @@ -151,21 +151,22 @@ where feature = "instrumentation", instrument(name = "overwatch-run", skip_all) )] - async fn run_(self, mut receiver: Receiver) { + async fn run_(self) { let Self { mut services, - handle: _, finish_signal_sender, + mut commands_receiver, + .. } = self; let lifecycle_handlers = services.start_all().expect("Services to start running"); - while let Some(command) = receiver.recv().await { + while let Some(command) = commands_receiver.recv().await { info!(command = ?command, "Overwatch command received"); match command { OverwatchCommand::Relay(relay_command) => { - Self::handle_relay(&mut services, relay_command).await; + Self::handle_relay(&mut services, relay_command); } OverwatchCommand::Status(status_command) => { - Self::handle_status(&mut services, status_command).await; + Self::handle_status(&mut services, status_command); } OverwatchCommand::ServiceLifeCycle(msg) => match msg { ServiceLifeCycleCommand { @@ -197,7 +198,7 @@ where } } OverwatchCommand::Settings(settings) => { - Self::handle_settings_update(&mut services, settings).await; + Self::handle_settings_update(&mut services, settings); } } } @@ -207,23 +208,20 @@ where .expect("Overwatch run finish signal to be sent properly"); } - async fn handle_relay(services: &mut S, command: RelayCommand) { + fn handle_relay(services: &mut ServicesImpl, command: RelayCommand) { let RelayCommand { service_id, reply_channel, } = command; // send requested rely channel result to requesting service - if let Err(Err(e)) = reply_channel - .reply(services.request_relay(service_id)) - .await - { - info!(error=?e, "Error requesting relay for service {}", service_id) + if let Err(Err(e)) = reply_channel.reply(services.request_relay(service_id)) { + info!(error=?e, "Error requesting relay for service {service_id}"); } } - async fn handle_settings_update(services: &mut S, command: SettingsCommand) { + fn handle_settings_update(services: &mut ServicesImpl, command: SettingsCommand) { let SettingsCommand(settings) = command; - if let Ok(settings) = settings.downcast::() { + if let Ok(settings) = settings.downcast::() { if let Err(e) = services.update_settings(*settings) { // TODO: add proper logging error!("{e}"); @@ -232,8 +230,9 @@ where unreachable!("Statically should always be of the correct type"); } } - async fn handle_status( - services: &mut S, + + fn handle_status( + services: &mut ServicesImpl, StatusCommand { service_id, reply_channel, @@ -242,8 +241,8 @@ where let watcher_result = services.request_status_watcher(service_id); match watcher_result { Ok(watcher) => { - if reply_channel.reply(watcher).await.is_err() { - error!("Error reporting back status watcher for service: {service_id}") + if reply_channel.reply(watcher).is_err() { + error!("Error reporting back status watcher for service: {service_id}"); } } Err(e) => { diff --git a/overwatch-rs/src/services/handle.rs b/overwatch-rs/src/services/handle.rs index 0574fb3..ae91d91 100644 --- a/overwatch-rs/src/services/handle.rs +++ b/overwatch-rs/src/services/handle.rs @@ -8,58 +8,67 @@ use crate::services::relay::{relay, InboundRelay, OutboundRelay}; use crate::services::settings::{SettingsNotifier, SettingsUpdater}; use crate::services::state::{StateHandle, StateOperator, StateUpdater}; use crate::services::status::{StatusHandle, StatusWatcher}; -use crate::services::{ServiceCore, ServiceData, ServiceId, ServiceState}; +use crate::services::{ServiceCore, ServiceId, ServiceState}; // TODO: Abstract handle over state, to differentiate when the service is running and when it is not // that way we can expose a better API depending on what is happenning. Would get rid of the probably // unnecessary Option and cloning. /// Service handle /// This is used to access different parts of the service -pub struct ServiceHandle { +pub struct ServiceHandle { /// Message channel relay /// Would be None if service is not running /// Will contain the channel if service is running - outbound_relay: Option>, + outbound_relay: Option>, /// Handle to overwatch overwatch_handle: OverwatchHandle, - settings: SettingsUpdater, - status: StatusHandle, - initial_state: S::State, + settings: SettingsUpdater, + status: StatusHandle, + initial_state: State, + relay_buffer_size: usize, } /// Service core resources /// It contains whatever is necessary to start a new service runner -pub struct ServiceStateHandle { +pub struct ServiceStateHandle { /// Relay channel to communicate with the service runner - pub inbound_relay: InboundRelay, - pub status_handle: StatusHandle, + pub inbound_relay: InboundRelay, + pub status_handle: StatusHandle, /// Overwatch handle pub overwatch_handle: OverwatchHandle, - pub settings_reader: SettingsNotifier, - pub state_updater: StateUpdater, + pub settings_reader: SettingsNotifier, + pub state_updater: StateUpdater, pub lifecycle_handle: LifecycleHandle, } /// Main service executor /// It is the object that hold the necessary information for the service to run -pub struct ServiceRunner { - service_state: ServiceStateHandle, - state_handle: StateHandle, +pub struct ServiceRunner { + service_state: ServiceStateHandle, + state_handle: StateHandle, lifecycle_handle: LifecycleHandle, - initial_state: S::State, + initial_state: State, } -impl ServiceHandle { - pub fn new( - settings: S::Settings, +impl ServiceHandle +where + Settings: Clone, + State: ServiceState + Clone, +{ + pub fn new( + settings: Settings, overwatch_handle: OverwatchHandle, - ) -> Result::Error> { - let initial_state = if let Ok(Some(loaded_state)) = S::StateOperator::try_load(&settings) { + relay_buffer_size: usize, + ) -> Result + where + StateOp: StateOperator, + { + let initial_state = if let Ok(Some(loaded_state)) = StateOp::try_load(&settings) { info!("Loaded state from Operator"); loaded_state } else { info!("Couldn't load state from Operator. Creating from settings."); - S::State::from_settings(&settings)? + State::from_settings(&settings)? }; Ok(Self { @@ -68,13 +77,10 @@ impl ServiceHandle { settings: SettingsUpdater::new(settings), status: StatusHandle::new(), initial_state, + relay_buffer_size, }) } - pub fn id(&self) -> ServiceId { - S::SERVICE_ID - } - /// Service runtime getter /// it is easily cloneable and can be done on demand pub fn runtime(&self) -> &Handle { @@ -88,7 +94,7 @@ impl ServiceHandle { } /// Request a relay with this service - pub fn relay_with(&self) -> Option> { + pub fn relay_with(&self) -> Option> { self.outbound_relay.clone() } @@ -97,21 +103,24 @@ impl ServiceHandle { } /// Update settings - pub fn update_settings(&self, settings: S::Settings) { - self.settings.update(settings) + pub fn update_settings(&self, settings: Settings) { + self.settings.update(settings); } /// Build a runner for this service - pub fn service_runner(&mut self) -> ServiceRunner { + pub fn service_runner(&mut self) -> ServiceRunner + where + StateOp: StateOperator, + { // TODO: add proper status handling here, a service should be able to produce a runner if it is already running. - let (inbound_relay, outbound_relay) = relay::(S::SERVICE_RELAY_BUFFER_SIZE); + let (inbound_relay, outbound_relay) = relay::(self.relay_buffer_size); let settings_reader = self.settings.notifier(); // add relay channel to handle self.outbound_relay = Some(outbound_relay); let settings = self.settings.notifier().get_updated_settings(); - let operator = S::StateOperator::from_settings(settings); + let operator = StateOp::from_settings(settings); let (state_handle, state_updater) = - StateHandle::::new(self.initial_state.clone(), operator); + StateHandle::::new(self.initial_state.clone(), operator); let lifecycle_handle = LifecycleHandle::new(); @@ -133,34 +142,31 @@ impl ServiceHandle { } } -impl ServiceStateHandle { - pub fn id(&self) -> ServiceId { - S::SERVICE_ID - } -} - -impl ServiceRunner +impl ServiceRunner where - S::State: Send + Sync + 'static, - S::StateOperator: Send + 'static, - S: ServiceCore + 'static, + State: Clone + Send + Sync + 'static, + StateOp: StateOperator + Send + 'static, { /// Spawn the service main loop and handle it lifecycle /// Return a handle to abort execution manually - pub fn run(self) -> Result<(ServiceId, LifecycleHandle), crate::DynError> { + pub fn run(self) -> Result<(ServiceId, LifecycleHandle), crate::DynError> + where + Service: ServiceCore + 'static, + { let ServiceRunner { service_state, state_handle, lifecycle_handle, initial_state, + .. } = self; let runtime = service_state.overwatch_handle.runtime().clone(); - let service = S::init(service_state, initial_state)?; + let service = Service::init(service_state, initial_state)?; runtime.spawn(service.run()); runtime.spawn(state_handle.run()); - Ok((S::SERVICE_ID, lifecycle_handle)) + Ok((Service::SERVICE_ID, lifecycle_handle)) } } diff --git a/overwatch-rs/src/services/life_cycle.rs b/overwatch-rs/src/services/life_cycle.rs index 12a306d..9e3b53b 100644 --- a/overwatch-rs/src/services/life_cycle.rs +++ b/overwatch-rs/src/services/life_cycle.rs @@ -39,13 +39,14 @@ impl Clone for LifecycleHandle { } impl LifecycleHandle { + #[must_use] pub fn new() -> Self { // Use a single lifecycle message at a time. Idea is that all computations on lifecycle should // stack so waiting es effective even if later on is somehow reversed (for example for start/stop events). let (notifier, message_channel) = channel(1); Self { - notifier, message_channel, + notifier, } } diff --git a/overwatch-rs/src/services/mod.rs b/overwatch-rs/src/services/mod.rs index 7f67d99..c23acd3 100644 --- a/overwatch-rs/src/services/mod.rs +++ b/overwatch-rs/src/services/mod.rs @@ -14,9 +14,7 @@ use tokio::runtime; // internal use crate::services::relay::RelayError; -use crate::services::state::StateOperator; use handle::ServiceStateHandle; -use relay::RelayMessage; use state::ServiceState; // TODO: Make this type unique for each service? @@ -31,13 +29,13 @@ pub trait ServiceData { /// Service relay buffer size const SERVICE_RELAY_BUFFER_SIZE: usize = 16; /// Service settings object - type Settings: Clone; + type Settings; /// Service state object - type State: ServiceState + Clone; + type State; /// State operator - type StateOperator: StateOperator + Clone; + type StateOperator; /// Service messages that the service itself understands and can react to - type Message: RelayMessage + Debug; + type Message; } /// Main trait for Services initialization and main loop hook @@ -45,7 +43,7 @@ pub trait ServiceData { pub trait ServiceCore: Sized + ServiceData { /// Initialize the service with the given state fn init( - service_state: ServiceStateHandle, + service_state: ServiceStateHandle, initial_state: Self::State, ) -> Result; @@ -75,7 +73,7 @@ impl ServiceRuntime { pub fn runtime(self) -> Option { match self { ServiceRuntime::Custom(runtime) => Some(runtime), - _ => None, + ServiceRuntime::FromParent(_) => None, } } } diff --git a/overwatch-rs/src/services/relay.rs b/overwatch-rs/src/services/relay.rs index 7a678c1..ce73ee2 100644 --- a/overwatch-rs/src/services/relay.rs +++ b/overwatch-rs/src/services/relay.rs @@ -56,24 +56,24 @@ pub trait RelayMessage: 'static {} /// Channel receiver of a relay connection #[derive(Debug)] -pub struct InboundRelay { - receiver: Receiver, +pub struct InboundRelay { + receiver: Receiver, _stats: (), // placeholder } /// Channel sender of a relay connection -pub struct OutboundRelay { - sender: Sender, +pub struct OutboundRelay { + sender: Sender, _stats: (), // placeholder } #[derive(Debug)] -pub struct Relay { +pub struct Relay { overwatch_handle: OverwatchHandle, - _bound: PhantomBound, + _bound: PhantomBound, } -impl Clone for Relay { +impl Clone for Relay { fn clone(&self) -> Self { Self { overwatch_handle: self.overwatch_handle.clone(), @@ -94,7 +94,7 @@ struct PhantomBound { unsafe impl Send for PhantomBound {} unsafe impl Sync for PhantomBound {} -impl Clone for OutboundRelay { +impl Clone for OutboundRelay { fn clone(&self) -> Self { Self { sender: self.sender.clone(), @@ -105,7 +105,8 @@ impl Clone for OutboundRelay { // TODO: make buffer_size const? /// Relay channel builder -pub fn relay(buffer_size: usize) -> (InboundRelay, OutboundRelay) { +#[must_use] +pub fn relay(buffer_size: usize) -> (InboundRelay, OutboundRelay) { let (sender, receiver) = channel(buffer_size); ( InboundRelay { @@ -116,16 +117,16 @@ pub fn relay(buffer_size: usize) -> (InboundRelay, OutboundRelay) { ) } -impl InboundRelay { +impl InboundRelay { /// Receive a message from the relay connections - pub async fn recv(&mut self) -> Option { + pub async fn recv(&mut self) -> Option { self.receiver.recv().await } } -impl OutboundRelay { +impl OutboundRelay { /// Send a message to the relay connection - pub async fn send(&self, message: M) -> Result<(), (RelayError, M)> { + pub async fn send(&self, message: Message) -> Result<(), (RelayError, Message)> { self.sender .send(message) .await @@ -141,22 +142,28 @@ impl OutboundRelay { /// /// This function panics if called within an asynchronous execution /// context. - /// - /// # Exa - pub fn blocking_send(&self, message: M) -> Result<(), (RelayError, M)> { + pub fn blocking_send(&self, message: Message) -> Result<(), (RelayError, Message)> { self.sender .blocking_send(message) .map_err(|e| (RelayError::Send, e.0)) } } -impl OutboundRelay { - pub fn into_sink(self) -> impl Sink { +impl OutboundRelay +where + Message: Send, +{ + pub fn into_sink(self) -> impl Sink { PollSender::new(self.sender) } } -impl Relay { +impl Relay +where + Service: ServiceData, + Service::Message: 'static, +{ + #[must_use] pub fn new(overwatch_handle: OverwatchHandle) -> Self { Self { overwatch_handle, @@ -167,7 +174,7 @@ impl Relay { } #[cfg_attr(feature = "instrumentation", instrument(skip(self), err(Debug)))] - pub async fn connect(self) -> Result, RelayError> { + pub async fn connect(self) -> Result, RelayError> { let (reply, receiver) = oneshot::channel(); self.request_relay(reply).await; self.handle_relay_response(receiver).await @@ -175,7 +182,7 @@ impl Relay { async fn request_relay(&self, reply: oneshot::Sender) { let relay_command = OverwatchCommand::Relay(RelayCommand { - service_id: S::SERVICE_ID, + service_id: Service::SERVICE_ID, reply_channel: ReplyChannel(reply), }); self.overwatch_handle.send(relay_command).await; @@ -185,14 +192,14 @@ impl Relay { async fn handle_relay_response( &self, receiver: oneshot::Receiver, - ) -> Result, RelayError> { + ) -> Result, RelayError> { let response = receiver.await; match response { - Ok(Ok(message)) => match message.downcast::>() { + Ok(Ok(message)) => match message.downcast::>() { Ok(channel) => Ok(*channel), Err(m) => Err(RelayError::InvalidMessage { type_id: format!("{:?}", (*m).type_id()), - service_id: S::SERVICE_ID, + service_id: Service::SERVICE_ID, }), }, Ok(Err(e)) => Err(e), @@ -201,8 +208,8 @@ impl Relay { } } -impl Stream for InboundRelay { - type Item = M; +impl Stream for InboundRelay { + type Item = Message; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.receiver.poll_recv(cx) diff --git a/overwatch-rs/src/services/settings.rs b/overwatch-rs/src/services/settings.rs index 94cdcd0..dd78cd7 100644 --- a/overwatch-rs/src/services/settings.rs +++ b/overwatch-rs/src/services/settings.rs @@ -7,12 +7,16 @@ use tracing::instrument; //internal /// Wrapper around [`tokio::sync::watch::Receiver`] -pub struct SettingsNotifier { - notifier_channel: Receiver, +pub struct SettingsNotifier { + notifier_channel: Receiver, } -impl SettingsNotifier { - pub fn new(notifier_channel: Receiver) -> Self { +impl SettingsNotifier +where + Settings: Clone, +{ + #[must_use] + pub fn new(notifier_channel: Receiver) -> Self { Self { notifier_channel } } @@ -24,19 +28,20 @@ impl SettingsNotifier { // of the method. Another option would be to spawn a task that updates a settings local value // each time an updated settings is received. This could not be so easy to do, since it will // need to hold a &mut to the holder (or needed to use a Cell/RefCell). - pub fn get_updated_settings(&self) -> S { + #[must_use] + pub fn get_updated_settings(&self) -> Settings { self.notifier_channel.borrow().clone() } } /// Settings update notification sender -pub struct SettingsUpdater { - sender: Sender, - receiver: Receiver, +pub struct SettingsUpdater { + sender: Sender, + receiver: Receiver, } -impl SettingsUpdater { - pub fn new(settings: S) -> Self { +impl SettingsUpdater { + pub fn new(settings: Settings) -> Self { let (sender, receiver) = channel(settings); Self { sender, receiver } @@ -44,14 +49,15 @@ impl SettingsUpdater { /// Send a new settings update notification to the watcher end #[cfg_attr(feature = "instrumentation", instrument(skip_all))] - pub fn update(&self, settings: S) { + pub fn update(&self, settings: Settings) { self.sender.send(settings).unwrap_or_else(|_e| { error!("Error sending settings update for service"); }); } /// Get a new notifier channel, used to get latest settings changes updates - pub fn notifier(&self) -> SettingsNotifier { + #[must_use] + pub fn notifier(&self) -> SettingsNotifier { SettingsNotifier { notifier_channel: self.receiver.clone(), } diff --git a/overwatch-rs/src/services/state.rs b/overwatch-rs/src/services/state.rs index 1148720..c993bb0 100644 --- a/overwatch-rs/src/services/state.rs +++ b/overwatch-rs/src/services/state.rs @@ -1,5 +1,4 @@ use std::convert::Infallible; -use std::error::Error; // std use std::marker::PhantomData; use std::pin::Pin; @@ -23,7 +22,7 @@ pub trait ServiceState: Sized { /// Errors that can occur during state initialization type Error; /// Initialize a state using the provided settings. - /// This is called when [StateOperator::try_load] doesn't return a state. + /// This is called when [`StateOperator::try_load`] doesn't return a state. fn from_settings(settings: &Self::Settings) -> Result; } @@ -33,50 +32,49 @@ pub trait ServiceState: Sized { #[async_trait] pub trait StateOperator { /// The type of state that the operator can handle - type StateInput: ServiceState; + type StateInput; + /// The settings to configure the operator + type Settings; /// Errors that can occur during state loading - type LoadError: Error; + type LoadError; /// State initialization method /// In contrast to [ServiceState::from_settings], this is used to try to initialize /// a (saved) [ServiceState] from an external source (e.g. file, database, etc.) - fn try_load( - settings: &::Settings, - ) -> Result, Self::LoadError>; + fn try_load(settings: &Self::Settings) -> Result, Self::LoadError>; /// Operator initialization method. Can be implemented over some subset of settings - fn from_settings(settings: ::Settings) -> Self; + fn from_settings(settings: Self::Settings) -> Self; /// Asynchronously perform an operation for a given state async fn run(&mut self, state: Self::StateInput); } /// Operator that doesn't perform any operation upon state update #[derive(Copy)] -pub struct NoOperator(PhantomData<*const StateInput>); +pub struct NoOperator(PhantomData<(*const StateInput, *const Settings)>); // NoOperator does not actually hold anything and is thus Sync. // Note that we don't use PhantomData as that would // suggest we indeed hold an instance of StateInput, see // https://doc.rust-lang.org/std/marker/struct.PhantomData.html#ownership-and-the-drop-check -unsafe impl Send for NoOperator {} +unsafe impl Send for NoOperator {} // auto derive introduces unnecessary Clone bound on T -impl Clone for NoOperator { +impl Clone for NoOperator { fn clone(&self) -> Self { Self(PhantomData) } } #[async_trait] -impl StateOperator for NoOperator { +impl StateOperator for NoOperator { type StateInput = StateInput; + type Settings = Settings; type LoadError = Infallible; - fn try_load( - _settings: &::Settings, - ) -> Result, Self::LoadError> { + fn try_load(_settings: &Self::Settings) -> Result, Self::LoadError> { Ok(None) } - fn from_settings(_settings: ::Settings) -> Self { + fn from_settings(_settings: Self::Settings) -> Self { NoOperator(PhantomData) } @@ -97,7 +95,7 @@ impl StateOperator for NoOperator { pub struct NoState(PhantomData); // auto derive introduces unnecessary Clone bound on T -impl Clone for NoState { +impl Clone for NoState { fn clone(&self) -> Self { Self(PhantomData) } @@ -105,26 +103,25 @@ impl Clone for NoState { impl ServiceState for NoState { type Settings = Settings; - type Error = crate::DynError; fn from_settings(_settings: &Self::Settings) -> Result { - Ok(Self(Default::default())) + Ok(Self(PhantomData)) } } /// Receiver part of the state handling mechanism. /// A state handle watches a stream of incoming states and triggers the attached operator handling /// method over it. -pub struct StateHandle { - watcher: StateWatcher, +pub struct StateHandle { + watcher: StateWatcher, operator: Operator, } // auto derive introduces unnecessary Clone bound on T -impl Clone for StateHandle +impl Clone for StateHandle where - O: Clone, + Operator: Clone, { fn clone(&self) -> Self { Self { @@ -134,14 +131,26 @@ where } } +impl StateHandle { + pub fn new(initial_state: State, operator: Operator) -> (Self, StateUpdater) { + let (sender, receiver) = channel(initial_state); + let watcher = StateWatcher { receiver }; + let updater = StateUpdater { + sender: Arc::new(sender), + }; + + (Self { watcher, operator }, updater) + } +} + /// Sender part of the state handling mechanism. /// Update the current state and notifies the [`StateHandle`]. -pub struct StateUpdater { - sender: Arc>, +pub struct StateUpdater { + sender: Arc>, } // auto derive introduces unnecessary Clone bound on T -impl Clone for StateUpdater { +impl Clone for StateUpdater { fn clone(&self) -> Self { Self { sender: self.sender.clone(), @@ -149,13 +158,22 @@ impl Clone for StateUpdater { } } +impl StateUpdater { + /// Send a new state and notify the [`StateWatcher`] + pub fn update(&self, new_state: State) { + self.sender.send(new_state).unwrap_or_else(|_e| { + error!("Error updating state"); + }); + } +} + /// Wrapper over [`tokio::sync::watch::Receiver`] -pub struct StateWatcher { - receiver: Receiver, +pub struct StateWatcher { + receiver: Receiver, } // auto derive introduces unnecessary Clone bound on T -impl Clone for StateWatcher { +impl Clone for StateWatcher { fn clone(&self) -> Self { Self { receiver: self.receiver.clone(), @@ -163,52 +181,30 @@ impl Clone for StateWatcher { } } -impl StateUpdater { - /// Send a new state and notify the [`StateWatcher`] - pub fn update(&self, new_state: S) { - self.sender.send(new_state).unwrap_or_else(|_e| { - error!("Error updating state"); - }); - } -} - -impl StateWatcher +impl StateWatcher where - S: ServiceState + Clone, + State: Clone, { /// Get a copy of the most updated state - pub fn state_cloned(&self) -> S { + #[must_use] + pub fn state_cloned(&self) -> State { self.receiver.borrow().clone() } } -impl StateWatcher -where - S: ServiceState, -{ +impl StateWatcher { /// Get a [`Ref`](tokio::sync::watch::Ref) to the last state, this blocks incoming updates until /// the `Ref` is dropped. Use with caution. - pub fn state_ref(&self) -> Ref { + #[must_use] + pub fn state_ref(&self) -> Ref { self.receiver.borrow() } } -impl StateHandle { - pub fn new(initial_state: S, operator: O) -> (Self, StateUpdater) { - let (sender, receiver) = channel(initial_state); - let watcher = StateWatcher { receiver }; - let updater = StateUpdater { - sender: Arc::new(sender), - }; - - (Self { watcher, operator }, updater) - } -} - -impl StateHandle +impl StateHandle where - S: ServiceState + Clone + Send + Sync + 'static, - Operator: StateOperator, + State: Clone + Send + Sync + 'static, + Operator: StateOperator, { /// Wait for new state updates and run the operator handling method pub async fn run(self) { @@ -249,6 +245,7 @@ mod test { #[async_trait] impl StateOperator for PanicOnGreaterThanTen { type StateInput = UsizeCounter; + type Settings = (); type LoadError = Infallible; fn try_load( @@ -273,7 +270,7 @@ mod test { } #[tokio::test] - #[should_panic] + #[should_panic(expected = "assertion failed: value < 10")] async fn state_stream_collects() { let (handle, updater): ( StateHandle, diff --git a/overwatch-rs/src/services/status.rs b/overwatch-rs/src/services/status.rs index f0d1175..d5def84 100644 --- a/overwatch-rs/src/services/status.rs +++ b/overwatch-rs/src/services/status.rs @@ -1,10 +1,9 @@ // std use std::default::Default; -use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; // crates -use crate::services::{ServiceData, ServiceId}; +use crate::services::ServiceId; use thiserror::Error; use tokio::sync::watch; // internal @@ -30,7 +29,7 @@ impl StatusUpdater { pub fn update(&self, status: ServiceStatus) { self.0 .send(status) - .expect("Overwatch always maintain an open watcher, send should always succeed") + .expect("Overwatch always maintain an open watcher, send should always succeed"); } } @@ -55,43 +54,39 @@ impl StatusWatcher { } } -pub struct StatusHandle { +pub struct StatusHandle { updater: Arc, watcher: StatusWatcher, - _phantom: PhantomData, } -impl Clone for StatusHandle { +impl Clone for StatusHandle { fn clone(&self) -> Self { Self { updater: Arc::clone(&self.updater), watcher: self.watcher.clone(), - _phantom: Default::default(), } } } -impl StatusHandle { +impl StatusHandle { + #[must_use] pub fn new() -> Self { let (updater, watcher) = watch::channel(ServiceStatus::Uninitialized); let updater = Arc::new(StatusUpdater(updater)); let watcher = StatusWatcher(watcher); - Self { - updater, - watcher, - _phantom: Default::default(), - } + Self { updater, watcher } } + #[must_use] pub fn updater(&self) -> &StatusUpdater { &self.updater } - + #[must_use] pub fn watcher(&self) -> StatusWatcher { self.watcher.clone() } } -impl Default for StatusHandle { +impl Default for StatusHandle { fn default() -> Self { Self::new() } diff --git a/overwatch-rs/src/utils/const_checks.rs b/overwatch-rs/src/utils/const_checks.rs index 2871f07..7dc2f90 100644 --- a/overwatch-rs/src/utils/const_checks.rs +++ b/overwatch-rs/src/utils/const_checks.rs @@ -1,5 +1,6 @@ use crate::services::ServiceId; +#[must_use] pub const fn unique_ids(to_check: &[ServiceId]) -> bool { if to_check.is_empty() { return true; diff --git a/overwatch-rs/src/utils/runtime.rs b/overwatch-rs/src/utils/runtime.rs index 932460a..b0e526f 100644 --- a/overwatch-rs/src/utils/runtime.rs +++ b/overwatch-rs/src/utils/runtime.rs @@ -1,5 +1,6 @@ use crate::overwatch::OVERWATCH_THREAD_NAME; +#[must_use] pub fn default_multithread_runtime() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_multi_thread() .enable_all() diff --git a/overwatch-rs/tests/cancelable_service.rs b/overwatch-rs/tests/cancelable_service.rs index bc95ad1..f59e5e9 100644 --- a/overwatch-rs/tests/cancelable_service.rs +++ b/overwatch-rs/tests/cancelable_service.rs @@ -1,32 +1,32 @@ use overwatch_derive::Services; use overwatch_rs::overwatch::commands::{OverwatchCommand, ServiceLifeCycleCommand}; use overwatch_rs::overwatch::OverwatchRunner; -use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle}; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::NoMessage; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; use overwatch_rs::DynError; +use overwatch_rs::{OpaqueServiceHandle, OpaqueServiceStateHandle}; use std::time::Duration; use tokio::time::sleep; use tokio_stream::StreamExt; pub struct CancellableService { - service_state: ServiceStateHandle, + service_state: OpaqueServiceStateHandle, } impl ServiceData for CancellableService { const SERVICE_ID: ServiceId = "cancel-me-please"; type Settings = (); type State = NoState; - type StateOperator = NoOperator; + type StateOperator = NoOperator; type Message = NoMessage; } #[async_trait::async_trait] impl ServiceCore for CancellableService { fn init( - service_state: ServiceStateHandle, + service_state: OpaqueServiceStateHandle, _initial_state: Self::State, ) -> Result { Ok(Self { service_state }) @@ -62,7 +62,7 @@ impl ServiceCore for CancellableService { #[derive(Services)] struct CancelableServices { - cancelable: ServiceHandle, + cancelable: OpaqueServiceHandle, } #[test] diff --git a/overwatch-rs/tests/generics.rs b/overwatch-rs/tests/generics.rs index 4363395..023eec2 100644 --- a/overwatch-rs/tests/generics.rs +++ b/overwatch-rs/tests/generics.rs @@ -2,51 +2,36 @@ use async_trait::async_trait; use futures::future::select; use overwatch_derive::Services; use overwatch_rs::overwatch::OverwatchRunner; -use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle}; -use overwatch_rs::services::relay::RelayMessage; +use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_rs::OpaqueServiceHandle; use std::fmt::Debug; use std::time::Duration; use tokio::time::sleep; -pub struct GenericService -where - T: Debug + 'static + Sync, -{ - state: ServiceStateHandle, - _phantom: std::marker::PhantomData, +pub struct GenericService { + state: ServiceStateHandle>, } #[derive(Clone, Debug)] pub struct GenericServiceMessage(String); -impl RelayMessage for GenericServiceMessage {} - -impl ServiceData for GenericService -where - T: Debug + 'static + Sync, -{ +impl ServiceData for GenericService { const SERVICE_ID: ServiceId = "FooService"; type Settings = (); type State = NoState; - type StateOperator = NoOperator; + type StateOperator = NoOperator; type Message = GenericServiceMessage; } #[async_trait] -impl ServiceCore for GenericService -where - T: Debug + 'static + Sync, -{ +impl ServiceCore for GenericService { fn init( - state: ServiceStateHandle, + state: ServiceStateHandle, _initial_state: Self::State, ) -> Result { - Ok(Self { - state, - _phantom: std::marker::PhantomData, - }) + Ok(Self { state }) } async fn run(mut self) -> Result<(), overwatch_rs::DynError> { @@ -97,21 +82,18 @@ where } #[derive(Services)] -struct TestApp -where - T: Debug + 'static + Sync, -{ - generic_service: ServiceHandle>, +struct TestApp { + generic_service: OpaqueServiceHandle, } #[test] fn derive_generic_service() { - let settings: TestAppServiceSettings = TestAppServiceSettings { + let settings: TestAppServiceSettings = TestAppServiceSettings { generic_service: (), }; - let overwatch = OverwatchRunner::>::run(settings, None).unwrap(); + let overwatch = OverwatchRunner::::run(settings, None).unwrap(); let handle = overwatch.handle().clone(); - let generic_service_relay = handle.relay::>(); + let generic_service_relay = handle.relay::(); overwatch.spawn(async move { let generic_service_relay = generic_service_relay diff --git a/overwatch-rs/tests/print_service.rs b/overwatch-rs/tests/print_service.rs index dc37538..b0d7195 100644 --- a/overwatch-rs/tests/print_service.rs +++ b/overwatch-rs/tests/print_service.rs @@ -2,15 +2,15 @@ use async_trait::async_trait; use futures::future::select; use overwatch_derive::Services; use overwatch_rs::overwatch::OverwatchRunner; -use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle}; use overwatch_rs::services::relay::RelayMessage; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_rs::{OpaqueServiceHandle, OpaqueServiceStateHandle}; use std::time::Duration; use tokio::time::sleep; pub struct PrintService { - state: ServiceStateHandle, + state: OpaqueServiceStateHandle, } #[derive(Clone, Debug)] @@ -22,14 +22,14 @@ impl ServiceData for PrintService { const SERVICE_ID: ServiceId = "FooService"; type Settings = (); type State = NoState; - type StateOperator = NoOperator; + type StateOperator = NoOperator; type Message = PrintServiceMessage; } #[async_trait] impl ServiceCore for PrintService { fn init( - state: ServiceStateHandle, + state: OpaqueServiceStateHandle, _initial_state: Self::State, ) -> Result { Ok(Self { state }) @@ -39,9 +39,10 @@ impl ServiceCore for PrintService { use tokio::io::{self, AsyncWriteExt}; let Self { - state: ServiceStateHandle { - mut inbound_relay, .. - }, + state: + OpaqueServiceStateHandle:: { + mut inbound_relay, .. + }, } = self; let print = async move { @@ -83,7 +84,7 @@ impl ServiceCore for PrintService { #[derive(Services)] struct TestApp { - print_service: ServiceHandle, + print_service: OpaqueServiceHandle, } #[test] diff --git a/overwatch-rs/tests/sequence.rs b/overwatch-rs/tests/sequence.rs index ee7923b..a7e0670 100644 --- a/overwatch-rs/tests/sequence.rs +++ b/overwatch-rs/tests/sequence.rs @@ -1,30 +1,30 @@ use overwatch_derive::Services; use overwatch_rs::overwatch::OverwatchRunner; -use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle}; use overwatch_rs::services::relay::NoMessage; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::status::{ServiceStatus, StatusWatcher}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; use overwatch_rs::DynError; +use overwatch_rs::{OpaqueServiceHandle, OpaqueServiceStateHandle}; use std::time::Duration; pub struct AwaitService1 { - service_state: ServiceStateHandle, + service_state: OpaqueServiceStateHandle, } pub struct AwaitService2 { - service_state: ServiceStateHandle, + service_state: OpaqueServiceStateHandle, } pub struct AwaitService3 { - service_state: ServiceStateHandle, + service_state: OpaqueServiceStateHandle, } impl ServiceData for AwaitService1 { const SERVICE_ID: ServiceId = "S1"; type Settings = (); type State = NoState; - type StateOperator = NoOperator; + type StateOperator = NoOperator; type Message = NoMessage; } @@ -32,7 +32,7 @@ impl ServiceData for AwaitService2 { const SERVICE_ID: ServiceId = "S2"; type Settings = (); type State = NoState; - type StateOperator = NoOperator; + type StateOperator = NoOperator; type Message = NoMessage; } @@ -40,14 +40,14 @@ impl ServiceData for AwaitService3 { const SERVICE_ID: ServiceId = "S3"; type Settings = (); type State = NoState; - type StateOperator = NoOperator; + type StateOperator = NoOperator; type Message = NoMessage; } #[async_trait::async_trait] impl ServiceCore for AwaitService1 { fn init( - service_state: ServiceStateHandle, + service_state: OpaqueServiceStateHandle, _initial_state: Self::State, ) -> Result { Ok(Self { service_state }) @@ -71,7 +71,7 @@ impl ServiceCore for AwaitService1 { #[async_trait::async_trait] impl ServiceCore for AwaitService2 { fn init( - service_state: ServiceStateHandle, + service_state: OpaqueServiceStateHandle, _initial_state: Self::State, ) -> Result { Ok(Self { service_state }) @@ -111,7 +111,7 @@ impl ServiceCore for AwaitService2 { #[async_trait::async_trait] impl ServiceCore for AwaitService3 { fn init( - service_state: ServiceStateHandle, + service_state: OpaqueServiceStateHandle, _initial_state: Self::State, ) -> Result { Ok(Self { service_state }) @@ -150,9 +150,9 @@ impl ServiceCore for AwaitService3 { #[derive(Services)] struct SequenceServices { - c: ServiceHandle, - b: ServiceHandle, - a: ServiceHandle, + c: OpaqueServiceHandle, + b: OpaqueServiceHandle, + a: OpaqueServiceHandle, } #[test] diff --git a/overwatch-rs/tests/settings_update.rs b/overwatch-rs/tests/settings_update.rs index 7ddaca5..e8aa0d5 100644 --- a/overwatch-rs/tests/settings_update.rs +++ b/overwatch-rs/tests/settings_update.rs @@ -1,15 +1,15 @@ use async_trait::async_trait; use overwatch_derive::Services; use overwatch_rs::overwatch::OverwatchRunner; -use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle}; use overwatch_rs::services::relay::RelayMessage; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_rs::{OpaqueServiceHandle, OpaqueServiceStateHandle}; use std::time::Duration; use tokio::time::sleep; pub struct SettingsService { - state: ServiceStateHandle, + state: OpaqueServiceStateHandle, } type SettingsServiceSettings = String; @@ -23,14 +23,14 @@ impl ServiceData for SettingsService { const SERVICE_ID: ServiceId = "FooService"; type Settings = SettingsServiceSettings; type State = NoState; - type StateOperator = NoOperator; + type StateOperator = NoOperator; type Message = SettingsMsg; } #[async_trait] impl ServiceCore for SettingsService { fn init( - state: ServiceStateHandle, + state: OpaqueServiceStateHandle, _initial_state: Self::State, ) -> Result { Ok(Self { state }) @@ -38,9 +38,10 @@ impl ServiceCore for SettingsService { async fn run(mut self) -> Result<(), overwatch_rs::DynError> { let Self { - state: ServiceStateHandle { - settings_reader, .. - }, + state: + OpaqueServiceStateHandle:: { + settings_reader, .. + }, } = self; let print = async move { @@ -64,7 +65,7 @@ impl ServiceCore for SettingsService { #[derive(Services)] struct TestApp { - settings_service: ServiceHandle, + settings_service: OpaqueServiceHandle, } #[test] diff --git a/overwatch-rs/tests/state_handling.rs b/overwatch-rs/tests/state_handling.rs index 85821bb..c7a744b 100644 --- a/overwatch-rs/tests/state_handling.rs +++ b/overwatch-rs/tests/state_handling.rs @@ -1,17 +1,17 @@ use async_trait::async_trait; use overwatch_derive::Services; use overwatch_rs::overwatch::OverwatchRunner; -use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle}; use overwatch_rs::services::relay::RelayMessage; use overwatch_rs::services::state::{ServiceState, StateOperator}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_rs::{OpaqueServiceHandle, OpaqueServiceStateHandle}; use std::convert::Infallible; use std::time::Duration; use tokio::io::{self, AsyncWriteExt}; use tokio::time::sleep; pub struct UpdateStateService { - state: ServiceStateHandle, + state: OpaqueServiceStateHandle, } #[derive(Clone, Debug)] @@ -50,6 +50,7 @@ pub struct CounterStateOperator; #[async_trait] impl StateOperator for CounterStateOperator { type StateInput = CounterState; + type Settings = (); type LoadError = Infallible; fn try_load( @@ -84,7 +85,7 @@ impl ServiceData for UpdateStateService { #[async_trait] impl ServiceCore for UpdateStateService { fn init( - state: ServiceStateHandle, + state: OpaqueServiceStateHandle, _initial_state: Self::State, ) -> Result { Ok(Self { state }) @@ -92,7 +93,7 @@ impl ServiceCore for UpdateStateService { async fn run(mut self) -> Result<(), overwatch_rs::DynError> { let Self { - state: ServiceStateHandle { state_updater, .. }, + state: OpaqueServiceStateHandle:: { state_updater, .. }, } = self; for value in 0..10 { state_updater.update(CounterState { value }); @@ -104,7 +105,7 @@ impl ServiceCore for UpdateStateService { #[derive(Services)] struct TestApp { - update_state_service: ServiceHandle, + update_state_service: OpaqueServiceHandle, } #[test] diff --git a/overwatch-rs/tests/try_load.rs b/overwatch-rs/tests/try_load.rs index 0fbc16a..f9b5844 100644 --- a/overwatch-rs/tests/try_load.rs +++ b/overwatch-rs/tests/try_load.rs @@ -4,11 +4,11 @@ use std::time::Duration; use async_trait::async_trait; use overwatch_derive::Services; use overwatch_rs::overwatch::OverwatchRunner; -use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle}; use overwatch_rs::services::relay::NoMessage; use overwatch_rs::services::state::{ServiceState, StateOperator}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; use overwatch_rs::DynError; +use overwatch_rs::{OpaqueServiceHandle, OpaqueServiceStateHandle}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::SendError; @@ -32,6 +32,7 @@ struct TryLoadOperator; #[async_trait] impl StateOperator for TryLoadOperator { type StateInput = TryLoadState; + type Settings = TryLoadSettings; type LoadError = SendError; fn try_load( @@ -56,7 +57,7 @@ struct TryLoadSettings { } struct TryLoad { - service_state_handle: ServiceStateHandle, + service_state_handle: OpaqueServiceStateHandle, } impl ServiceData for TryLoad { @@ -70,7 +71,7 @@ impl ServiceData for TryLoad { #[async_trait] impl ServiceCore for TryLoad { fn init( - service_state: ServiceStateHandle, + service_state: OpaqueServiceStateHandle, _initial_state: Self::State, ) -> Result { Ok(Self { @@ -91,7 +92,7 @@ impl ServiceCore for TryLoad { #[derive(Services)] struct TryLoadApp { - try_load: ServiceHandle, + try_load: OpaqueServiceHandle, } #[test]