Fix hanging requests with filtered steal #3016
base: main
Conversation
Mostly some doc requests while I'm still going through stuff.
@@ -0,0 +1 @@
Fixed an issue where HTTP requests stolen with a filter would hang with a single-threaded local HTTP server.
Seems like an understatement. I know that this is what users would care about, but I think this PR warrants at least adding an "Also improved internal handling of incoming connections", or something like that.
Done
match self {
    HttpResponseFallback::Framed(req) => req
        .internal_response
        .map_body(|body| body.map_err(|_| unreachable!()).boxed())
Why were we doing these `map_err(unreachable)`? We should just `unwrap` if it's a panic anyways.
Because we need a specific body type. Some requests are passed to the clients, and the response body implements `Body<Data = Bytes, Error = Infallible>`. Other requests are passed to the original destination, and the response body implements `Body<Data = Bytes, Error = hyper::Error>`. With this trick we unify the two (the function passed to `Body::map_err` is of type `Fn(Infallible) -> hyper::Error`).
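For illustration, a minimal sketch of that unification trick (assuming the `bytes`, `http-body`, `http-body-util`, and `hyper` crates; the function names here are illustrative, not the project's code):

```rust
use std::convert::Infallible;

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Full};

// A body that can never fail: its error type is `Infallible`.
fn infallible_body() -> impl http_body::Body<Data = Bytes, Error = Infallible> {
    Full::new(Bytes::from_static(b"hello"))
}

// Mapping the impossible `Infallible` error into `hyper::Error` lets bodies coming
// from local clients and bodies coming from the original destination share one
// concrete type: `BoxBody<Bytes, hyper::Error>`.
fn unify<B>(body: B) -> BoxBody<Bytes, hyper::Error>
where
    B: http_body::Body<Data = Bytes, Error = Infallible> + Send + Sync + 'static,
{
    // The closure is never actually called, because `Infallible` has no values.
    body.map_err(|_| unreachable!()).boxed()
}
```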
@@ -40,7 +39,7 @@ pub(crate) struct TcpStealerApi {
    /// View on the stealer task's status.
    task_status: TaskStatus,

    response_body_txs: HashMap<(ConnectionId, RequestId), Sender<hyper::Result<Frame<Bytes>>>>,
    response_body_txs: HashMap<(ConnectionId, RequestId), ResponseBodyTx>,
Suggested change:
/// Keeps track of the TCP stealer connections, associating them with the [`RequestId`] of the HTTP response. The `RequestId` is the id of the [`MatchedHttpRequest`], so we create an association between a request and its response.
response_body_txs: HashMap<(ConnectionId, RequestId), ResponseBodyTx>,
This one is a mess, my suggestion is confusing even to me, but I have faith you can build on top of it and make this the best doc ever.
pub(crate) use self::reversible_stream::ReversibleStream;
pub(crate) use filter::HttpFilter;
pub(crate) use response_fallback::{HttpResponseFallback, ReceiverStreamBody};
pub(crate) use reversible_stream::ReversibleStream;

/// Handy alias due to [`ReversibleStream`] being generic, avoiding value mismatches.
pub(crate) type DefaultReversibleStream = ReversibleStream<{ HttpVersion::MINIMAL_HEADER_SIZE }>;
Considering that this refactor is already changing a bunch of stuff around these areas, what do you think of dropping most of this in favor of `TcpStream::peek`?
Unfortunately, we can't use peek. It may return too few bytes for us to determine the connection type, and successive calls return the same data :<
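For context, a rough sketch of the alternative (the constant value and function name are placeholders, not the real agent code): with `read` we consume bytes and can loop until we have enough to classify the connection, and the consumed prefix is what a `ReversibleStream`-style wrapper buffers so it can be replayed to the downstream handler.

```rust
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

// Placeholder: the real constant lives on `HttpVersion` in the agent.
const MINIMAL_HEADER_SIZE: usize = 10;

async fn read_detection_prefix(stream: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    let mut prefix = vec![0u8; MINIMAL_HEADER_SIZE];
    let mut filled = 0;

    // Per the reply above, `peek` may return too few bytes and successive calls
    // return the same data, so instead we consume with `read` until we have enough.
    while filled < prefix.len() {
        let read = stream.read(&mut prefix[filled..]).await?;
        if read == 0 {
            break; // peer closed before sending a full header
        }
        filled += read;
    }
    prefix.truncate(filled);

    // These bytes are now consumed from the socket; a `ReversibleStream`-style wrapper
    // keeps them in a buffer so the HTTP/TCP handler still sees them.
    Ok(prefix)
}
```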
    }
}

pub struct Frames<D> {
Please add some docs, and while at it, either put in the docs what `D` stands for, or change the parameter to `Data` or something byte-related.
Done
mirrord/protocol/src/batched_body.rs (outdated)
    loop {
        match self.frame().now_or_never() {
            None => {
                frames.is_last = false;
                break;
            }
            Some(None) => {
                frames.is_last = true;
                break;
            }
            Some(Some(result)) => {
                frames.frames.push(result?);
            }
        }
    }

    Ok(frames)
}
Considering this is pretty much the same as the other method, maybe create a helper function that takes the `Frames` we're pushing?
Done
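For reference, one possible shape of such a helper (a sketch only; the `Frames` struct and the function name are illustrative, and the real method signatures may differ):

```rust
use futures::FutureExt;
use http_body::Body;
use http_body_util::BodyExt;

pub struct Frames<D> {
    pub frames: Vec<http_body::Frame<D>>,
    pub is_last: bool,
}

/// Drains all frames that are ready *right now* into `frames`,
/// setting `is_last` when the body is finished.
fn push_ready_frames<B>(body: &mut B, frames: &mut Frames<B::Data>) -> Result<(), B::Error>
where
    B: Body + Unpin,
{
    loop {
        match body.frame().now_or_never() {
            // Frame not ready yet: more data may still arrive later.
            None => {
                frames.is_last = false;
                break;
            }
            // Body is finished: no more frames will ever come.
            Some(None) => {
                frames.is_last = true;
                break;
            }
            // A frame was ready: collect it (or propagate the error).
            Some(Some(result)) => frames.frames.push(result?),
        }
    }
    Ok(())
}
```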
Moar stuff
// construct IncomingMode from config file
let incoming = background_tasks.register(IncomingProxy::default(), (), 512);

agent_connection
Considering how finicky this is (and how easy it is to forget to do this when starting an agent connection), I wonder if we shouldn't refactor how we do the version negotiation. Like, you only get an `AgentConnection` to send other messages after this whole thing has been done.
Maybe not for this PR, just some thoughts for the future.
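Purely as an illustration of that idea, a typestate-style sketch (all names here are assumptions, not mirrord's actual API): the only way to get a usable connection handle is to go through negotiation first.

```rust
pub struct UnnegotiatedConnection {
    // In the real code this would wrap the actual transport (e.g. a TCP stream).
    peer: String,
}

pub struct AgentConnection {
    peer: String,
    pub protocol_version: String,
}

impl UnnegotiatedConnection {
    pub fn new(peer: impl Into<String>) -> Self {
        Self { peer: peer.into() }
    }

    /// The only way to obtain an `AgentConnection` is to go through negotiation,
    /// so forgetting the version exchange becomes a compile-time impossibility.
    pub async fn negotiate_version(self) -> Result<AgentConnection, std::io::Error> {
        // Placeholder for the real protocol-version round trip.
        let negotiated = "1.0.0".to_string();
        Ok(AgentConnection {
            peer: self.peer,
            protocol_version: negotiated,
        })
    }
}
```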
@@ -609,21 +580,18 @@ impl ReversePortForwarder {
                )
            }
        },
        (MainTaskId::IncomingProxy, TaskUpdate::Finished(result)) => match result {

        TaskUpdate::Finished(result) => match result {
            Ok(()) => {
This has major "The operation completed successfully" Windows error vibes.
How do we get here?
We should never get here, unless there's a bug in the `IncomingProxy`. I'm up for doing `unreachable!` here, if you think it's better.
) {
    loop {
        let Some(message) = rx.recv().await else {
            break;
Instead of these `break`s, can't we just `unwrap`?
This one might require the `break` on `None` if I'm understanding it correctly, but the others we could just `unwrap` instead of `is_err`, no?
Done, `unwrap`ing on send errors.
/// Agent supports only
/// [`LayerTcpSteal::HttpResponse`](mirrord_protocol::tcp::LayerTcpSteal::HttpResponse)
#[default]
Basic,
This is basically the legacy version, right? Do we know if there are mirrord-agents in the wild still using this? Considering that `HTTP_FRAMED_VERSION` has been a thing since the end of 2023, I feel like we could drop this.
I don't see a reason to drop this now. Like, this introduces ~100 lines of code in the whole project maybe 🤷♂️
.into_data()
.map(|bytes| Self::Data(bytes.into()))
.or_else(|frame| frame.into_trailers().map(Self::Trailers))
.expect("malformed frame type")
Dropping `if else` AND fixing a typo! This is like a hidden sweet you put here just for me.
let now = Instant::now();
let mut min_last_used = None;
let notified = {
    let mut guard = clients.lock().unwrap();
Maybe `expect` here with a message that's more user-friendly than the usual Rust `unwrap`?
Done
std::mem::drop(store);

loop {
    let Some(clients) = clients.upgrade() else {
Suggested change:
// A failed `upgrade` here means we have no more clients, so we can stop this task.
let Some(clients) = clients.upgrade() else {
Added comments
let mut guard = clients.lock().unwrap();
let notified = notify.notified();
guard.retain(|client| {
    if client.last_used + idle_client_timeout > now {
Suggested change:
// We keep only the clients that have not gone beyond their `idle_client_timeout`, and update their `last_used` time.
if client.last_used + idle_client_timeout > now {
Added comments
    }
}

pub struct Closed<T: BackgroundTask>(Sender<T::MessageOut>);
Docs.
Should probably add why we're using this, and how it helps.
Do we need this stuff to be `pub`?
pub struct Closed<T: BackgroundTask>(Sender<T::MessageOut>);

impl<T: BackgroundTask> Closed<T> {
    pub async fn cancel_on_close<F: Future>(&self, future: F) -> Option<F::Output> {
Docs
Do we need this to be `pub`?
So...
This started as a small refactor with some hope of fixing the hanging requests issue. I could not find any bug that could cause the problem, and only later found out that there was no bug to find. The repro application I was using was handling only one connection at a time, and the first HTTP connection was not closed by the k8s proxy (most probably to be reused later). And so the second request would hang on intproxy's HTTP handshake attempt. Since we want to be user friendly, this PR introduces reusing local HTTP connections, which solves the problem. However, since it started as a refactor, it's big. Sorry.
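To make the fix concrete, here is a heavily simplified sketch of the reuse idea (the real `ClientStore` in this PR is shared between tasks, async, and evicts idle clients after a timeout; `LocalHttpClient` is just a hypothetical stand-in for an established local HTTP connection):

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

// Stand-in for an established connection to the user's local HTTP server.
struct LocalHttpClient { /* e.g. a hyper `SendRequest` handle */ }

#[derive(Default)]
struct ClientStore {
    idle: HashMap<SocketAddr, Vec<LocalHttpClient>>,
}

impl ClientStore {
    /// Reuse an idle connection to `addr` if we have one, otherwise the caller
    /// establishes a new one. Reusing avoids a second handshake against a
    /// single-threaded local server that still holds the first connection open.
    fn checkout(&mut self, addr: SocketAddr) -> Option<LocalHttpClient> {
        self.idle.get_mut(&addr).and_then(|clients| clients.pop())
    }

    /// Return a connection once the response is done, so the next request can reuse it.
    fn check_in(&mut self, addr: SocketAddr, client: LocalHttpClient) {
        self.idle.entry(addr).or_default().push(client);
    }
}
```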
Changes summarized:
- `StreamingBody` was moved from `mirrord-protocol` to `mirrord-intproxy` without any notable changes. There was no need for it to be in the protocol crate.
- The `BodyExt` trait in `mirrord-protocol` was renamed to `BatchedBody`. The only notable change is moving from a custom `Future` implementation (`FramesFut`) to using `now_or_never`. I was afraid of it in the past, now I'm not. I tested this with heavy load and did not detect any difference. Using `now_or_never` simplifies things, because some code no longer needs to be async.
- The `HttpRequest<StreamingBody>` type is now used throughout, to remove ugly generics and match expressions.
- The `HttpRequestFallback` enum, along with lots of conversion code, was removed from `mirrord-protocol`.
- The `HttpResponseFallback` type was moved to the agent without any notable changes. There was no need for it to be in the protocol crate.
- `ReversePortForwarder` and its tests were fixed. It was never streaming responses' bodies, because `IncomingProxy` was not notified about the agent protocol version. This change is not related to the issue, but the problem came up in the CI.
- The `h2::Error::is_reset` check and the dependency on `h2` were removed completely. Instead of checking if the HTTP error is transient, we check if it's not transient (using `hyper::Error` methods, e.g. `hyper::Error::is_user`). I think it's simpler and safer, since retrying a request is not harmful.
- Added a `BoundTcpSocket` struct to the incoming proxy, which wraps the logic for binding the same interface as the user socket. Now we can actually see the bound socket address in tracing.
- Added a `ClientStore` struct that caches unused local HTTP connections and cleans them up after some timeout.
- HTTP requests are handled by `HttpGatewayTask`s inside `IncomingProxy`. To reuse connections, they share a `ClientStore` instance.
- In `IncomingProxy`, each connection is handled by its own `TcpProxyTask`. The task knows whether the connection is stolen or mirrored. If it's mirrored, the data is no longer being sent to the main `IncomingProxy` task, it is immediately discarded. If it's stolen, the connection is no longer artificially kept alive until silent for a second (this mechanism makes sense only with the mirror mode, and can introduce weird behavior in steal mode).
- The `Interceptor` task was removed completely, now we have two separate tasks: `HttpGatewayTask` and `TcpProxyTask`.
- `MetadataStore` was moved to its own module without any notable changes.
- `IncomingProxy` now optimizes the HTTP response variant. If the whole response body is available when the response head is received, we no longer send the chunked response variant. Instead we respond with the framed variant. This allows us to use only one `mirrord_protocol` message (see the sketch after this list).
- `IncomingProxy` now does subscription checks when receiving a new connection/request. If we receive a connection/request on a remote port that we no longer subscribe to, we unsubscribe immediately, without attempting to connect to the user application.
- `IncomingProxy` got more detailed instrumentation, e.g. added time spent on polling response frames.
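A hedged sketch of the response-variant optimization mentioned above (the enum and function here are illustrative stand-ins, not the real `mirrord_protocol` types): if every body frame is already available when the head is received, a single framed message is enough; otherwise we fall back to streaming chunked messages.

```rust
use bytes::Bytes;

// Illustrative stand-ins for the protocol messages.
enum LocalResponse {
    // Whole body known up front: a single message carries everything.
    Framed { status: u16, body: Vec<Bytes> },
    // Body still streaming: the head goes first, remaining frames follow separately.
    ChunkedStart { status: u16, first_frames: Vec<Bytes> },
}

// `ready_frames` are the frames available when the head was received, and
// `body_finished` is what a `BatchedBody`-style check would report as `is_last`.
fn choose_variant(status: u16, ready_frames: Vec<Bytes>, body_finished: bool) -> LocalResponse {
    if body_finished {
        LocalResponse::Framed { status, body: ready_frames }
    } else {
        LocalResponse::ChunkedStart { status, first_frames: ready_frames }
    }
}
```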