Skip to content

set of fixes/improvements for http proxy #17998

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions ydb/library/actors/http/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,10 @@ void THttpParser<THttpResponse>::ConnectionClosed() {
THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) {
THttpParser<THttpResponse> parser(data);
THeadersBuilder headers(parser.Headers);
if (!Endpoint->WorkerName.empty()) {
headers.Set("X-Worker-Name", Endpoint->WorkerName);
if (!headers.Has("X-Worker-Name")) {
if (!Endpoint->WorkerName.empty()) {
headers.Set("X-Worker-Name", Endpoint->WorkerName);
}
}
THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this);
response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
Expand Down Expand Up @@ -604,8 +606,10 @@ THttpIncomingResponsePtr THttpIncomingResponse::Duplicate(THttpOutgoingRequestPt

THttpOutgoingResponsePtr THttpOutgoingResponse::Duplicate(THttpIncomingRequestPtr request) {
THeadersBuilder headers(Headers);
if (!request->Endpoint->WorkerName.empty()) {
headers.Set("X-Worker-Name", request->Endpoint->WorkerName);
if (!headers.Has("X-Worker-Name")) {
if (!request->Endpoint->WorkerName.empty()) {
headers.Set("X-Worker-Name", request->Endpoint->WorkerName);
}
}
THttpOutgoingResponsePtr response = new THttpOutgoingResponse(request);
response->InitResponse(Protocol, Version, Status, Message);
Expand Down Expand Up @@ -906,6 +910,12 @@ THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) {
}
}

THeadersBuilder::THeadersBuilder(std::initializer_list<std::pair<TString, TString>> headers) {
for (const auto& pr : headers) {
Set(pr.first, pr.second);
}
}

void THeadersBuilder::Set(TStringBuf name, TStringBuf data) {
Data.emplace_back(name, data);
Headers[Data.back().first] = Data.back().second;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/http/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ struct THeadersBuilder : THeaders {
THeadersBuilder();
THeadersBuilder(TStringBuf headers);
THeadersBuilder(const THeadersBuilder& builder);
THeadersBuilder(std::initializer_list<std::pair<TString, TString>> headers);
void Set(TStringBuf name, TStringBuf data);
void Erase(TStringBuf name);
};
Expand Down
16 changes: 10 additions & 6 deletions ydb/library/actors/http/http_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
return listeningSocket;
}

IActor* AddOutgoingConnection(bool secure) {
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), secure);
IActor* AddOutgoingConnection(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), event);
TActorId connectionId = Register(connectionSocket);
ALOG_DEBUG(HttpLog, "Connection created " << connectionId);
Connections.emplace(connectionId);
Expand Down Expand Up @@ -96,6 +96,12 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
ALOG_ERROR(HttpLog, "Event TEvHttpOutgoingResponse shouldn't be in proxy, it should go to the http connection directly");
}

template<typename TEventType>
TAutoPtr<NActors::IEventHandle> Forward(const TActorId& dest, TAutoPtr<NActors::TEventHandle<TEventType>>&& event) {
auto self(SelfId());
return new IEventHandle(dest, event->Sender, event->Release().Release(), event->Flags, event->Cookie, &self, std::move(event->TraceId));
}

void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
if (event->Get()->AllowConnectionReuse) {
auto destination = event->Get()->Request->GetDestination();
Expand All @@ -104,15 +110,13 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
TActorId availableConnection = itAvailableConnection->second;
ALOG_DEBUG(HttpLog, "Reusing connection " << availableConnection << " for destination " << destination);
AvailableConnections.erase(itAvailableConnection);
Send(event->Forward(availableConnection));
Send(Forward(availableConnection, std::move(event)));
return;
} else {
ALOG_DEBUG(HttpLog, "Creating a new connection for destination " << destination);
}
}
bool secure(event->Get()->Request->Secure);
NActors::IActor* actor = AddOutgoingConnection(secure);
Send(event->Forward(actor->SelfId()));
AddOutgoingConnection(event);
}

