Skip to content

Commit df14784

Browse files
committed
Handle signals and shutdown gracefully
1 parent 0a0d0f8 commit df14784

File tree

4 files changed

+164
-52
lines changed

4 files changed

+164
-52
lines changed

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ log = "0.4.11"
1616
env_logger = "0.7.1"
1717
tiny_http = { git = "https://github.com/prasmussen/tiny-http" }
1818
ascii = "1.0.0"
19+
signal-hook = "0.1.16"

src/docker_run/api/mod.rs

Lines changed: 62 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,54 +16,78 @@ pub struct Config<C, H> {
1616
pub handler: H,
1717
}
1818

19-
pub fn start<C, H>(config: Config<C, H>) -> Result<(), Error>
20-
where
21-
C: Send + Clone + 'static,
22-
H: Send + Copy + 'static,
23-
H: FnOnce(&C, tiny_http::Request) {
24-
25-
let server = tiny_http::Server::new(config.listen_addr)
26-
.map_err(Error::Bind)?;
27-
28-
let mut handles = Vec::new();
29-
let request_handler = config.handler;
30-
31-
for n in 0..config.worker_threads {
32-
let handler_config = config.handler_config.clone();
33-
let server = server.try_clone()
34-
.map_err(|err| Error::CloneServer(err, n))?;
35-
36-
handles.push(thread::spawn(move || {
37-
loop {
38-
match server.accept() {
39-
Ok(client) => {
40-
for request in client {
41-
request_handler(&handler_config, request);
19+
pub struct Server {
20+
server: tiny_http::Server,
21+
}
22+
23+
impl Server {
24+
pub fn new(listen_addr: String) -> Result<Server, io::Error> {
25+
let server = tiny_http::Server::new(listen_addr)?;
26+
27+
Ok(Server{
28+
server,
29+
})
30+
}
31+
32+
pub fn start<C, H>(&self, config: Config<C, H>) -> Result<Workers, Error>
33+
where
34+
C: Send + Clone + 'static,
35+
H: Send + Copy + 'static,
36+
H: FnOnce(&C, tiny_http::Request) {
37+
38+
let mut handles = Vec::new();
39+
let request_handler = config.handler;
40+
41+
for n in 0..config.worker_threads {
42+
let handler_config = config.handler_config.clone();
43+
let server = self.server.try_clone()
44+
.map_err(|err| Error::CloneServer(err, n))?;
45+
46+
handles.push(thread::spawn(move || {
47+
loop {
48+
match server.accept() {
49+
Ok(client) => {
50+
for request in client {
51+
request_handler(&handler_config, request);
52+
}
4253
}
43-
}
4454

45-
Err(tiny_http::AcceptError::Accept(err)) => {
46-
log::error!("Accept error on thread {}: {:?}", n, err);
47-
break;
48-
}
55+
Err(tiny_http::AcceptError::Accept(err)) => {
56+
log::error!("Accept error on thread {}: {:?}", n, err);
57+
break;
58+
}
4959

50-
Err(tiny_http::AcceptError::ShuttingDown()) => {
51-
log::info!("Thread {} shutting down", n);
52-
break;
60+
Err(tiny_http::AcceptError::ShuttingDown()) => {
61+
log::info!("Thread {} shutting down", n);
62+
break;
63+
}
5364
}
5465
}
55-
}
56-
}))
57-
}
66+
}))
67+
}
5868

59-
// Wait for threads to complete, in practice this will block forever unless there is a panic
60-
for handle in handles {
61-
handle.join().unwrap();
69+
Ok(Workers{
70+
handles
71+
})
6272
}
73+
}
6374

64-
Ok(())
75+
pub struct Workers {
76+
handles: Vec<thread::JoinHandle<()>>
6577
}
6678

79+
impl Workers {
80+
pub fn wait(self) {
81+
// Wait for threads to complete, in practice this will block forever unless:
82+
// - The server is shutdown
83+
// - One of the threads panics
84+
for handle in self.handles {
85+
handle.join().unwrap();
86+
}
87+
}
88+
}
89+
90+
6791
#[derive(Debug, Clone)]
6892
pub struct ApiConfig {
6993
pub access_token: ascii::AsciiString,
@@ -181,17 +205,12 @@ pub fn error_response(request: tiny_http::Request, error: ErrorResponse) -> Resu
181205

182206

183207
pub enum Error {
184-
Bind(io::Error),
185208
CloneServer(io::Error, u16),
186209
}
187210

188211
impl fmt::Display for Error {
189212
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
190213
match self {
191-
Error::Bind(err) => {
192-
write!(f, "Failed to bind: {}", err)
193-
}
194-
195214
Error::CloneServer(err, n) => {
196215
write!(f, "Failed to clone server (n = {}): {}", n, err)
197216
}

src/main.rs

Lines changed: 81 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
mod docker_run;
22

3+
use std::io;
4+
use std::fmt;
5+
use std::thread;
36
use std::process;
47
use std::time::Duration;
58

9+
use signal_hook::iterator::Signals;
10+
611
use docker_run::config;
712
use docker_run::environment;
813
use docker_run::unix_stream;
@@ -14,38 +19,72 @@ fn main() {
1419
env_logger::init();
1520

1621
match start() {
17-
Ok(()) => {}
18-
19-
Err(Error::BuildConfig(err)) => {
20-
log::error!("Failed to build config: {}", err);
21-
process::exit(1)
22+
Ok(()) => {
23+
log::info!("Exiting gracefully")
2224
}
2325

24-
Err(Error::StartServer(err)) => {
25-
log::error!("Failed to start api server: {}", err);
26+
Err(err) => {
27+
log::error!("{}", err);
2628
process::exit(1)
2729
}
2830
}
2931
}
3032

3133
enum Error {
3234
BuildConfig(environment::Error),
35+
CreateServer(io::Error),
3336
StartServer(api::Error),
37+
Signal(io::Error),
38+
}
39+
40+
impl fmt::Display for Error {
41+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
42+
match self {
43+
Error::BuildConfig(err) => {
44+
write!(f, "Failed to build config: {}", err)
45+
}
46+
47+
Error::CreateServer(err) => {
48+
write!(f, "Failed to create api server: {}", err)
49+
}
50+
51+
Error::StartServer(err) => {
52+
write!(f, "Failed to start api server: {}", err)
53+
}
54+
55+
Error::Signal(err) => {
56+
write!(f, "Failed to register signal handler: {}", err)
57+
}
58+
}
59+
}
3460
}
3561

62+
3663
fn start() -> Result<(), Error> {
3764
let env = environment::get_environment();
3865
let config = build_config(&env)
3966
.map_err(Error::BuildConfig)?;
4067

4168
log::info!("Listening on {} with {} worker threads", config.server.listen_addr_with_port(), config.server.worker_threads);
4269

43-
api::start(api::Config{
70+
let server = api::Server::new(config.server.listen_addr_with_port())
71+
.map_err(Error::CreateServer)?;
72+
73+
let workers = server.start(api::Config{
4474
listen_addr: config.server.listen_addr_with_port(),
4575
worker_threads: config.server.worker_threads,
4676
handler_config: config,
4777
handler: handle_request,
48-
}).map_err(Error::StartServer)
78+
}).map_err(Error::StartServer)?;
79+
80+
// Handle OS signals
81+
handle_signals(server)
82+
.map_err(Error::Signal)?;
83+
84+
// Wait for workers
85+
workers.wait();
86+
87+
Ok(())
4988
}
5089

5190

@@ -92,6 +131,39 @@ fn router(config: &config::Config, request: &mut tiny_http::Request) -> Result<a
92131
}
93132

94133

134+
fn handle_signals(server: api::Server) -> Result<(), io::Error> {
135+
let signals = Signals::new(&[
136+
signal_hook::SIGTERM,
137+
signal_hook::SIGINT,
138+
])?;
139+
140+
thread::spawn(move || {
141+
for signal in signals.forever() {
142+
match signal {
143+
signal_hook::SIGTERM => {
144+
log::info!("Caught SIGTERM signal");
145+
break
146+
}
147+
148+
signal_hook::SIGINT => {
149+
log::info!("Caught SIGINT signal");
150+
break
151+
}
152+
153+
_ => {
154+
log::info!("Ignoring signal {}", signal);
155+
}
156+
}
157+
}
158+
159+
log::info!("Shutting down server...");
160+
drop(server)
161+
});
162+
163+
Ok(())
164+
}
165+
166+
95167
fn build_config(env: &environment::Environment) -> Result<config::Config, environment::Error> {
96168
let server = build_server_config(env)?;
97169
let api = build_api_config(env)?;

0 commit comments

Comments
 (0)