Skip to content

Commit

Permalink
feature: Add mechanism to synchronize and update services status (Upd…
Browse files Browse the repository at this point in the history
…ated from #30) (#41)

Co-authored-by: danielsanchezq <[email protected]>
  • Loading branch information
AlejandroCabeza and danielSanchezQ authored Jan 24, 2025
1 parent 1b83775 commit f5f7ea0
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 3 deletions.
30 changes: 30 additions & 0 deletions overwatch-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ fn generate_services_impl(
let impl_start = generate_start_impl(fields);
let impl_stop = generate_stop_impl(fields);
let impl_relay = generate_request_relay_impl(fields);
let impl_status = generate_request_status_watcher_impl(fields);
let impl_update_settings = generate_update_settings_impl(fields);

let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();

quote! {
Expand All @@ -152,6 +154,8 @@ fn generate_services_impl(

#impl_relay

#impl_status

#impl_update_settings
}
}
Expand Down Expand Up @@ -286,6 +290,32 @@ fn generate_request_relay_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2
}
}

fn generate_request_status_watcher_impl(
fields: &Punctuated<Field, Comma>,
) -> proc_macro2::TokenStream {
let cases = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
let type_id = utils::extract_type_from(&field.ty);
quote! {
<#type_id as ::overwatch_rs::services::ServiceData>::SERVICE_ID => {
::std::result::Result::Ok(self.#field_identifier.status_watcher())
}
}
});

quote! {
#[::tracing::instrument(skip(self), err)]
fn request_status_watcher(&self, service_id: ::overwatch_rs::services::ServiceId) -> ::overwatch_rs::services::status::ServiceStatusResult {
{
match service_id {
#( #cases )*
service_id => ::std::result::Result::Err(::overwatch_rs::services::status::ServiceStatusError::Unavailable { service_id })
}
}
}
}
}

