Skip to content

Commit 487a8a6

Browse files
committed
Various enhancements for test and release
- Add logs and access logs for informationals - Add metrics - Fix panic on malformed and Http/1.0 requests - Some clippy fixes Signed-off-by: Eloi DEMOLIS <[email protected]>
1 parent 6507da3 commit 487a8a6

File tree

9 files changed

+234
-129
lines changed

9 files changed

+234
-129
lines changed

bin/src/cli.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,10 @@ pub enum SubCmd {
180180
#[clap(subcommand)]
181181
cmd: ConfigCmd,
182182
},
183-
#[clap(name = "events", about = "receive sozu events about the status of backends")]
183+
#[clap(
184+
name = "events",
185+
about = "receive sozu events about the status of backends"
186+
)]
184187
Events,
185188
}
186189

@@ -296,6 +299,11 @@ pub enum ClusterCmd {
296299
help = "Configures the load balancing policy. Possible values are 'roundrobin', 'random' or 'leastconnections'"
297300
)]
298301
load_balancing_policy: LoadBalancingAlgorithms,
302+
#[clap(
303+
long = "http2",
304+
help = "the backends of this cluster use http2 prio-knowledge"
305+
)]
306+
h2: bool,
299307
},
300308
}
301309

@@ -422,8 +430,6 @@ pub enum HttpFrontendCmd {
422430
method: Option<String>,
423431
#[clap(long = "tags", help = "Specify tag (key-value pair) to apply on front-end (example: 'key=value, other-key=other-value')", value_parser = parse_tags)]
424432
tags: Option<BTreeMap<String, String>>,
425-
#[clap(help = "the frontend uses http2 with prio-knowledge")]
426-
h2: Option<bool>,
427433
},
428434
#[clap(name = "remove")]
429435
Remove {

bin/src/ctl/request_builder.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ impl CommandManager {
150150
send_proxy,
151151
expect_proxy,
152152
load_balancing_policy,
153+
h2,
153154
} => {
154155
let proxy_protocol = match (send_proxy, expect_proxy) {
155156
(true, true) => Some(ProxyProtocolConfig::RelayHeader),
@@ -164,7 +165,9 @@ impl CommandManager {
164165
https_redirect,
165166
proxy_protocol: proxy_protocol.map(|pp| pp as i32),
166167
load_balancing: load_balancing_policy as i32,
167-
..Default::default()
168+
http2: h2,
169+
load_metric: None,
170+
answer_503: None,
168171
})
169172
.into(),
170173
)
@@ -238,7 +241,6 @@ impl CommandManager {
238241
method,
239242
cluster_id: route,
240243
tags,
241-
h2,
242244
} => self.send_request(
243245
RequestType::AddHttpFrontend(RequestHttpFrontend {
244246
cluster_id: route.into(),
@@ -287,7 +289,6 @@ impl CommandManager {
287289
method,
288290
cluster_id: route,
289291
tags,
290-
h2,
291292
} => self.send_request(
292293
RequestType::AddHttpsFrontend(RequestHttpFrontend {
293294
cluster_id: route.into(),

lib/src/http.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ impl HttpSession {
218218
Some(session_address),
219219
public_address,
220220
);
221-
context.create_stream(expect.request_id, 1 << 16)?;
221+
if context.create_stream(expect.request_id, 1 << 16).is_none() {
222+
error!("HTTP expect upgrade failed: could not create stream");
223+
return None;
224+
}
222225
let mut mux = Mux {
223226
configured_frontend_timeout: self.configured_frontend_timeout,
224227
frontend_token: self.frontend_token,
@@ -232,7 +235,13 @@ impl HttpSession {
232235
gauge_add!("protocol.http", 1);
233236
Some(HttpStateMachine::Mux(mux))
234237
}
235-
_ => None,
238+
_ => {
239+
warn!(
240+
"HTTP expect upgrade failed: bad header {:?}",
241+
expect.addresses
242+
);
243+
None
244+
}
236245
}
237246
}
238247

lib/src/https.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,10 @@ impl HttpsSession {
248248
if !expect.container_frontend_timeout.cancel() {
249249
error!("failed to cancel request timeout on expect upgrade phase for 'expect proxy protocol with AF_UNSPEC address'");
250250
}
251-
251+
warn!(
252+
"HTTP expect upgrade failed: bad header {:?}",
253+
expect.addresses
254+
);
252255
None
253256
}
254257

@@ -299,14 +302,18 @@ impl HttpsSession {
299302
);
300303
let mut frontend = match alpn {
301304
AlpnProtocol::Http11 => {
305+
incr!("http.alpn.http11");
302306
context.create_stream(handshake.request_id, 1 << 16)?;
303307
mux::Connection::new_h1_server(front_stream, handshake.container_frontend_timeout)
304308
}
305-
AlpnProtocol::H2 => mux::Connection::new_h2_server(
306-
front_stream,
307-
self.pool.clone(),
308-
handshake.container_frontend_timeout,
309-
)?,
309+
AlpnProtocol::H2 => {
310+
incr!("http.alpn.h2");
311+
mux::Connection::new_h2_server(
312+
front_stream,
313+
self.pool.clone(),
314+
handshake.container_frontend_timeout,
315+
)?
316+
}
310317
};
311318
frontend.readiness_mut().event = handshake.frontend_readiness.event;
312319

lib/src/protocol/mux/h1.rs

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,18 +90,41 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
9090
self.readiness.interest.remove(Ready::READABLE);
9191
}
9292
if kawa.is_main_phase() {
93-
if let StreamState::Linked(token) = stream.state {
94-
endpoint
95-
.readiness_mut(token)
96-
.interest
97-
.insert(Ready::WRITABLE)
98-
}
9993
if !was_main_phase && self.position.is_server() {
94+
if parts.context.method.is_none()
95+
|| parts.context.authority.is_none()
96+
|| parts.context.path.is_none()
97+
{
98+
if let kawa::StatusLine::Request {
99+
version: kawa::Version::V10,
100+
..
101+
} = kawa.detached.status_line
102+
{
103+
error!(
104+
"Unexpected malformed request: HTTP/1.0 from {:?} with {:?} {:?} {:?}",
105+
parts.context.session_address,
106+
parts.context.method,
107+
parts.context.authority,
108+
parts.context.path
109+
);
110+
} else {
111+
error!("Unexpected malformed request");
112+
kawa::debug_kawa(kawa);
113+
}
114+
set_default_answer(stream, &mut self.readiness, 400);
115+
return MuxResult::Continue;
116+
}
100117
self.requests += 1;
101118
println_!("REQUESTS: {}", self.requests);
102119
gauge_add!("http.active_requests", 1);
103120
stream.state = StreamState::Link
104121
}
122+
if let StreamState::Linked(token) = stream.state {
123+
endpoint
124+
.readiness_mut(token)
125+
.interest
126+
.insert(Ready::WRITABLE)
127+
}
105128
};
106129
MuxResult::Continue
107130
}
@@ -150,6 +173,11 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
150173
match kawa.detached.status_line {
151174
kawa::StatusLine::Response { code: 101, .. } => {
152175
debug!("============== HANDLE UPGRADE!");
176+
stream.generate_access_log(
177+
false,
178+
Some(String::from("H1::Upgrade")),
179+
context.listener.clone(),
180+
);
153181
return MuxResult::Upgrade;
154182
}
155183
kawa::StatusLine::Response { code: 100, .. } => {
@@ -158,6 +186,11 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
158186
self.timeout_container.reset();
159187
self.readiness.interest.insert(Ready::READABLE);
160188
kawa.clear();
189+
stream.generate_access_log(
190+
false,
191+
Some(String::from("H1::Continue")),
192+
context.listener.clone(),
193+
);
161194
return MuxResult::Continue;
162195
}
163196
kawa::StatusLine::Response { code: 103, .. } => {
@@ -169,14 +202,24 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
169202
.interest
170203
.insert(Ready::READABLE);
171204
kawa.clear();
205+
stream.generate_access_log(
206+
false,
207+
Some(String::from("H1::EarlyHint+Error")),
208+
context.listener.clone(),
209+
);
172210
return MuxResult::Continue;
173211
} else {
212+
stream.generate_access_log(
213+
false,
214+
Some(String::from("H1::EarlyHint")),
215+
context.listener.clone(),
216+
);
174217
return MuxResult::CloseSession;
175218
}
176219
}
177220
_ => {}
178221
}
179-
// ACCESS LOG
222+
incr!("http.e2e.http11");
180223
stream.generate_access_log(
181224
false,
182225
Some(String::from("H1")),

0 commit comments

Comments
 (0)