Skip to content

Commit d7474a1

Browse files
authored
set of fixes/improvements for http proxy (#17998)
1 parent 25bf032 commit d7474a1

File tree

8 files changed

+126
-57
lines changed

8 files changed

+126
-57
lines changed

ydb/library/actors/http/http.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,8 +409,10 @@ void THttpParser<THttpResponse>::ConnectionClosed() {
409409
THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) {
410410
THttpParser<THttpResponse> parser(data);
411411
THeadersBuilder headers(parser.Headers);
412-
if (!Endpoint->WorkerName.empty()) {
413-
headers.Set("X-Worker-Name", Endpoint->WorkerName);
412+
if (!headers.Has("X-Worker-Name")) {
413+
if (!Endpoint->WorkerName.empty()) {
414+
headers.Set("X-Worker-Name", Endpoint->WorkerName);
415+
}
414416
}
415417
THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this);
416418
response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
@@ -604,8 +606,10 @@ THttpIncomingResponsePtr THttpIncomingResponse::Duplicate(THttpOutgoingRequestPt
604606

605607
THttpOutgoingResponsePtr THttpOutgoingResponse::Duplicate(THttpIncomingRequestPtr request) {
606608
THeadersBuilder headers(Headers);
607-
if (!request->Endpoint->WorkerName.empty()) {
608-
headers.Set("X-Worker-Name", request->Endpoint->WorkerName);
609+
if (!headers.Has("X-Worker-Name")) {
610+
if (!request->Endpoint->WorkerName.empty()) {
611+
headers.Set("X-Worker-Name", request->Endpoint->WorkerName);
612+
}
609613
}
610614
THttpOutgoingResponsePtr response = new THttpOutgoingResponse(request);
611615
response->InitResponse(Protocol, Version, Status, Message);
@@ -906,6 +910,12 @@ THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) {
906910
}
907911
}
908912

913+
THeadersBuilder::THeadersBuilder(std::initializer_list<std::pair<TString, TString>> headers) {
914+
for (const auto& pr : headers) {
915+
Set(pr.first, pr.second);
916+
}
917+
}
918+
909919
void THeadersBuilder::Set(TStringBuf name, TStringBuf data) {
910920
Data.emplace_back(name, data);
911921
Headers[Data.back().first] = Data.back().second;

ydb/library/actors/http/http.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ struct THeadersBuilder : THeaders {
128128
THeadersBuilder();
129129
THeadersBuilder(TStringBuf headers);
130130
THeadersBuilder(const THeadersBuilder& builder);
131+
THeadersBuilder(std::initializer_list<std::pair<TString, TString>> headers);
131132
void Set(TStringBuf name, TStringBuf data);
132133
void Erase(TStringBuf name);
133134
};

ydb/library/actors/http/http_proxy.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
1616
return listeningSocket;
1717
}
1818

19-
IActor* AddOutgoingConnection(bool secure) {
20-
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), secure);
19+
IActor* AddOutgoingConnection(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
20+
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), event);
2121
TActorId connectionId = Register(connectionSocket);
2222
ALOG_DEBUG(HttpLog, "Connection created " << connectionId);
2323
Connections.emplace(connectionId);
@@ -96,6 +96,12 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
9696
ALOG_ERROR(HttpLog, "Event TEvHttpOutgoingResponse shouldn't be in proxy, it should go to the http connection directly");
9797
}
9898

