Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions crates/service/src/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,26 +449,28 @@ mod tests {
counter: u32,
}

impl ServiceState for TestMonitoredState {
fn name(&self) -> &str {
"test_monitored"
}
}
impl ServiceState for TestMonitoredState {}

impl Service for TestMonitoredService {
type State = TestMonitoredState;
type Msg = u32;
type Status = TestStatus;
type Context = ();

fn get_status(state: &Self::State) -> Self::Status {
TestStatus {
counter: state.counter,
}
}

fn name() -> &'static str {
"test"
}
}

impl AsyncService for TestMonitoredService {
async fn process_input(
_ctx: &Self::Context,
state: &mut Self::State,
input: &Self::Msg,
) -> anyhow::Result<Response> {
Expand All @@ -492,27 +494,29 @@ mod tests {
updates: usize,
}

impl ServiceState for TestListenerState {
fn name(&self) -> &str {
"test_listener"
}
}
impl ServiceState for TestListenerState {}

impl Service for TestListenerService {
type State = TestListenerState;
type Msg = TestStatus;
type Status = TestListenerStatus;
type Context = ();

fn get_status(state: &Self::State) -> Self::Status {
TestListenerStatus {
last_seen: state.last_seen,
updates: state.updates,
}
}

fn name() -> &'static str {
"test"
}
}

impl AsyncService for TestListenerService {
async fn process_input(
_ctx: &Self::Context,
state: &mut Self::State,
input: &Self::Msg,
) -> anyhow::Result<Response> {
Expand All @@ -536,6 +540,7 @@ mod tests {
let monitored_monitor = ServiceBuilder::<TestMonitoredService, _>::new()
.with_state(monitored_state)
.with_input(monitored_input)
.with_context(())
.launch_async("test_monitored", &texec)
.await
.expect("test: launch monitored service");
Expand All @@ -551,6 +556,7 @@ mod tests {
let listener_monitor = ServiceBuilder::<TestListenerService, _>::new()
.with_state(listener_state)
.with_input(listener_input)
.with_context(())
.launch_async("test_listener", &texec)
.await
.expect("test: launch listener service");
Expand Down
23 changes: 15 additions & 8 deletions crates/service/src/async_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ use crate::{
instrumentation::{
record_shutdown_result, OperationResult, ServiceInstrumentation, ShutdownReason,
},
AsyncService, AsyncServiceInput, Response, ServiceState,
AsyncService, AsyncServiceInput, Response,
};

/// Async worker task.
pub(crate) async fn worker_task<S: AsyncService, I>(
ctx: S::Context,
mut state: S::State,
mut inp: I,
status_tx: watch::Sender<S::Status>,
Expand All @@ -23,8 +24,8 @@ pub(crate) async fn worker_task<S: AsyncService, I>(
where
I: AsyncServiceInput<Msg = S::Msg>,
{
let service_name = state.name().to_string();
let span_prefix = state.span_prefix().to_string();
let service_name = S::name().to_string();
let span_prefix = S::span_prefix().to_string();
let instrumentation = ServiceInstrumentation::new(&service_name);

// Create parent lifecycle span wrapping entire service lifetime
Expand All @@ -40,7 +41,7 @@ where
let launch_span = info_span!("{}.launch", span_prefix, service.name = %service_name);
let start = Instant::now();

let launch_result = S::on_launch(&mut state).instrument(launch_span).await;
let launch_result = S::on_launch(&ctx, &mut state).instrument(launch_span).await;
let duration = start.elapsed();
let result = OperationResult::from(&launch_result);

Expand All @@ -55,6 +56,7 @@ where
let mut exit_fut = Box::pin(shutdown_guard.wait_for_shutdown().fuse());
let mut wkr_fut = Box::pin(
worker_task_inner::<S, I>(
&ctx,
&mut state,
&mut inp,
&status_tx,
Expand All @@ -81,6 +83,7 @@ where
};

handle_shutdown::<S>(
&ctx,
&mut state,
err.as_ref(),
&instrumentation,
Expand All @@ -95,6 +98,7 @@ where
}

async fn worker_task_inner<S: AsyncService, I>(
ctx: &S::Context,
state: &mut S::State,
inp: &mut I,
status_tx: &watch::Sender<S::Status>,
Expand All @@ -104,7 +108,7 @@ async fn worker_task_inner<S: AsyncService, I>(
where
I: AsyncServiceInput<Msg = S::Msg>,
{
let service_name = state.name().to_string();
let service_name = S::name().to_string();

// Process messages in a loop
while let Some(input) = inp.recv_next().await? {
Expand All @@ -115,7 +119,9 @@ where
let start = Instant::now();

// Process the input.
let res = S::process_input(state, &input).instrument(msg_span).await;
let res = S::process_input(ctx, state, &input)
.instrument(msg_span)
.await;

let duration = start.elapsed();
let result = OperationResult::from(&res);
Expand Down Expand Up @@ -157,17 +163,18 @@ where
/// shutdown metrics. This runs on every service exit (normal shutdown, error, or signal).
/// Unclean exits (SIGKILL, panic, OOM) may skip this handler entirely.
async fn handle_shutdown<S: AsyncService>(
ctx: &S::Context,
state: &mut S::State,
err: Option<&anyhow::Error>,
instrumentation: &ServiceInstrumentation,
shutdown_reason: ShutdownReason,
span_prefix: &str,
) {
let service_name = state.name().to_string();
let service_name = S::name().to_string();
let shutdown_span = info_span!("{}.shutdown", span_prefix, service.name = %service_name);
let start = Instant::now();

let shutdown_result = S::before_shutdown(state, err)
let shutdown_result = S::before_shutdown(ctx, state, err)
.instrument(shutdown_span)
.await;

Expand Down
26 changes: 24 additions & 2 deletions crates/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ use crate::{
pub struct ServiceBuilder<S: Service, I> {
state: Option<S::State>,
inp: Option<I>,
context: Option<S::Context>,
}

impl<S: Service<Context = ()>, I> ServiceBuilder<S, I> {
/// Constructs a service builder with unit context
pub fn new_with_unit_context() -> Self {
Self {
state: None,
inp: None,
context: Some(()),
}
}
}

impl<S: Service, I> ServiceBuilder<S, I> {
Expand All @@ -28,6 +40,12 @@ impl<S: Service, I> ServiceBuilder<S, I> {
self
}

/// Initializes new builder with a context
pub fn with_context(mut self, ctx: S::Context) -> ServiceBuilder<S, I> {
self.context = Some(ctx);
self
}

/// Returns if we're ready to start the service. Further changes MAY be
/// able to be made after this starts returning true.
///
Expand Down Expand Up @@ -58,11 +76,13 @@ where
// TODO convert to fallible results?
let state = self.state.expect("service/builder: missing state");
let inp = self.inp.expect("service/builder: missing input");
let ctx = self.context.expect("service/builder: missing context");

let init_status = S::get_status(&state);
let (status_tx, status_rx) = watch::channel(init_status);

let worker_fut_cls = move |g| async_worker::worker_task::<S, I>(state, inp, status_tx, g);
let worker_fut_cls =
move |g| async_worker::worker_task::<S, I>(ctx, state, inp, status_tx, g);
texec.spawn_critical_async_with_shutdown(name, worker_fut_cls);

Ok(ServiceMonitor::new(status_rx))
Expand All @@ -82,11 +102,12 @@ where
// TODO convert to fallible results?
let state = self.state.expect("service/builder: missing state");
let inp = self.inp.expect("service/builder: missing input");
let ctx = self.context.expect("service/builder: missing context");

let init_status = S::get_status(&state);
let (status_tx, status_rx) = watch::channel(init_status);

let worker_cls = move |g| sync_worker::worker_task::<S, I>(state, inp, status_tx, g);
let worker_cls = move |g| sync_worker::worker_task::<S, I>(ctx, state, inp, status_tx, g);
texec.spawn_critical(name, worker_cls);

Ok(ServiceMonitor::new(status_rx))
Expand Down Expand Up @@ -125,6 +146,7 @@ impl<S: Service, I> Default for ServiceBuilder<S, I> {
Self {
state: None,
inp: None,
context: None,
}
}
}
2 changes: 1 addition & 1 deletion crates/service/src/instrumentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl ServiceInstrumentation {
/// # Arguments
///
/// * `span_prefix` - Domain-specific prefix (e.g., "asm", "csm", "chain")
/// * `service_name` - Name of the service (from `ServiceState::name()`)
/// * `service_name` - Name of the service (from `Service::name()`)
/// * `service_type` - Type of service ("async" or "sync")
///
/// # Returns
Expand Down
17 changes: 10 additions & 7 deletions crates/service/src/sync_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use crate::{
instrumentation::{
record_shutdown_result, OperationResult, ServiceInstrumentation, ShutdownReason,
},
Response, ServiceState, SyncService, SyncServiceInput,
Response, SyncService, SyncServiceInput,
};

pub(crate) fn worker_task<S: SyncService, I>(
ctx: S::Context,
mut state: S::State,
mut inp: I,
status_tx: watch::Sender<S::Status>,
Expand All @@ -21,8 +22,8 @@ pub(crate) fn worker_task<S: SyncService, I>(
where
I: SyncServiceInput<Msg = S::Msg>,
{
let service_name = state.name().to_string();
let span_prefix = state.span_prefix().to_string();
let service_name = S::name().to_string();
let span_prefix = S::span_prefix().to_string();
let instrumentation = ServiceInstrumentation::new(&service_name);

// Create parent lifecycle span wrapping entire service lifetime
Expand All @@ -38,7 +39,7 @@ where
let _g = launch_span.enter();
let start = Instant::now();

let launch_result = S::on_launch(&mut state);
let launch_result = S::on_launch(&ctx, &mut state);
let duration = start.elapsed();
let result = OperationResult::from(&launch_result);

Expand Down Expand Up @@ -66,7 +67,7 @@ where
let start = Instant::now();

// Process the input.
let res = S::process_input(&mut state, &input);
let res = S::process_input(&ctx, &mut state, &input);

let duration = start.elapsed();
let result = OperationResult::from(&res);
Expand Down Expand Up @@ -114,6 +115,7 @@ where
};

handle_shutdown::<S>(
&ctx,
&mut state,
err.as_ref(),
&instrumentation,
Expand All @@ -132,18 +134,19 @@ where
/// shutdown metrics. This runs on every service exit (normal shutdown, error, or signal).
/// Unclean exits (SIGKILL, panic, OOM) may skip this handler entirely.
fn handle_shutdown<S: SyncService>(
ctx: &S::Context,
state: &mut S::State,
err: Option<&anyhow::Error>,
instrumentation: &ServiceInstrumentation,
shutdown_reason: ShutdownReason,
span_prefix: &str,
) {
let service_name = state.name().to_string();
let service_name = S::name().to_string();
let shutdown_span = info_span!("{}.shutdown", span_prefix, service.name = %service_name);
let _g = shutdown_span.enter();
let start = Instant::now();

let shutdown_result = S::before_shutdown(state, err);
let shutdown_result = S::before_shutdown(ctx, state, err);

let duration = start.elapsed();

Expand Down
Loading
Loading