Skip to content

Commit

Permalink
chore: update bounds on core traits (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
ntn-x2 authored Feb 19, 2025
1 parent 1ef684e commit 86c7405
Show file tree
Hide file tree
Showing 26 changed files with 333 additions and 319 deletions.
6 changes: 3 additions & 3 deletions examples/ping_pong/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Crate
use overwatch_derive::Services;
use overwatch_rs::overwatch::OverwatchRunner;
use overwatch_rs::services::handle::ServiceHandle;
use overwatch_rs::OpaqueServiceHandle;
// Internal
use crate::service_ping::PingService;
use crate::service_pong::PongService;
Expand All @@ -16,8 +16,8 @@ mod states;

#[derive(Services)]
struct PingPong {
ping: ServiceHandle<PingService>,
pong: ServiceHandle<PongService>,
ping: OpaqueServiceHandle<PingService>,
pong: OpaqueServiceHandle<PongService>,
}

const PING_STATE_SAVE_PATH: &str = const_format::formatcp!(
Expand Down
11 changes: 5 additions & 6 deletions examples/ping_pong/src/operators.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// STD
use std::fmt::Debug;
// Crates
use overwatch_rs::services::state::{ServiceState, StateOperator};
use overwatch_rs::services::state::StateOperator;
// Internal
use crate::states::PingState;
use crate::{settings::PingSettings, states::PingState};

#[derive(Debug, Clone)]
pub struct StateSaveOperator {
Expand All @@ -13,17 +13,16 @@ pub struct StateSaveOperator {
#[async_trait::async_trait]
impl StateOperator for StateSaveOperator {
type StateInput = PingState;
type Settings = PingSettings;
type LoadError = std::io::Error;

fn try_load(
settings: &<Self::StateInput as ServiceState>::Settings,
) -> Result<Option<Self::StateInput>, Self::LoadError> {
fn try_load(settings: &Self::Settings) -> Result<Option<Self::StateInput>, Self::LoadError> {
let state_string = std::fs::read_to_string(&settings.state_save_path)?;
serde_json::from_str(&state_string)
.map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))
}

fn from_settings(settings: <Self::StateInput as ServiceState>::Settings) -> Self {
fn from_settings(settings: Self::Settings) -> Self {
Self {
save_path: settings.state_save_path,
}
Expand Down
13 changes: 6 additions & 7 deletions examples/ping_pong/src/service_ping.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Crates
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use overwatch_rs::{DynError, OpaqueServiceStateHandle};
use std::time::Duration;
use tokio::time::sleep;
// Internal
Expand All @@ -12,7 +11,7 @@ use crate::settings::PingSettings;
use crate::states::PingState;

pub struct PingService {
service_state_handle: ServiceStateHandle<Self>,
service_state_handle: OpaqueServiceStateHandle<Self>,
initial_state: <Self as ServiceData>::State,
}

Expand All @@ -27,7 +26,7 @@ impl ServiceData for PingService {
#[async_trait::async_trait]
impl ServiceCore for PingService {
fn init(
service_state_handle: ServiceStateHandle<Self>,
service_state_handle: OpaqueServiceStateHandle<Self>,
initial_state: Self::State,
) -> Result<Self, DynError> {
Ok(Self {
Expand All @@ -53,7 +52,7 @@ impl ServiceCore for PingService {

loop {
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
() = sleep(Duration::from_secs(1)) => {
println!("Sending Ping");
pong_outbound_relay.send(PongMessage::Ping).await.unwrap();
}
Expand All @@ -64,14 +63,14 @@ impl ServiceCore for PingService {
service_state_handle.state_updater.update(
Self::State { pong_count }
);
println!("Received Pong. Total: {}", pong_count);
println!("Received Pong. Total: {pong_count}");
}
}
}
true = async {
pong_count >= 30
} => {
println!("Received {} Pongs. Exiting...", pong_count);
println!("Received {pong_count} Pongs. Exiting...");
break;
}
}
Expand Down
9 changes: 4 additions & 5 deletions examples/ping_pong/src/service_pong.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
// Crates
use crate::messages::{PingMessage, PongMessage};
use crate::service_ping::PingService;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use overwatch_rs::{DynError, OpaqueServiceStateHandle};

pub struct PongService {
service_state_handle: ServiceStateHandle<Self>,
service_state_handle: OpaqueServiceStateHandle<Self>,
}

impl ServiceData for PongService {
const SERVICE_ID: ServiceId = "pong";
type Settings = ();
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type StateOperator = NoOperator<Self::State, Self::Settings>;
type Message = PongMessage;
}

#[async_trait::async_trait]
impl ServiceCore for PongService {
fn init(
service_state_handle: ServiceStateHandle<Self>,
service_state_handle: OpaqueServiceStateHandle<Self>,
_initial_state: Self::State,
) -> Result<Self, DynError> {
Ok(Self {
Expand Down
9 changes: 5 additions & 4 deletions overwatch-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ fn generate_new_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStr
quote! {
#field_identifier: {
let manager =
::overwatch_rs::services::handle::ServiceHandle::<#service_type>::new(
#settings_field_identifier, overwatch_handle.clone(),
::overwatch_rs::OpaqueServiceHandle::<#service_type>::new::<<#service_type as ::overwatch_rs::services::ServiceData>::StateOperator>(
#settings_field_identifier, overwatch_handle.clone(), <#service_type as ::overwatch_rs::services::ServiceData>::SERVICE_RELAY_BUFFER_SIZE
)?;
manager
}
Expand All @@ -203,8 +203,9 @@ fn generate_new_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStr
fn generate_start_all_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStream {
let call_start = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
let type_id = utils::extract_type_from(&field.ty);
quote! {
self.#field_identifier.service_runner().run()?
self.#field_identifier.service_runner::<<#type_id as ::overwatch_rs::services::ServiceData>::StateOperator>().run::<#type_id>()?
}
});

Expand All @@ -223,7 +224,7 @@ fn generate_start_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenS
let type_id = utils::extract_type_from(&field.ty);
quote! {
<#type_id as ::overwatch_rs::services::ServiceData>::SERVICE_ID => {
self.#field_identifier.service_runner().run()?;
self.#field_identifier.service_runner::<<#type_id as ::overwatch_rs::services::ServiceData>::StateOperator>().run::<#type_id>()?;
::std::result::Result::Ok(())
}
}
Expand Down
22 changes: 17 additions & 5 deletions overwatch-rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Overwatch is a framework to easily construct applications that requires of several independent
//! parts that needs communication between them.
//! Overwatch is a framework to easily construct applications that are composed of several independent
//! parts requiring communication between them.
//! Everything is self contained and it matches somewhat the advantages of microservices.
//!
//! ## Design Goals
Expand All @@ -13,22 +13,34 @@
//! - It is easier to isolate problems
//! - Minimal sharing when unavoidable
//!
//! - Debuggeability
//! - Debuggability
//! - Easy to track workflow
//! - Easy to test
//! - Easy to measure
//! - Asynchronous Communication
//! - Asynchronous communication
//!
//! ## Main components
//!
//! - Overwatch: the main messenger relay component (internal communications). It is also be responsible of managing other components lifecycle and handling configuration updates.
//! - Overwatch: the main messenger relay component (internal communications). It is also responsible for managing other components lifecycle and handling configuration updates.
//! - Services (handled by the *overwatch*)
use crate::services::ServiceData;

pub mod overwatch;
pub mod services;
pub mod utils;

pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type OpaqueServiceHandle<S> = crate::services::handle::ServiceHandle<
<S as ServiceData>::Message,
<S as ServiceData>::Settings,
<S as ServiceData>::State,
>;
pub type OpaqueServiceStateHandle<S> = crate::services::handle::ServiceStateHandle<
<S as ServiceData>::Message,
<S as ServiceData>::Settings,
<S as ServiceData>::State,
>;

#[cfg(feature = "derive")]
pub use overwatch_derive::*;
11 changes: 5 additions & 6 deletions overwatch-rs/src/overwatch/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ use crate::services::status::StatusWatcher;
use crate::services::ServiceId;

#[derive(Debug)]
pub(crate) struct ReplyChannel<M>(pub(crate) oneshot::Sender<M>);
pub(crate) struct ReplyChannel<Message>(pub(crate) oneshot::Sender<Message>);

impl<M> From<oneshot::Sender<M>> for ReplyChannel<M> {
fn from(sender: oneshot::Sender<M>) -> Self {
impl<Message> From<oneshot::Sender<Message>> for ReplyChannel<Message> {
fn from(sender: oneshot::Sender<Message>) -> Self {
Self(sender)
}
}

impl<M> ReplyChannel<M> {
pub async fn reply(self, message: M) -> Result<(), M> {
impl<Message> ReplyChannel<Message> {
pub fn reply(self, message: Message) -> Result<(), Message> {
self.0.send(message)
}
}
Expand All @@ -40,7 +40,6 @@ pub struct StatusCommand {
}

/// Command for managing [`ServiceCore`](crate::services::ServiceCore) lifecycle
#[allow(unused)]
#[derive(Debug)]
pub struct ServiceLifeCycleCommand {
pub service_id: ServiceId,
Expand Down
25 changes: 16 additions & 9 deletions overwatch-rs/src/overwatch/handle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// std
use std::fmt::Debug;

// crates
use crate::overwatch::commands::{
Expand All @@ -20,40 +21,45 @@ use crate::services::status::StatusWatcher;
/// It handles communications to the main Overwatch runner.
#[derive(Clone, Debug)]
pub struct OverwatchHandle {
#[allow(unused)]
runtime_handle: Handle,
sender: Sender<OverwatchCommand>,
}

impl OverwatchHandle {
#[must_use]
pub fn new(runtime_handle: Handle, sender: Sender<OverwatchCommand>) -> Self {
Self {
runtime_handle,
sender,
}
}

#[must_use]
/// Request for a relay
pub fn relay<S: ServiceData>(&self) -> Relay<S> {
pub fn relay<Service>(&self) -> Relay<Service>
where
Service: ServiceData,
Service::Message: 'static,
{
Relay::new(self.clone())
}

// Request a status watcher for a service
pub async fn status_watcher<S: ServiceData>(&self) -> StatusWatcher {
info!("Requesting status watcher for {}", S::SERVICE_ID);
pub async fn status_watcher<Service: ServiceData>(&self) -> StatusWatcher {
info!("Requesting status watcher for {}", Service::SERVICE_ID);
let (sender, receiver) = tokio::sync::oneshot::channel();
let watcher_request = self
.sender
.send(OverwatchCommand::Status(StatusCommand {
service_id: S::SERVICE_ID,
service_id: Service::SERVICE_ID,
reply_channel: ReplyChannel::from(sender),
}))
.await;
match watcher_request {
Ok(_) => receiver.await.unwrap_or_else(|_| {
Ok(()) => receiver.await.unwrap_or_else(|_| {
panic!(
"Service {} watcher should always be available",
S::SERVICE_ID
Service::SERVICE_ID
)
}),
Err(_) => {
Expand Down Expand Up @@ -103,7 +109,7 @@ impl OverwatchHandle {
#[cfg_attr(feature = "instrumentation", instrument(skip(self)))]
pub async fn update_settings<S: Services>(&self, settings: S::Settings)
where
S::Settings: Send,
S::Settings: Send + Debug + 'static,
{
if let Err(e) = self
.sender
Expand All @@ -112,10 +118,11 @@ impl OverwatchHandle {
))))
.await
{
error!(error=?e, "Error updating settings")
error!(error=?e, "Error updating settings");
}
}

#[must_use]
pub fn runtime(&self) -> &Handle {
&self.runtime_handle
}
Expand Down
3 changes: 2 additions & 1 deletion overwatch-rs/src/overwatch/life_cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ pub struct ServicesLifeCycleHandle {
}

impl ServicesLifeCycleHandle {
#[must_use]
pub fn empty() -> Self {
Self {
handlers: Default::default(),
handlers: HashMap::default(),
}
}

Expand Down
Loading

0 comments on commit 86c7405

Please sign in to comment.