99+
template<typename TEventType>
100+
TAutoPtr<NActors::IEventHandle> Forward(const TActorId& dest, TAutoPtr<NActors::TEventHandle<TEventType>>&& event) {
101+
auto self(SelfId());
102+
return new IEventHandle(dest, event->Sender, event->Release().Release(), event->Flags, event->Cookie, &self, std::move(event->TraceId));
103+
}
104+
99105
void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
100106
if (event->Get()->AllowConnectionReuse) {
101107
auto destination = event->Get()->Request->GetDestination();
@@ -104,15 +110,13 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
104110
TActorId availableConnection = itAvailableConnection->second;
105111
ALOG_DEBUG(HttpLog, "Reusing connection " << availableConnection << " for destination " << destination);
106112
AvailableConnections.erase(itAvailableConnection);
107-
Send(event->Forward(availableConnection));
113+
Send(Forward(availableConnection, std::move(event)));
108114
return;
109115
} else {
110116
ALOG_DEBUG(HttpLog, "Creating a new connection for destination " << destination);
111117
}
112118
}
113-
bool secure(event->Get()->Request->Secure);
114-
NActors::IActor* actor = AddOutgoingConnection(secure);
115-
Send(event->Forward(actor->SelfId()));
119+
AddOutgoingConnection(event);
116120
}
117121

118122
void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr& event) {

ydb/library/actors/http/http_proxy.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ struct TPrivateEndpointInfo : THttpEndpointInfo {
294294

295295
NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::IMetricFactory> registry = NMonitoring::TMetricRegistry::SharedInstance());
296296
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner);
297-
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure);
297+
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event);
298298
NActors::IActor* CreateIncomingConnectionActor(
299299
std::shared_ptr<TPrivateEndpointInfo> endpoint,
300300
TIntrusivePtr<TSocketDescriptor> socket,

ydb/library/actors/http/http_proxy_outgoing.cpp

Lines changed: 82 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
namespace NHttp {
55

66
template <typename TSocketImpl>
7-
class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
7+
class TOutgoingConnectionActor : public NActors::TActorBootstrapped<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
88
public:
9-
using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>;
9+
using TBase = NActors::TActorBootstrapped<TOutgoingConnectionActor<TSocketImpl>>;
1010
using TSelf = TOutgoingConnectionActor<TSocketImpl>;
11+
using TBase::Become;
1112
using TBase::Send;
1213
using TBase::Schedule;
1314
using TBase::SelfId;
@@ -23,14 +24,18 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
2324
bool AllowConnectionReuse = false;
2425
NActors::TPollerToken::TPtr PollerToken;
2526

26-
TOutgoingConnectionActor(const TActorId& owner)
27-
: TBase(&TSelf::StateWaiting)
28-
, Owner(owner)
27+
TOutgoingConnectionActor(const TActorId& owner, TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event)
28+
: Owner(owner)
2929
{
30+
InitiateRequest(event);
3031
}
3132

3233
static constexpr char ActorName[] = "OUT_CONNECTION_ACTOR";
3334

35+
void Bootstrap() {
36+
PerformRequest();
37+
}
38+
3439
void PassAway() override {
3540
Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionClosed(SelfId(), Destination));
3641
TSocketImpl::Shutdown(); // to avoid errors when connection already closed
@@ -60,19 +65,28 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
6065
PassAway();
6166
} else {
6267
ALOG_DEBUG(HttpLog, GetSocketName() << "connection available for reuse");
68+
CheckClose();
6369
Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionAvailable(SelfId(), Destination));
6470
}
6571
}
6672

6773
void ReplyErrorAndPassAway(const TString& error) {
68-
ALOG_ERROR(HttpLog, GetSocketName() << "connection closed with error: " << error);
74+
if (error) {
75+
ALOG_ERROR(HttpLog, GetSocketName() << "connection closed with error: " << error);
76+
} else {
77+
ALOG_DEBUG(HttpLog, GetSocketName() << "connection closed");
78+
}
6979
if (RequestOwner) {
70-
Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
80+
if (!error && Response && !Response->IsReady()) {
81+
Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, "ConnectionClosed")); // connection closed prematurely
82+
} else {
83+
Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
84+
}
7185
RequestOwner = TActorId();
7286
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
7387
Send(Owner, sensors.Release());
74-
PassAway();
7588
}
89+
PassAway();
7690
}
7791