void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr& event) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/http/http_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ struct TPrivateEndpointInfo : THttpEndpointInfo {

NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::IMetricFactory> registry = NMonitoring::TMetricRegistry::SharedInstance());
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner);
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure);
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event);
NActors::IActor* CreateIncomingConnectionActor(
std::shared_ptr<TPrivateEndpointInfo> endpoint,
TIntrusivePtr<TSocketDescriptor> socket,
Expand Down
120 changes: 82 additions & 38 deletions ydb/library/actors/http/http_proxy_outgoing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
namespace NHttp {

template <typename TSocketImpl>
class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
class TOutgoingConnectionActor : public NActors::TActorBootstrapped<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
public:
using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>;
using TBase = NActors::TActorBootstrapped<TOutgoingConnectionActor<TSocketImpl>>;
using TSelf = TOutgoingConnectionActor<TSocketImpl>;
using TBase::Become;
using TBase::Send;
using TBase::Schedule;
using TBase::SelfId;
Expand All @@ -23,14 +24,18 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
bool AllowConnectionReuse = false;
NActors::TPollerToken::TPtr PollerToken;

TOutgoingConnectionActor(const TActorId& owner)
: TBase(&TSelf::StateWaiting)
, Owner(owner)
TOutgoingConnectionActor(const TActorId& owner, TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event)
: Owner(owner)
{
InitiateRequest(event);
}

static constexpr char ActorName[] = "OUT_CONNECTION_ACTOR";

void Bootstrap() {
PerformRequest();
}

void PassAway() override {
Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionClosed(SelfId(), Destination));
TSocketImpl::Shutdown(); // to avoid errors when connection already closed
Expand Down Expand Up @@ -60,19 +65,28 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
PassAway();
} else {
ALOG_DEBUG(HttpLog, GetSocketName() << "connection available for reuse");
CheckClose();
Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionAvailable(SelfId(), Destination));
}
}

void ReplyErrorAndPassAway(const TString& error) {
ALOG_ERROR(HttpLog, GetSocketName() << "connection closed with error: " << error);
if (error) {
ALOG_ERROR(HttpLog, GetSocketName() << "connection closed with error: " << error);
} else {
ALOG_DEBUG(HttpLog, GetSocketName() << "connection closed");
}
if (RequestOwner) {
Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
if (!error && Response && !Response->IsReady()) {
Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, "ConnectionClosed")); // connection closed prematurely
} else {
Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
}
RequestOwner = TActorId();
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
Send(Owner, sensors.Release());
PassAway();
}
PassAway();
}

protected:
Expand All @@ -84,7 +98,7 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
}

void Connect() {
ALOG_DEBUG(HttpLog, GetSocketName() << "connecting");
ALOG_DEBUG(HttpLog, GetSocketName() << "connecting to " << Address->ToString());
TSocketImpl::Create(Address->SockAddr()->sa_family);
TSocketImpl::SetNonBlock();
TSocketImpl::SetTimeout(ConnectionTimeout);
Expand All @@ -101,6 +115,26 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
}
}

void InitiateRequest(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
Request = std::move(event->Get()->Request);
Destination = Request->GetDestination();
TSocketImpl::SetHost(TString(Request->Host));
RequestOwner = event->Sender;
if (event->Get()->Timeout) {
ConnectionTimeout = event->Get()->Timeout;
}
AllowConnectionReuse = event->Get()->AllowConnectionReuse;
}

void PerformRequest() {
Request->Timer.Reset();
ALOG_DEBUG(HttpLog, GetSocketName() << "resolving " << TSocketImpl::Host);
Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(TSocketImpl::Host));
Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
LastActivity = NActors::TActivationContext::Now();
TBase::Become(&TOutgoingConnectionActor::StateResolving);
}

void FlushOutput() {
if (Request != nullptr) {
Request->Finish();
Expand All @@ -123,13 +157,38 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
}
break;
} else {
ReplyErrorAndPassAway(res == 0 ? "ConnectionClosed" : strerror(-res));
ReplyErrorAndPassAway(res == 0 ? "" : strerror(-res));
break;
}
}
}
}

void CheckClose() {
char buf[8];
for (;;) {
bool read = false, write = false;
ssize_t res = TSocketImpl::Recv(&buf, 0, read, write);
if (res > 0) {
return ReplyErrorAndPassAway("Unexpected data received");
} else if (-res == EINTR) {
continue;
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
if (PollerToken) {
if (!read && !write) {
read = true;
}
if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) {
continue;
}
}
return;
} else {
return ReplyErrorAndPassAway(res == 0 ? "" : strerror(-res));
}
}
}