fn generate_update_settings_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStream {
let fields_settings = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
Expand Down
9 changes: 9 additions & 0 deletions overwatch-rs/src/overwatch/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio::sync::oneshot;

// internal
use crate::services::relay::RelayResult;
use crate::services::status::StatusWatcher;
use crate::services::ServiceId;

#[derive(Debug)]
Expand All @@ -31,6 +32,13 @@ pub struct RelayCommand {
pub(crate) reply_channel: ReplyChannel<RelayResult>,
}

/// Command for requesting
#[derive(Debug)]
pub struct StatusCommand {
pub(crate) service_id: ServiceId,
pub(crate) reply_channel: ReplyChannel<StatusWatcher>,
}

/// Command for managing [`ServiceCore`](crate::services::ServiceCore) lifecycle
#[allow(unused)]
#[derive(Debug)]
Expand All @@ -54,6 +62,7 @@ pub struct SettingsCommand(pub(crate) AnySettings);
#[derive(Debug)]
pub enum OverwatchCommand {
Relay(RelayCommand),
Status(StatusCommand),
ServiceLifeCycle(ServiceLifeCycleCommand),
OverwatchLifeCycle(OverwatchLifeCycleCommand),
Settings(SettingsCommand),
Expand Down
29 changes: 28 additions & 1 deletion overwatch-rs/src/overwatch/handle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// std

// crates
use crate::overwatch::commands::{OverwatchCommand, OverwatchLifeCycleCommand, SettingsCommand};
use crate::overwatch::commands::{
OverwatchCommand, OverwatchLifeCycleCommand, ReplyChannel, SettingsCommand, StatusCommand,
};
use crate::overwatch::Services;
use crate::services::ServiceData;
use tokio::runtime::Handle;
Expand All @@ -12,6 +14,7 @@ use tracing::{error, info};

// internal
use crate::services::relay::Relay;
use crate::services::status::StatusWatcher;

/// Handler object over the main Overwatch runner
/// It handles communications to the main Overwatch runner.
Expand All @@ -35,6 +38,30 @@ impl OverwatchHandle {
Relay::new(self.clone())
}

// Request a status watcher for a service
pub async fn status_watcher<S: ServiceData>(&self) -> StatusWatcher {
info!("Requesting status watcher for {}", S::SERVICE_ID);
let (sender, receiver) = tokio::sync::oneshot::channel();
let watcher_request = self
.sender
.send(OverwatchCommand::Status(StatusCommand {
service_id: S::SERVICE_ID,
reply_channel: ReplyChannel::from(sender),
}))
.await;
match watcher_request {
Ok(_) => receiver.await.unwrap_or_else(|_| {
panic!(
"Service {} watcher should always be available",
S::SERVICE_ID
)
}),
Err(_) => {
unreachable!("Service watcher should always be available");
}
}
}

/// Send a shutdown signal to the overwatch runner
pub async fn shutdown(&self) {
info!("Shutting down Overwatch");
Expand Down
34 changes: 32 additions & 2 deletions overwatch-rs/src/overwatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use tracing::{error, info};
// internal
use crate::overwatch::commands::{
OverwatchCommand, OverwatchLifeCycleCommand, RelayCommand, ServiceLifeCycleCommand,
SettingsCommand,
SettingsCommand, StatusCommand,
};
use crate::overwatch::handle::OverwatchHandle;
pub use crate::overwatch::life_cycle::ServicesLifeCycleHandle;
use crate::services::life_cycle::LifecycleMessage;
use crate::services::relay::RelayResult;
use crate::services::status::ServiceStatusResult;
use crate::services::{ServiceError, ServiceId};
use crate::utils::runtime::default_multithread_runtime;

Expand Down Expand Up @@ -92,6 +93,8 @@ pub trait Services: Sized {
/// Request communication relay to one of the services
fn request_relay(&mut self, service_id: ServiceId) -> RelayResult;

fn request_status_watcher(&self, service_id: ServiceId) -> ServiceStatusResult;

/// Update service settings
fn update_settings(&mut self, settings: Self::Settings) -> Result<(), Error>;
}
Expand Down Expand Up @@ -161,6 +164,9 @@ where
OverwatchCommand::Relay(relay_command) => {
Self::handle_relay(&mut services, relay_command).await;
}
OverwatchCommand::Status(status_command) => {
Self::handle_status(&mut services, status_command).await;
}
OverwatchCommand::ServiceLifeCycle(msg) => match msg {
ServiceLifeCycleCommand {
service_id,
Expand Down Expand Up @@ -220,12 +226,31 @@ where
if let Ok(settings) = settings.downcast::<S::Settings>() {
if let Err(e) = services.update_settings(*settings) {
// TODO: add proper logging
dbg!(e);
error!("{e}");
}
} else {
unreachable!("Statically should always be of the correct type");
}
}
async fn handle_status(
services: &mut S,
StatusCommand {
service_id,
reply_channel,
}: StatusCommand,
) {
let watcher_result = services.request_status_watcher(service_id);
match watcher_result {
Ok(watcher) => {
if reply_channel.reply(watcher).await.is_err() {
error!("Error reporting back status watcher for service: {service_id}")
}
}
Err(e) => {
error!("{e}");
}
}
}
}

/// Main Overwatch entity
Expand Down Expand Up @@ -276,6 +301,7 @@ mod test {
use crate::overwatch::handle::OverwatchHandle;
use crate::overwatch::{Error, OverwatchRunner, Services, ServicesLifeCycleHandle};
use crate::services::relay::{RelayError, RelayResult};
use crate::services::status::{ServiceStatusError, ServiceStatusResult};
use crate::services::ServiceId;
use std::time::Duration;
use tokio::time::sleep;
Expand Down Expand Up @@ -308,6 +334,10 @@ mod test {
Err(RelayError::InvalidRequest { to: service_id })
}

fn request_status_watcher(&self, service_id: ServiceId) -> ServiceStatusResult {
Err(ServiceStatusError::Unavailable { service_id })
}

fn update_settings(&mut self, _settings: Self::Settings) -> Result<(), Error> {
Ok(())
}
Expand Down
9 changes: 9 additions & 0 deletions overwatch-rs/src/services/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::services::life_cycle::LifecycleHandle;
use crate::services::relay::{relay, InboundRelay, OutboundRelay};
use crate::services::settings::{SettingsNotifier, SettingsUpdater};
use crate::services::state::{StateHandle, StateOperator, StateUpdater};
use crate::services::status::{StatusHandle, StatusWatcher};
use crate::services::{ServiceCore, ServiceData, ServiceId, ServiceState};

// TODO: Abstract handle over state, to differentiate when the service is running and when it is not
Expand All @@ -22,6 +23,7 @@ pub struct ServiceHandle<S: ServiceData> {
/// Handle to overwatch
overwatch_handle: OverwatchHandle,
settings: SettingsUpdater<S::Settings>,
status: StatusHandle<S>,
initial_state: S::State,
}

Expand All @@ -30,6 +32,7 @@ pub struct ServiceHandle<S: ServiceData> {
pub struct ServiceStateHandle<S: ServiceData> {
/// Relay channel to communicate with the service runner
pub inbound_relay: InboundRelay<S::Message>,
pub status_handle: StatusHandle<S>,
/// Overwatch handle
pub overwatch_handle: OverwatchHandle,
pub settings_reader: SettingsNotifier<S::Settings>,
Expand Down Expand Up @@ -63,6 +66,7 @@ impl<S: ServiceData> ServiceHandle<S> {
outbound_relay: None,
overwatch_handle,
settings: SettingsUpdater::new(settings),
status: StatusHandle::new(),
initial_state,
})
}
Expand All @@ -88,6 +92,10 @@ impl<S: ServiceData> ServiceHandle<S> {
self.outbound_relay.clone()
}

pub fn status_watcher(&self) -> StatusWatcher {
self.status.watcher()
}

/// Update settings
pub fn update_settings(&self, settings: S::Settings) {
self.settings.update(settings)
Expand All @@ -109,6 +117,7 @@ impl<S: ServiceData> ServiceHandle<S> {

let service_state = ServiceStateHandle {
inbound_relay,
status_handle: self.status.clone(),
overwatch_handle: self.overwatch_handle.clone(),
state_updater,
settings_reader,
Expand Down
1 change: 1 addition & 0 deletions overwatch-rs/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod life_cycle;
pub mod relay;
pub mod settings;
pub mod state;
pub mod status;

// std
use std::fmt::Debug;
Expand Down
98 changes: 98 additions & 0 deletions overwatch-rs/src/services/status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// std
use std::default::Default;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
// crates
use crate::services::{ServiceData, ServiceId};
use thiserror::Error;
use tokio::sync::watch;
// internal

#[derive(Error, Debug)]
pub enum ServiceStatusError {
#[error("service {service_id} is not available")]
Unavailable { service_id: ServiceId },
}

pub type ServiceStatusResult = Result<StatusWatcher, ServiceStatusError>;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ServiceStatus {
Uninitialized,
Running,
Stopped,
}

pub struct StatusUpdater(watch::Sender<ServiceStatus>);

impl StatusUpdater {
pub fn update(&self, status: ServiceStatus) {
self.0
.send(status)
.expect("Overwatch always maintain an open watcher, send should always succeed")
}
}

#[derive(Debug, Clone)]
pub struct StatusWatcher(watch::Receiver<ServiceStatus>);

impl StatusWatcher {
pub async fn wait_for(
&mut self,
status: ServiceStatus,
timeout_duration: Option<Duration>,
) -> Result<ServiceStatus, ServiceStatus> {
let current = *self.0.borrow();
if status == current {
return Ok(current);
}
let timeout_duration = timeout_duration.unwrap_or_else(|| Duration::from_secs(u64::MAX));
tokio::time::timeout(timeout_duration, self.0.wait_for(|s| s == &status))
.await
.map(|r| r.map(|s| *s).map_err(|_| current))
.unwrap_or(Err(current))
}
}

pub struct StatusHandle<S: ServiceData> {
updater: Arc<StatusUpdater>,
watcher: StatusWatcher,
_phantom: PhantomData<S>,
}

impl<S: ServiceData> Clone for StatusHandle<S> {
fn clone(&self) -> Self {
Self {
updater: Arc::clone(&self.updater),
watcher: self.watcher.clone(),
_phantom: Default::default(),
}
}
}

impl<S: ServiceData> StatusHandle<S> {
pub fn new() -> Self {
let (updater, watcher) = watch::channel(ServiceStatus::Uninitialized);
let updater = Arc::new(StatusUpdater(updater));
let watcher = StatusWatcher(watcher);
Self {
updater,
watcher,
_phantom: Default::default(),
}
}
pub fn updater(&self) -> &StatusUpdater {
&self.updater
}

pub fn watcher(&self) -> StatusWatcher {
self.watcher.clone()
}
}

impl<S: ServiceData> Default for StatusHandle<S> {
fn default() -> Self {
Self::new()
}
}
Loading

0 comments on commit f5f7ea0

Please sign in to comment.