diff --git a/crates/service/src/adapters.rs b/crates/service/src/adapters.rs index ffe5454..7a3b0c2 100644 --- a/crates/service/src/adapters.rs +++ b/crates/service/src/adapters.rs @@ -449,26 +449,28 @@ mod tests { counter: u32, } - impl ServiceState for TestMonitoredState { - fn name(&self) -> &str { - "test_monitored" - } - } + impl ServiceState for TestMonitoredState {} impl Service for TestMonitoredService { type State = TestMonitoredState; type Msg = u32; type Status = TestStatus; + type Context = (); fn get_status(state: &Self::State) -> Self::Status { TestStatus { counter: state.counter, } } + + fn name() -> &'static str { + "test" + } } impl AsyncService for TestMonitoredService { async fn process_input( + _ctx: &Self::Context, state: &mut Self::State, input: &Self::Msg, ) -> anyhow::Result { @@ -492,16 +494,13 @@ mod tests { updates: usize, } - impl ServiceState for TestListenerState { - fn name(&self) -> &str { - "test_listener" - } - } + impl ServiceState for TestListenerState {} impl Service for TestListenerService { type State = TestListenerState; type Msg = TestStatus; type Status = TestListenerStatus; + type Context = (); fn get_status(state: &Self::State) -> Self::Status { TestListenerStatus { @@ -509,10 +508,15 @@ mod tests { updates: state.updates, } } + + fn name() -> &'static str { + "test" + } } impl AsyncService for TestListenerService { async fn process_input( + _ctx: &Self::Context, state: &mut Self::State, input: &Self::Msg, ) -> anyhow::Result { @@ -536,6 +540,7 @@ mod tests { let monitored_monitor = ServiceBuilder::::new() .with_state(monitored_state) .with_input(monitored_input) + .with_context(()) .launch_async("test_monitored", &texec) .await .expect("test: launch monitored service"); @@ -551,6 +556,7 @@ mod tests { let listener_monitor = ServiceBuilder::::new() .with_state(listener_state) .with_input(listener_input) + .with_context(()) .launch_async("test_listener", &texec) .await .expect("test: launch listener service"); diff --git a/crates/service/src/async_worker.rs b/crates/service/src/async_worker.rs index 2510bb1..3447893 100644 --- a/crates/service/src/async_worker.rs +++ b/crates/service/src/async_worker.rs @@ -10,11 +10,12 @@ use crate::{ instrumentation::{ record_shutdown_result, OperationResult, ServiceInstrumentation, ShutdownReason, }, - AsyncService, AsyncServiceInput, Response, ServiceState, + AsyncService, AsyncServiceInput, Response, }; /// Async worker task. pub(crate) async fn worker_task( + ctx: S::Context, mut state: S::State, mut inp: I, status_tx: watch::Sender, @@ -23,8 +24,8 @@ pub(crate) async fn worker_task( where I: AsyncServiceInput, { - let service_name = state.name().to_string(); - let span_prefix = state.span_prefix().to_string(); + let service_name = S::name().to_string(); + let span_prefix = S::span_prefix().to_string(); let instrumentation = ServiceInstrumentation::new(&service_name); // Create parent lifecycle span wrapping entire service lifetime @@ -40,7 +41,7 @@ where let launch_span = info_span!("{}.launch", span_prefix, service.name = %service_name); let start = Instant::now(); - let launch_result = S::on_launch(&mut state).instrument(launch_span).await; + let launch_result = S::on_launch(&ctx, &mut state).instrument(launch_span).await; let duration = start.elapsed(); let result = OperationResult::from(&launch_result); @@ -55,6 +56,7 @@ where let mut exit_fut = Box::pin(shutdown_guard.wait_for_shutdown().fuse()); let mut wkr_fut = Box::pin( worker_task_inner::( + &ctx, &mut state, &mut inp, &status_tx, @@ -81,6 +83,7 @@ where }; handle_shutdown::( + &ctx, &mut state, err.as_ref(), &instrumentation, @@ -95,6 +98,7 @@ where } async fn worker_task_inner( + ctx: &S::Context, state: &mut S::State, inp: &mut I, status_tx: &watch::Sender, @@ -104,7 +108,7 @@ async fn worker_task_inner( where I: AsyncServiceInput, { - let service_name = state.name().to_string(); + let service_name = S::name().to_string(); // Process messages in a loop while let Some(input) = inp.recv_next().await? { @@ -115,7 +119,9 @@ where let start = Instant::now(); // Process the input. - let res = S::process_input(state, &input).instrument(msg_span).await; + let res = S::process_input(ctx, state, &input) + .instrument(msg_span) + .await; let duration = start.elapsed(); let result = OperationResult::from(&res); @@ -157,17 +163,18 @@ where /// shutdown metrics. This runs on every service exit (normal shutdown, error, or signal). /// Unclean exits (SIGKILL, panic, OOM) may skip this handler entirely. async fn handle_shutdown( + ctx: &S::Context, state: &mut S::State, err: Option<&anyhow::Error>, instrumentation: &ServiceInstrumentation, shutdown_reason: ShutdownReason, span_prefix: &str, ) { - let service_name = state.name().to_string(); + let service_name = S::name().to_string(); let shutdown_span = info_span!("{}.shutdown", span_prefix, service.name = %service_name); let start = Instant::now(); - let shutdown_result = S::before_shutdown(state, err) + let shutdown_result = S::before_shutdown(ctx, state, err) .instrument(shutdown_span) .await; diff --git a/crates/service/src/builder.rs b/crates/service/src/builder.rs index 7a7f954..a4ff761 100644 --- a/crates/service/src/builder.rs +++ b/crates/service/src/builder.rs @@ -14,6 +14,18 @@ use crate::{ pub struct ServiceBuilder { state: Option, inp: Option, + context: Option, +} + +impl, I> ServiceBuilder { + /// Constructs a service builder with unit context + pub fn new_with_unit_context() -> Self { + Self { + state: None, + inp: None, + context: Some(()), + } + } } impl ServiceBuilder { @@ -28,6 +40,12 @@ impl ServiceBuilder { self } + /// Initializes new builder with a context + pub fn with_context(mut self, ctx: S::Context) -> ServiceBuilder { + self.context = Some(ctx); + self + } + /// Returns if we're ready to start the service. Further changes MAY be /// able to be made after this starts returning true. /// @@ -58,11 +76,13 @@ where // TODO convert to fallible results? let state = self.state.expect("service/builder: missing state"); let inp = self.inp.expect("service/builder: missing input"); + let ctx = self.context.expect("service/builder: missing context"); let init_status = S::get_status(&state); let (status_tx, status_rx) = watch::channel(init_status); - let worker_fut_cls = move |g| async_worker::worker_task::(state, inp, status_tx, g); + let worker_fut_cls = + move |g| async_worker::worker_task::(ctx, state, inp, status_tx, g); texec.spawn_critical_async_with_shutdown(name, worker_fut_cls); Ok(ServiceMonitor::new(status_rx)) @@ -82,11 +102,12 @@ where // TODO convert to fallible results? let state = self.state.expect("service/builder: missing state"); let inp = self.inp.expect("service/builder: missing input"); + let ctx = self.context.expect("service/builder: missing context"); let init_status = S::get_status(&state); let (status_tx, status_rx) = watch::channel(init_status); - let worker_cls = move |g| sync_worker::worker_task::(state, inp, status_tx, g); + let worker_cls = move |g| sync_worker::worker_task::(ctx, state, inp, status_tx, g); texec.spawn_critical(name, worker_cls); Ok(ServiceMonitor::new(status_rx)) @@ -125,6 +146,7 @@ impl Default for ServiceBuilder { Self { state: None, inp: None, + context: None, } } } diff --git a/crates/service/src/instrumentation.rs b/crates/service/src/instrumentation.rs index 95f7d64..3874f84 100644 --- a/crates/service/src/instrumentation.rs +++ b/crates/service/src/instrumentation.rs @@ -322,7 +322,7 @@ impl ServiceInstrumentation { /// # Arguments /// /// * `span_prefix` - Domain-specific prefix (e.g., "asm", "csm", "chain") - /// * `service_name` - Name of the service (from `ServiceState::name()`) + /// * `service_name` - Name of the service (from `Service::name()`) /// * `service_type` - Type of service ("async" or "sync") /// /// # Returns diff --git a/crates/service/src/sync_worker.rs b/crates/service/src/sync_worker.rs index 5112528..80f4b1b 100644 --- a/crates/service/src/sync_worker.rs +++ b/crates/service/src/sync_worker.rs @@ -9,10 +9,11 @@ use crate::{ instrumentation::{ record_shutdown_result, OperationResult, ServiceInstrumentation, ShutdownReason, }, - Response, ServiceState, SyncService, SyncServiceInput, + Response, SyncService, SyncServiceInput, }; pub(crate) fn worker_task( + ctx: S::Context, mut state: S::State, mut inp: I, status_tx: watch::Sender, @@ -21,8 +22,8 @@ pub(crate) fn worker_task( where I: SyncServiceInput, { - let service_name = state.name().to_string(); - let span_prefix = state.span_prefix().to_string(); + let service_name = S::name().to_string(); + let span_prefix = S::span_prefix().to_string(); let instrumentation = ServiceInstrumentation::new(&service_name); // Create parent lifecycle span wrapping entire service lifetime @@ -38,7 +39,7 @@ where let _g = launch_span.enter(); let start = Instant::now(); - let launch_result = S::on_launch(&mut state); + let launch_result = S::on_launch(&ctx, &mut state); let duration = start.elapsed(); let result = OperationResult::from(&launch_result); @@ -66,7 +67,7 @@ where let start = Instant::now(); // Process the input. - let res = S::process_input(&mut state, &input); + let res = S::process_input(&ctx, &mut state, &input); let duration = start.elapsed(); let result = OperationResult::from(&res); @@ -114,6 +115,7 @@ where }; handle_shutdown::( + &ctx, &mut state, err.as_ref(), &instrumentation, @@ -132,18 +134,19 @@ where /// shutdown metrics. This runs on every service exit (normal shutdown, error, or signal). /// Unclean exits (SIGKILL, panic, OOM) may skip this handler entirely. fn handle_shutdown( + ctx: &S::Context, state: &mut S::State, err: Option<&anyhow::Error>, instrumentation: &ServiceInstrumentation, shutdown_reason: ShutdownReason, span_prefix: &str, ) { - let service_name = state.name().to_string(); + let service_name = S::name().to_string(); let shutdown_span = info_span!("{}.shutdown", span_prefix, service.name = %service_name); let _g = shutdown_span.enter(); let start = Instant::now(); - let shutdown_result = S::before_shutdown(state, err); + let shutdown_result = S::before_shutdown(ctx, state, err); let duration = start.elapsed(); diff --git a/crates/service/src/types.rs b/crates/service/src/types.rs index 2ab7f61..3bc5c0c 100644 --- a/crates/service/src/types.rs +++ b/crates/service/src/types.rs @@ -25,25 +25,26 @@ pub trait Service: Sync + Send + 'static { /// The status type derived from the state. type Status: ServiceStatus; + /// The context for the service + type Context: Send + Sync + 'static; + /// Gets the status from the current state. fn get_status(s: &Self::State) -> Self::Status; -} -/// Trait for service states which exposes common properties. -pub trait ServiceState: Sync + Send + 'static { - /// Name for a service that can be printed in logs. - /// - /// This SHOULD NOT change after the service worker has been started. - fn name(&self) -> &str; + /// The name of the service. Not supposed to change for a service. + fn name() -> &'static str; /// Span prefix for OpenTelemetry tracing. /// /// This is used to create semantic span names like "asm.lifecycle", "csm.lifecycle", etc. - fn span_prefix(&self) -> &str { - "service" + fn span_prefix() -> &'static str { + Self::name() } } +/// Trait for service states which exposes common properties. +pub trait ServiceState: Sync + Send + 'static {} + /// Trait for service messages, which we want to treat like simple dumb data /// containers. /// @@ -69,12 +70,16 @@ impl ServiceStatus f /// Trait for async service impls to define their per-input logic. pub trait AsyncService: Service { /// Called in the worker task after launching. - fn on_launch(_state: &mut Self::State) -> impl Future> + Send { + fn on_launch( + _ctx: &Self::Context, + _state: &mut Self::State, + ) -> impl Future> + Send { async { Ok(()) } } /// Called for each input. fn process_input( + _ctx: &Self::Context, _state: &mut Self::State, _input: &Self::Msg, ) -> impl Future> + Send { @@ -85,6 +90,7 @@ pub trait AsyncService: Service { /// /// Passed an error, if shutting down due to input handling error. fn before_shutdown( + _ctx: &Self::Context, _state: &mut Self::State, _err: Option<&anyhow::Error>, ) -> impl Future> + Send { @@ -95,12 +101,16 @@ pub trait AsyncService: Service { /// Trait for blocking service impls to define their per-input logic. pub trait SyncService: Service { /// Called in the worker thread after launching. - fn on_launch(_state: &mut Self::State) -> anyhow::Result<()> { + fn on_launch(_ctx: &Self::Context, _state: &mut Self::State) -> anyhow::Result<()> { Ok(()) } /// Called for each input. - fn process_input(_state: &mut Self::State, _input: &Self::Msg) -> anyhow::Result { + fn process_input( + _ctx: &Self::Context, + _state: &mut Self::State, + _input: &Self::Msg, + ) -> anyhow::Result { Ok(Response::Continue) } @@ -108,6 +118,7 @@ pub trait SyncService: Service { /// /// Passed an error, if shutting down due to input handling error. fn before_shutdown( + _ctx: &Self::Context, _state: &mut Self::State, _err: Option<&anyhow::Error>, ) -> anyhow::Result<()> {