Skip to content

Commit 8166e7b

Browse files
author
Devdutt Shenoi
authored
refactor: signal handling is not related to http alone (#1094)
1 parent 55edd85 commit 8166e7b

File tree

6 files changed

+52
-64
lines changed

6 files changed

+52
-64
lines changed

Diff for: src/handlers/http/health_check.rs

+4-37
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@ use actix_web::middleware::Next;
2525
use actix_web::{Error, HttpResponse};
2626
use lazy_static::lazy_static;
2727
use std::sync::Arc;
28-
use tokio::signal::ctrl_c;
29-
use tracing::info;
30-
31-
use tokio::sync::{oneshot, Mutex};
28+
use tokio::sync::Mutex;
3229

3330
// Create a global variable to store signal status
3431
lazy_static! {
@@ -53,46 +50,16 @@ pub async fn check_shutdown_middleware(
5350
}
5451
}
5552

56-
pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
57-
#[cfg(windows)]
58-
{
59-
tokio::select! {
60-
_ = ctrl_c() => {
61-
info!("Received SIGINT signal at Readiness Probe Handler");
62-
shutdown(shutdown_signal).await;
63-
}
64-
}
65-
}
66-
#[cfg(unix)]
67-
{
68-
use tokio::signal::unix::{signal, SignalKind};
69-
let mut sigterm = signal(SignalKind::terminate()).unwrap();
70-
tokio::select! {
71-
_ = ctrl_c() => {
72-
info!("Received SIGINT signal at Readiness Probe Handler");
73-
shutdown(shutdown_signal).await;
74-
},
75-
_ = sigterm.recv() => {
76-
info!("Received SIGTERM signal at Readiness Probe Handler");
77-
shutdown(shutdown_signal).await;
78-
}
79-
}
80-
}
81-
}
82-
83-
async fn shutdown(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
53+
// This function is called when the server is shutting down
54+
pub async fn shutdown() {
8455
// Set the shutdown flag to true
8556
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
8657
*shutdown_flag = true;
8758

8859
// Sync to local
8960
crate::event::STREAM_WRITERS.unset_all();
90-
91-
// Trigger graceful shutdown
92-
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
93-
let _ = shutdown_sender.send(());
94-
}
9561
}
62+
9663
pub async fn readiness() -> HttpResponse {
9764
// Check the object store connection
9865
if CONFIG.storage().get_object_store().check().await.is_ok() {

Diff for: src/handlers/http/modal/ingest_server.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use bytes::Bytes;
5151
use once_cell::sync::Lazy;
5252
use relative_path::RelativePathBuf;
5353
use serde_json::Value;
54+
use tokio::sync::oneshot;
5455
use tracing::error;
5556

5657
/// ! have to use a guard before using it
@@ -97,7 +98,7 @@ impl ParseableServer for IngestServer {
9798
}
9899

99100
/// configure the server and start an instance to ingest data
100-
async fn init(&self) -> anyhow::Result<()> {
101+
async fn init(&self, shutdown_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
101102
let prometheus = metrics::build_metrics_handler();
102103
CONFIG.storage().register_store_metrics(&prometheus);
103104

@@ -114,7 +115,7 @@ impl ParseableServer for IngestServer {
114115
set_ingestor_metadata().await?;
115116

116117
// Ingestors shouldn't have to deal with OpenId auth flow
117-
let app = self.start(prometheus, None);
118+
let app = self.start(shutdown_rx, prometheus, None);
118119

119120
tokio::pin!(app);
120121
loop {

Diff for: src/handlers/http/modal/mod.rs

+5-20
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use openid::Discovered;
3838
use serde::Deserialize;
3939
use serde::Serialize;
4040
use ssl_acceptor::get_ssl_acceptor;
41-
use tokio::sync::{oneshot, Mutex};
41+
use tokio::sync::oneshot;
4242
use tracing::{error, info, warn};
4343

4444
use super::audit;
@@ -67,11 +67,12 @@ pub trait ParseableServer {
6767
async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>>;
6868

6969
/// code that describes starting and setup procedures for each type of server
70-
async fn init(&self) -> anyhow::Result<()>;
70+
async fn init(&self, shutdown_rx: oneshot::Receiver<()>) -> anyhow::Result<()>;
7171

7272
/// configure the server
7373
async fn start(
7474
&self,
75+
shutdown_rx: oneshot::Receiver<()>,
7576
prometheus: PrometheusMetrics,
7677
oidc_client: Option<crate::oidc::OpenidConfig>,
7778
) -> anyhow::Result<()>
@@ -108,19 +109,6 @@ pub trait ParseableServer {
108109
.wrap(cross_origin_config())
109110
};
110111

111-
// Create a channel to trigger server shutdown
112-
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
113-
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));
114-
115-
// Clone the shutdown signal for the signal handler
116-
let shutdown_signal = server_shutdown_signal.clone();
117-
118-
// Spawn the signal handler task
119-
let signal_task = tokio::spawn(async move {
120-
health_check::handle_signals(shutdown_signal).await;
121-
println!("Received shutdown signal, notifying server to shut down...");
122-
});
123-
124112
// Create the HTTP server
125113
let http_server = HttpServer::new(create_app_fn)
126114
.workers(num_cpus::get())
@@ -142,6 +130,8 @@ pub trait ParseableServer {
142130
// Wait for the shutdown signal
143131
let _ = shutdown_rx.await;
144132

133+
health_check::shutdown().await;
134+
145135
// Perform S3 sync and wait for completion
146136
info!("Starting data sync to S3...");
147137
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
@@ -158,11 +148,6 @@ pub trait ParseableServer {
158148
// Await the HTTP server to run
159149
let server_result = srv.await;
160150

161-
// Await the signal handler to ensure proper cleanup
162-
if let Err(e) = signal_task.await {
163-
error!("Error in signal handler: {:?}", e);
164-
}
165-
166151
// Wait for the sync task to complete before exiting
167152
if let Err(e) = sync_task.await {
168153
error!("Error in sync task: {:?}", e);

Diff for: src/handlers/http/modal/query_server.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use actix_web::web::{resource, ServiceConfig};
3434
use actix_web::{web, Scope};
3535
use async_trait::async_trait;
3636
use bytes::Bytes;
37+
use tokio::sync::oneshot;
3738
use tracing::{error, info};
3839

3940
use crate::{option::CONFIG, ParseableServer};
@@ -85,7 +86,7 @@ impl ParseableServer for QueryServer {
8586
}
8687

8788
/// initialize the server, run migrations as needed and start an instance
88-
async fn init(&self) -> anyhow::Result<()> {
89+
async fn init(&self, shutdown_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
8990
let prometheus = metrics::build_metrics_handler();
9091
CONFIG.storage().register_store_metrics(&prometheus);
9192

@@ -121,7 +122,7 @@ impl ParseableServer for QueryServer {
121122
sync::object_store_sync().await;
122123

123124
tokio::spawn(airplane::server());
124-
let app = self.start(prometheus, CONFIG.parseable.openid.clone());
125+
let app = self.start(shutdown_rx, prometheus, CONFIG.parseable.openid.clone());
125126

126127
tokio::pin!(app);
127128
loop {

Diff for: src/handlers/http/modal/server.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use actix_web::Scope;
4040
use actix_web_static_files::ResourceFiles;
4141
use async_trait::async_trait;
4242
use bytes::Bytes;
43+
use tokio::sync::oneshot;
4344
use tracing::error;
4445

4546
use crate::{
@@ -94,7 +95,7 @@ impl ParseableServer for Server {
9495
}
9596

9697
// configure the server and start an instance of the single server setup
97-
async fn init(&self) -> anyhow::Result<()> {
98+
async fn init(&self, shutdown_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
9899
let prometheus = metrics::build_metrics_handler();
99100
CONFIG.storage().register_store_metrics(&prometheus);
100101

@@ -124,7 +125,7 @@ impl ParseableServer for Server {
124125
tokio::spawn(handlers::livetail::server());
125126
tokio::spawn(handlers::airplane::server());
126127

127-
let app = self.start(prometheus, CONFIG.parseable.openid.clone());
128+
let app = self.start(shutdown_rx, prometheus, CONFIG.parseable.openid.clone());
128129

129130
tokio::pin!(app);
130131

Diff for: src/main.rs

+34-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ use parseable::{
2121
option::{Mode, CONFIG},
2222
rbac, storage, IngestServer, ParseableServer, QueryServer, Server,
2323
};
24+
use tokio::signal::ctrl_c;
25+
use tokio::sync::oneshot;
26+
use tracing::info;
2427
use tracing_subscriber::EnvFilter;
2528

2629
#[cfg(any(
@@ -61,7 +64,37 @@ async fn main() -> anyhow::Result<()> {
6164
tokio::task::spawn(kafka::setup_integration());
6265
}
6366

64-
server.init().await?;
67+
// Spawn a task to trigger graceful shutdown on appropriate signal
68+
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
69+
tokio::spawn(async move {
70+
block_until_shutdown_signal().await;
71+
72+
// Trigger graceful shutdown
73+
println!("Received shutdown signal, notifying server to shut down...");
74+
shutdown_trigger.send(()).unwrap();
75+
});
76+
77+
server.init(shutdown_rx).await?;
6578

6679
Ok(())
6780
}
81+
82+
#[cfg(windows)]
83+
/// Asynchronously blocks until a shutdown signal is received
84+
pub async fn block_until_shutdown_signal() {
85+
_ = ctrl_c().await;
86+
info!("Received a CTRL+C event");
87+
}
88+
89+
#[cfg(unix)]
90+
/// Asynchronously blocks until a shutdown signal is received
91+
pub async fn block_until_shutdown_signal() {
92+
use tokio::signal::unix::{signal, SignalKind};
93+
let mut sigterm =
94+
signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler");
95+
96+
tokio::select! {
97+
_ = ctrl_c() => info!("Received SIGINT signal"),
98+
_ = sigterm.recv() => info!("Received SIGTERM signal"),
99+
}
100+
}

0 commit comments

Comments
 (0)