void PullInput() {
for (;;) {
if (Response == nullptr) {
Expand Down Expand Up @@ -165,7 +224,7 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
if (Response->IsDone() && Response->IsReady()) {
return ReplyAndPassAway();
}
return ReplyErrorAndPassAway(res == 0 ? "ConnectionClosed" : strerror(-res));
return ReplyErrorAndPassAway(res == 0 ? "" : strerror(-res));
}
}
}
Expand Down Expand Up @@ -247,20 +306,8 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
}

void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
Request = std::move(event->Get()->Request);
Destination = Request->GetDestination();
TSocketImpl::SetHost(TString(Request->Host));
ALOG_DEBUG(HttpLog, GetSocketName() << "resolving " << TSocketImpl::Host);
Request->Timer.Reset();
RequestOwner = event->Sender;
Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(TSocketImpl::Host));
if (event->Get()->Timeout) {
ConnectionTimeout = event->Get()->Timeout;
}
AllowConnectionReuse = event->Get()->AllowConnectionReuse;
Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
LastActivity = NActors::TActivationContext::Now();
TBase::Become(&TOutgoingConnectionActor::StateResolving);
InitiateRequest(event);
PerformRequest();
}

void HandleConnected(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
Expand All @@ -284,8 +331,12 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
if (event->Get()->Write && RequestOwner) {
FlushOutput();
}
if (event->Get()->Read && RequestOwner) {
PullInput();
if (event->Get()->Read) {
if (RequestOwner) {
PullInput();
} else {
CheckClose();
}
}
}

Expand All @@ -311,13 +362,6 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
}
}

STATEFN(StateWaiting) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting);
cFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
}
}

STATEFN(StateResolving) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving);
Expand Down Expand Up @@ -349,11 +393,11 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
}
};

NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure) {
if (secure) {
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner);
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
if (event->Get()->Request->Secure) {
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, event);
} else {
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner);
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, event);
}
}

Expand Down
22 changes: 15 additions & 7 deletions ydb/library/actors/http/http_static.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandl
const TFsPath URL;
const TFsPath FilePath;
const TFsPath ResourcePath;
const TFsPath Index;
TUrlAdapter UrlAdapter;

THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index)
THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, TUrlAdapter&& urlAdapter)
: TBase(&THttpStaticContentHandler::StateWork)
, URL(url)
, FilePath(filePath)
, ResourcePath(resourcePath)
, Index(index)
, UrlAdapter(std::move(urlAdapter))
{}

static constexpr char ActorName[] = "HTTP_STATIC_ACTOR";
Expand All @@ -47,13 +47,13 @@ class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandl
Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
return;
}
if (url.GetPath().EndsWith('/') && Index.IsDefined()) {
url /= Index;
if (UrlAdapter) {
UrlAdapter(url);
}
url = url.RelativeTo(URL);
try {
// TODO: caching?
TString contentType = mimetypeByExt(url.GetExtension().c_str());
TString contentType = mimetypeByExt(url.GetName().c_str());
TString data;
TFileStat filestat;
TFsPath resourcename(ResourcePath / url);
Expand Down Expand Up @@ -90,8 +90,16 @@ class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandl
}
};

NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, TUrlAdapter&& urlAdapter) {
return new THttpStaticContentHandler(url, filePath, resourcePath, std::move(urlAdapter));
}

NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index) {
return new THttpStaticContentHandler(url, filePath, resourcePath, index);
return CreateHttpStaticContentHandler(url, filePath, resourcePath, [index](TFsPath& url) {
if (url.GetPath().EndsWith('/') && index) {
url /= index;
}
});
}

}
2 changes: 2 additions & 0 deletions ydb/library/actors/http/http_static.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace NHttp {

using TUrlAdapter = std::function<void(TFsPath&)>;
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index = TString());
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, TUrlAdapter&& urlAdapter);

}
2 changes: 1 addition & 1 deletion ydb/library/actors/http/http_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,8 +791,8 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V

NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);

UNIT_ASSERT_EQUAL(response->Error, "ConnectionClosed");
UNIT_ASSERT(response->Response->IsError());
UNIT_ASSERT_EQUAL(response->Error, "ConnectionClosed");
}

Y_UNIT_TEST(RequestAfter307) {
Expand Down
Loading