Skip to content

Commit f6effab

Browse files
committed
Save the last status in sled and ws send it
1 parent c9b99f5 commit f6effab

7 files changed

+113
-68
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
/target
2+
/spo2.db
23
**/*.rs.bk

Cargo.lock

+14-13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ arraydeque = "0.4.5"
99
futures-preview = "0.3.0-alpha.18"
1010
futures-timer = "0.4.0"
1111
once_cell = "1.0.2"
12+
serde = { version = "1.0.100", features = ["derive"] }
1213
serde_json = "1.0.40"
1314
sled = "0.28.0"
1415
surf = "1.0.2"

src/health_checker.rs

+36-39
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::time::{Duration, Instant};
22

3-
use futures::sink::SinkExt;
43
use futures::channel::mpsc::Sender;
4+
use futures::sink::SinkExt;
55
use futures_timer::{Delay, TryFutureExt};
6-
76
use url::Url;
8-
use crate::ReportStatus;
7+
8+
use crate::url_value::{Status, UrlValue};
9+
use crate::url_value::Status::{Healthy, Unhealthy, Unreacheable};
910

1011
const STILL_UNHEALTHY_TIMEOUT: Duration = Duration::from_secs(15 * 60); // 15 minutes
1112
const TIMEOUT: Duration = Duration::from_secs(5);
@@ -14,48 +15,47 @@ const FAST_PING: Duration = Duration::from_millis(800);
1415

1516
type ArrayDeque10<T> = arraydeque::ArrayDeque<[T; 10], arraydeque::Wrapping>;
1617

17-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18-
enum Status {
19-
Healthy,
20-
Unhealthy,
21-
Unreacheable,
22-
}
23-
24-
use Status::{Healthy, Unhealthy, Unreacheable};
25-
26-
impl Status {
27-
fn is_good(&self) -> bool {
28-
*self == Healthy
29-
}
30-
}
31-
3218
pub async fn health_checker(
3319
url: Url,
34-
mut report_sender: Sender<(Url, ReportStatus)>,
20+
mut report_sender: Sender<(Url, Status)>,
3521
event_sender: ws::Sender,
3622
database: sled::Db,
3723
)
3824
{
3925
let mut last_status = ArrayDeque10::new();
4026
let mut in_bad_state_since = None;
4127

42-
let message = format!("{},{:?}", url, ReportStatus::Healthy);
43-
let _ = event_sender.send(message);
44-
4528
loop {
46-
let status = match surf::get(&url).timeout(TIMEOUT).await {
47-
Ok(ref resp) if resp.status().is_success() => Healthy,
48-
Ok(resp) => Unhealthy,
49-
Err(e) => Unreacheable,
29+
let (status, reason) = match surf::get(&url).timeout(TIMEOUT).await {
30+
Ok(ref resp) if resp.status().is_success() => {
31+
(Healthy, resp.status().to_string())
32+
},
33+
Ok(resp) => (Unhealthy, resp.status().to_string()),
34+
Err(e) => (Unreacheable, e.to_string()),
5035
};
5136

5237
last_status.push_front(status);
5338

54-
match database.get(url.as_str()) {
55-
Ok(Some(_)) => (),
39+
// update this value but do not erase the user custom data updates
40+
let result = database.update_and_fetch(url.as_str(), |old| {
41+
let old = old?;
42+
let mut value: UrlValue = serde_json::from_slice(old).unwrap();
43+
value.status = status;
44+
value.reason = reason.clone();
45+
Some(serde_json::to_vec(&value).unwrap())
46+
});
47+
48+
// retrieve the new value and deserialize it
49+
// assign the current url this way it can be send in notifications
50+
let value = match result {
51+
Ok(Some(value)) => {
52+
let mut value: UrlValue = serde_json::from_slice(&value).unwrap();
53+
value.url = Some(url.to_string());
54+
value
55+
},
5656
Ok(None) => break,
57-
Err(e) => eprintln!("{}: {}", url, e),
58-
}
57+
Err(e) => { eprintln!("{}: {}", url, e); return },
58+
};
5959

6060
let cap = last_status.capacity() as f32;
6161
let bads = last_status.iter().filter(|s| !s.is_good()).count() as f32;
@@ -64,20 +64,20 @@ pub async fn health_checker(
6464
if ratio >= 0.5 && in_bad_state_since.is_none() {
6565
in_bad_state_since = Some(Instant::now());
6666

67-
let report = (url.clone(), ReportStatus::Unhealthy);
67+
let report = (url.clone(), Status::Unhealthy);
6868
let _ = report_sender.send(report).await;
6969

70-
let message = format!("{},{:?}", url, ReportStatus::Unhealthy);
70+
let message = serde_json::to_string(&value).unwrap();
7171
let _ = event_sender.send(message);
7272
}
7373

7474
if ratio == 0.0 && in_bad_state_since.is_some() {
7575
in_bad_state_since = None;
7676

77-
let report = (url.clone(), ReportStatus::Healthy);
77+
let report = (url.clone(), Status::Healthy);
7878
let _ = report_sender.send(report).await;
7979

80-
let message = format!("{},{:?}", url, ReportStatus::Healthy);
80+
let message = serde_json::to_string(&value).unwrap();
8181
let _ = event_sender.send(message);
8282
}
8383

@@ -89,17 +89,14 @@ pub async fn health_checker(
8989

9090
if let Some(since) = in_bad_state_since {
9191
if since.elapsed() > STILL_UNHEALTHY_TIMEOUT {
92-
let report = (url.clone(), ReportStatus::Unhealthy);
92+
let report = (url.clone(), Status::Unhealthy);
9393
let _ = report_sender.send(report).await;
9494

95-
let message = format!("{},{:?}", url, ReportStatus::Unhealthy);
95+
let message = serde_json::to_string(&value).unwrap();
9696
let _ = event_sender.send(message);
9797

9898
in_bad_state_since = Some(Instant::now());
9999
}
100100
}
101101
}
102-
103-
let message = format!("{},{}", url, "Removed");
104-
let _ = event_sender.send(message);
105102
}

src/main.rs

+3-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod health_checker;
22
mod response;
33
mod routes;
4+
mod url_value;
45

56
use std::{env, io, str, thread};
67

@@ -11,21 +12,16 @@ use url::Url;
1112

1213
use self::health_checker::health_checker;
1314
use self::routes::{update_url, read_url, delete_url, get_all_urls};
15+
use self::url_value::Status;
1416

1517
const HTTP_LISTEN_ADDR: &str = "HTTP_LISTEN_ADDR";
1618
const WS_LISTEN_ADDR: &str = "WS_LISTEN_ADDR";
1719
const SLACK_HOOK_URL: &str = "SLACK_HOOK_URL";
1820
const DATABASE_PATH: &str = "DATABASE_PATH";
1921

20-
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
21-
pub enum ReportStatus {
22-
Unhealthy,
23-
Healthy,
24-
}
25-
2622
pub struct State {
2723
thread_pool: ThreadPool,
28-
notifier_sender: Sender<(Url, ReportStatus)>,
24+
notifier_sender: Sender<(Url, Status)>,
2925
event_sender: ws::Sender,
3026
database: sled::Db,
3127
}

src/routes.rs

+29-9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use serde_json::Value;
88
use crate::health_checker::health_checker;
99
use crate::response::{Json, into_internal_error, into_bad_request, not_found};
1010
use crate::State;
11+
use crate::url_value::UrlValue;
12+
use crate::url_value::Status::{Healthy, Removed};
1113

1214
pub async fn update_url(mut cx: Context<State>) -> Result<Json, WithStatus<String>> {
1315
let url = cx.param("url").map(urldecode::decode).unwrap();
@@ -16,27 +18,33 @@ pub async fn update_url(mut cx: Context<State>) -> Result<Json, WithStatus<Strin
1618
let body = cx.body_bytes().await.map_err(into_bad_request)?;
1719
let body = if body.is_empty() { None } else { Some(body) };
1820

19-
let value = match body {
21+
let user_data = match body {
2022
None => Value::Null,
2123
Some(body) => serde_json::from_slice(&body).map_err(into_bad_request)?,
2224
};
2325

24-
let value = serde_json::json!({ "data": value });
25-
let value = serde_json::to_vec(&value).map_err(into_internal_error)?;
26+
let mut value = UrlValue { url: None, status: Healthy, reason: String::new(), data: user_data };
27+
let value_bytes = serde_json::to_vec(&value).map_err(into_internal_error)?;
2628

2729
let pool = &cx.state().thread_pool;
2830
let database = cx.state().database.clone();
2931
let notifier_sender = cx.state().notifier_sender.clone();
3032
let event_sender = cx.state().event_sender.clone();
3133

32-
match database.insert(url.as_str(), value.as_slice()) {
34+
match database.insert(url.as_str(), value_bytes.as_slice()) {
3335
Ok(None) => {
36+
// send the initial healthy message when an url is added
37+
value.url = Some(url.to_string());
38+
let message = serde_json::to_string(&value).map_err(into_internal_error)?;
39+
let _ = event_sender.send(message);
40+
3441
pool.spawn_ok(async {
3542
health_checker(url, notifier_sender, event_sender, database).await
3643
});
37-
Ok(Json(value))
44+
45+
Ok(Json(value_bytes))
3846
},
39-
Ok(Some(_)) => Ok(Json(value)),
47+
Ok(Some(_)) => Ok(Json(value_bytes)),
4048
Err(e) => Err(into_internal_error(e)),
4149
}
4250
}
@@ -58,8 +66,19 @@ pub async fn delete_url(cx: Context<State>) -> Result<Json, WithStatus<String>>
5866
let url = Url::parse(&url).map_err(into_bad_request)?;
5967

6068
let database = &cx.state().database;
69+
let event_sender = &cx.state().event_sender;
70+
6171
match database.remove(url.as_str()) {
62-
Ok(Some(value)) => Ok(Json(value.to_vec())),
72+
Ok(Some(value_bytes)) => {
73+
let mut value: UrlValue = serde_json::from_slice(&value_bytes).map_err(into_internal_error)?;
74+
value.status = Removed;
75+
value.url = Some(url.to_string());
76+
77+
let message = serde_json::to_string(&value).map_err(into_internal_error)?;
78+
let _ = event_sender.send(message);
79+
80+
Ok(Json(value_bytes.to_vec()))
81+
},
6382
Ok(None) => Err(not_found()),
6483
Err(e) => Err(into_internal_error(e)),
6584
}
@@ -77,9 +96,10 @@ pub async fn get_all_urls(cx: Context<State>) -> Result<Json, WithStatus<String>
7796

7897
let string = str::from_utf8(&key).map_err(into_internal_error)?;
7998
let url = Url::parse(&string).map_err(into_internal_error)?;
80-
let value: Value = serde_json::from_slice(&value).map_err(into_internal_error)?;
8199

82-
let value = serde_json::json!({ "url": url.as_str(), "data": value });
100+
let mut value: UrlValue = serde_json::from_slice(&value).map_err(into_internal_error)?;
101+
value.url = Some(url.to_string());
102+
83103
urls.push(value);
84104
}
85105

0 commit comments

Comments
 (0)