7892
protected:
@@ -84,7 +98,7 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
8498
}
8599

86100
void Connect() {
87-
ALOG_DEBUG(HttpLog, GetSocketName() << "connecting");
101+
ALOG_DEBUG(HttpLog, GetSocketName() << "connecting to " << Address->ToString());
88102
TSocketImpl::Create(Address->SockAddr()->sa_family);
89103
TSocketImpl::SetNonBlock();
90104
TSocketImpl::SetTimeout(ConnectionTimeout);
@@ -101,6 +115,26 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
101115
}
102116
}
103117

118+
void InitiateRequest(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
119+
Request = std::move(event->Get()->Request);
120+
Destination = Request->GetDestination();
121+
TSocketImpl::SetHost(TString(Request->Host));
122+
RequestOwner = event->Sender;
123+
if (event->Get()->Timeout) {
124+
ConnectionTimeout = event->Get()->Timeout;
125+
}
126+
AllowConnectionReuse = event->Get()->AllowConnectionReuse;
127+
}
128+
129+
void PerformRequest() {
130+
Request->Timer.Reset();
131+
ALOG_DEBUG(HttpLog, GetSocketName() << "resolving " << TSocketImpl::Host);
132+
Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(TSocketImpl::Host));
133+
Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
134+
LastActivity = NActors::TActivationContext::Now();
135+
TBase::Become(&TOutgoingConnectionActor::StateResolving);
136+
}
137+
104138
void FlushOutput() {
105139
if (Request != nullptr) {
106140
Request->Finish();
@@ -123,13 +157,38 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
123157
}
124158
break;
125159
} else {
126-
ReplyErrorAndPassAway(res == 0 ? "ConnectionClosed" : strerror(-res));
160+
ReplyErrorAndPassAway(res == 0 ? "" : strerror(-res));
127161
break;
128162
}
129163
}
130164
}
131165
}
132166

167+
void CheckClose() {
168+
char buf[8];
169+
for (;;) {
170+
bool read = false, write = false;
171+
ssize_t res = TSocketImpl::Recv(&buf, 0, read, write);
172+
if (res > 0) {
173+
return ReplyErrorAndPassAway("Unexpected data received");
174+
} else if (-res == EINTR) {
175+
continue;
176+
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
177+
if (PollerToken) {
178+
if (!read && !write) {
179+
read = true;
180+
}
181+
if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) {
182+
continue;
183+
}
184+
}
185+
return;
186+
} else {
187+
return ReplyErrorAndPassAway(res == 0 ? "" : strerror(-res));
188+
}
189+
}
190+
}
191+
133192
void PullInput() {
134193
for (;;) {
135194
if (Response == nullptr) {
@@ -165,7 +224,7 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
165224
if (Response->IsDone() && Response->IsReady()) {
166225
return ReplyAndPassAway();
167226
}
168-
return ReplyErrorAndPassAway(res == 0 ? "ConnectionClosed" : strerror(-res));
227+
return ReplyErrorAndPassAway(res == 0 ? "" : strerror(-res));
169228
}
170229
}
171230
}
@@ -247,20 +306,8 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
247306
}
248307

249308
void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
250-
Request = std::move(event->Get()->Request);
251-
Destination = Request->GetDestination();
252-
TSocketImpl::SetHost(TString(Request->Host));
253-
ALOG_DEBUG(HttpLog, GetSocketName() << "resolving " << TSocketImpl::Host);
254-
Request->Timer.Reset();
255-
RequestOwner = event->Sender;
256-
Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(TSocketImpl::Host));
257-
if (event->Get()->Timeout) {
258-
ConnectionTimeout = event->Get()->Timeout;
259-
}
260-
AllowConnectionReuse = event->Get()->AllowConnectionReuse;
261-
Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
262-
LastActivity = NActors::TActivationContext::Now();
263-
TBase::Become(&TOutgoingConnectionActor::StateResolving);
309+
InitiateRequest(event);
310+
PerformRequest();
264311
}
265312

266313
void HandleConnected(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
@@ -284,8 +331,12 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
284331
if (event->Get()->Write && RequestOwner) {
285332
FlushOutput();
286333
}
287-
if (event->Get()->Read && RequestOwner) {
288-
PullInput();
334+
if (event->Get()->Read) {
335+
if (RequestOwner) {
336+
PullInput();
337+
} else {
338+
CheckClose();
339+
}
289340
}
290341
}
291342

@@ -311,13 +362,6 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
311362
}
312363
}
313364

314-
STATEFN(StateWaiting) {
315-
switch (ev->GetTypeRewrite()) {
316-
hFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting);
317-
cFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
318-
}
319-
}
320-
321365
STATEFN(StateResolving) {
322366
switch (ev->GetTypeRewrite()) {
323367
hFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving);
@@ -349,11 +393,11 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
349393
}
350394
};
351395

352-
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure) {
353-
if (secure) {
354-
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner);
396+
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
397+
if (event->Get()->Request->Secure) {
398+
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, event);
355399
} else {
356-
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner);
400+
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, event);
357401
}
358402
}
359403

ydb/library/actors/http/http_static.cpp

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandl
1616
const TFsPath URL;
1717
const TFsPath FilePath;
1818
const TFsPath ResourcePath;
19-
const TFsPath Index;
19+
TUrlAdapter UrlAdapter;
2020

21-
THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index)
21+
THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, TUrlAdapter&& urlAdapter)
2222
: TBase(&THttpStaticContentHandler::StateWork)
2323
, URL(url)
2424
, FilePath(filePath)
2525
, ResourcePath(resourcePath)
26-
, Index(index)
26+
, UrlAdapter(std::move(urlAdapter))
2727
{}
2828

2929
static constexpr char ActorName[] = "HTTP_STATIC_ACTOR";
@@ -47,13 +47,13 @@ class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandl
4747
Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
4848
return;
4949
}
50-
if (url.GetPath().EndsWith('/') && Index.IsDefined()) {
51-
url /= Index;
50+
if (UrlAdapter) {
51+
UrlAdapter(url);
5252
}
5353
url = url.RelativeTo(URL);
5454
try {
5555
// TODO: caching?
56-
TString contentType = mimetypeByExt(url.GetExtension().c_str());
56+
TString contentType = mimetypeByExt(url.GetName().c_str());
5757
TString data;
5858
TFileStat filestat;
5959
TFsPath resourcename(ResourcePath / url);
@@ -90,8 +90,16 @@ class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandl
9090
}
9191
};
9292

93+
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, TUrlAdapter&& urlAdapter) {
94+
return new THttpStaticContentHandler(url, filePath, resourcePath, std::move(urlAdapter));
95+
}
96+
9397
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index) {
94-
return new THttpStaticContentHandler(url, filePath, resourcePath, index);
98+
return CreateHttpStaticContentHandler(url, filePath, resourcePath, [index](TFsPath& url) {
99+
if (url.GetPath().EndsWith('/') && index) {
100+
url /= index;
101+
}
102+
});
95103
}
96104

97105
}

ydb/library/actors/http/http_static.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace NHttp {
66

7+
using TUrlAdapter = std::function<void(TFsPath&)>;
78
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index = TString());
9+
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, TUrlAdapter&& urlAdapter);
810

911
}

ydb/library/actors/http/http_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -791,8 +791,8 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
791791

792792
NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
793793

794-
UNIT_ASSERT_EQUAL(response->Error, "ConnectionClosed");
795794
UNIT_ASSERT(response->Response->IsError());
795+
UNIT_ASSERT_EQUAL(response->Error, "ConnectionClosed");
796796
}
797797

798798
Y_UNIT_TEST(RequestAfter307) {

0 commit comments

Comments
 (0)