Skip to content

Commit c06efe3

Browse files
committed
set of fixes/improvements for http proxy
1 parent 5b564f8 commit c06efe3

File tree

7 files changed

+120
-55
lines changed

7 files changed

+120
-55
lines changed

ydb/library/actors/http/http.cpp

+14-4
Original file line numberDiff line numberDiff line change
@@ -409,8 +409,10 @@ void THttpParser<THttpResponse>::ConnectionClosed() {
409409
THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) {
410410
THttpParser<THttpResponse> parser(data);
411411
THeadersBuilder headers(parser.Headers);
412-
if (!Endpoint->WorkerName.empty()) {
413-
headers.Set("X-Worker-Name", Endpoint->WorkerName);
412+
if (!headers.Has("X-Worker-Name")) {
413+
if (!Endpoint->WorkerName.empty()) {
414+
headers.Set("X-Worker-Name", Endpoint->WorkerName);
415+
}
414416
}
415417
THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this);
416418
response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
@@ -604,8 +606,10 @@ THttpIncomingResponsePtr THttpIncomingResponse::Duplicate(THttpOutgoingRequestPt
604606

605607
THttpOutgoingResponsePtr THttpOutgoingResponse::Duplicate(THttpIncomingRequestPtr request) {
606608
THeadersBuilder headers(Headers);
607-
if (!request->Endpoint->WorkerName.empty()) {
608-
headers.Set("X-Worker-Name", request->Endpoint->WorkerName);
609+
if (!headers.Has("X-Worker-Name")) {
610+
if (!request->Endpoint->WorkerName.empty()) {
611+
headers.Set("X-Worker-Name", request->Endpoint->WorkerName);
612+
}
609613
}
610614
THttpOutgoingResponsePtr response = new THttpOutgoingResponse(request);
611615
response->InitResponse(Protocol, Version, Status, Message);
@@ -906,6 +910,12 @@ THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) {
906910
}
907911
}
908912

913+
THeadersBuilder::THeadersBuilder(std::initializer_list<std::pair<TString, TString>> headers) {
914+
for (const auto& pr : headers) {
915+
Set(pr.first, pr.second);
916+
}
917+
}
918+
909919
void THeadersBuilder::Set(TStringBuf name, TStringBuf data) {
910920
Data.emplace_back(name, data);
911921
Headers[Data.back().first] = Data.back().second;

ydb/library/actors/http/http.h

+1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ struct THeadersBuilder : THeaders {
128128
THeadersBuilder();
129129
THeadersBuilder(TStringBuf headers);
130130
THeadersBuilder(const THeadersBuilder& builder);
131+
THeadersBuilder(std::initializer_list<std::pair<TString, TString>> headers);
131132
void Set(TStringBuf name, TStringBuf data);
132133
void Erase(TStringBuf name);
133134
};

ydb/library/actors/http/http_proxy.cpp

+10-6
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
1616
return listeningSocket;
1717
}
1818

19-
IActor* AddOutgoingConnection(bool secure) {
20-
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), secure);
19+
IActor* AddOutgoingConnection(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
20+
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), event);
2121
TActorId connectionId = Register(connectionSocket);
2222
ALOG_DEBUG(HttpLog, "Connection created " << connectionId);
2323
Connections.emplace(connectionId);
@@ -96,6 +96,12 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
9696
ALOG_ERROR(HttpLog, "Event TEvHttpOutgoingResponse shouldn't be in proxy, it should go to the http connection directly");
9797
}
9898

99+
template<typename TEventType>
100+
TAutoPtr<NActors::IEventHandle> Forward(const TActorId& dest, TAutoPtr<NActors::TEventHandle<TEventType>>&& event) {
101+
auto self(SelfId());
102+
return new IEventHandle(dest, event->Sender, event->Release().Release(), event->Flags, event->Cookie, &self, std::move(event->TraceId));
103+
}
104+
99105
void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
100106
if (event->Get()->AllowConnectionReuse) {
101107
auto destination = event->Get()->Request->GetDestination();
@@ -104,15 +110,13 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
104110
TActorId availableConnection = itAvailableConnection->second;
105111
ALOG_DEBUG(HttpLog, "Reusing connection " << availableConnection << " for destination " << destination);
106112
AvailableConnections.erase(itAvailableConnection);
107-
Send(event->Forward(availableConnection));
113+
Send(Forward(availableConnection, std::move(event)));
108114
return;
109115
} else {
110116
ALOG_DEBUG(HttpLog, "Creating a new connection for destination " << destination);
111117
}
112118
}
113-
bool secure(event->Get()->Request->Secure);
114-
NActors::IActor* actor = AddOutgoingConnection(secure);
115-
Send(event->Forward(actor->SelfId()));
119+
AddOutgoingConnection(event);
116120
}
117121

118122
void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr& event) {

ydb/library/actors/http/http_proxy.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ struct TPrivateEndpointInfo : THttpEndpointInfo {
294294

295295
NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::IMetricFactory> registry = NMonitoring::TMetricRegistry::SharedInstance());
296296
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner);
297-
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure);
297+
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event);
298298
NActors::IActor* CreateIncomingConnectionActor(
299299
std::shared_ptr<TPrivateEndpointInfo> endpoint,
300300
TIntrusivePtr<TSocketDescriptor> socket,

ydb/library/actors/http/http_proxy_outgoing.cpp

+77-37
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
namespace NHttp {
55

66
template <typename TSocketImpl>
7-
class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
7+
class TOutgoingConnectionActor : public NActors::TActorBootstrapped<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
88
public:
9-
using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>;
9+
using TBase = NActors::TActorBootstrapped<TOutgoingConnectionActor<TSocketImpl>>;
1010
using TSelf = TOutgoingConnectionActor<TSocketImpl>;
11+
using TBase::Become;
1112
using TBase::Send;
1213
using TBase::Schedule;
1314
using TBase::SelfId;
@@ -23,14 +24,18 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
2324
bool AllowConnectionReuse = false;
2425
NActors::TPollerToken::TPtr PollerToken;
2526

26-
TOutgoingConnectionActor(const TActorId& owner)
27-
: TBase(&TSelf::StateWaiting)
28-
, Owner(owner)
27+
TOutgoingConnectionActor(const TActorId& owner, TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event)
28+
: Owner(owner)
2929
{
30+
InitiateRequest(event);
3031
}
3132

3233
static constexpr char ActorName[] = "OUT_CONNECTION_ACTOR";
3334

35+
void Bootstrap() {
36+
PerformRequest();
37+
}
38+
3439
void PassAway() override {
3540
Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionClosed(SelfId(), Destination));
3641
TSocketImpl::Shutdown(); // to avoid errors when connection already closed
@@ -60,19 +65,24 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
6065
PassAway();
6166
} else {
6267
ALOG_DEBUG(HttpLog, GetSocketName() << "connection available for reuse");
68+
CheckClose();
6369
Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionAvailable(SelfId(), Destination));
6470
}
6571
}
6672

6773
void ReplyErrorAndPassAway(const TString& error) {
68-
ALOG_ERROR(HttpLog, GetSocketName() << "connection closed with error: " << error);
74+
if (error) {
75+
ALOG_ERROR(HttpLog, GetSocketName() << "connection closed with error: " << error);
76+
} else {
77+
ALOG_DEBUG(HttpLog, GetSocketName() << "connection closed");
78+
}
6979
if (RequestOwner) {
7080
Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
7181
RequestOwner = TActorId();
7282
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
7383
Send(Owner, sensors.Release());
74-
PassAway();
7584
}
85+
PassAway();
7686
}
7787

7888
protected:
@@ -84,7 +94,7 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
8494
}
8595

8696
void Connect() {
87-
ALOG_DEBUG(HttpLog, GetSocketName() << "connecting");
97+
ALOG_DEBUG(HttpLog, GetSocketName() << "connecting to " << Address->ToString());
8898
TSocketImpl::Create(Address->SockAddr()->sa_family);
8999
TSocketImpl::SetNonBlock();
90100
TSocketImpl::SetTimeout(ConnectionTimeout);
@@ -101,6 +111,26 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
101111
}
102112
}
103113

114+
void InitiateRequest(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
115+
Request = std::move(event->Get()->Request);
116+
Destination = Request->GetDestination();
117+
TSocketImpl::SetHost(TString(Request->Host));
118+
RequestOwner = event->Sender;
119+
if (event->Get()->Timeout) {
120+
ConnectionTimeout = event->Get()->Timeout;
121+
}
122+
AllowConnectionReuse = event->Get()->AllowConnectionReuse;
123+
}
124+
125+
void PerformRequest() {
126+
Request->Timer.Reset();
127+
ALOG_DEBUG(HttpLog, GetSocketName() << "resolving " << TSocketImpl::Host);
128+
Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(TSocketImpl::Host));
129+
Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
130+
LastActivity = NActors::TActivationContext::Now();
131+
TBase::Become(&TOutgoingConnectionActor::StateResolving);
132+
}
133+
104134
void FlushOutput() {
105135
if (Request != nullptr) {
106136
Request->Finish();
@@ -123,13 +153,38 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
123153
}
124154
break;
125155
} else {
126-
ReplyErrorAndPassAway(res == 0 ? "ConnectionClosed" : strerror(-res));
156+
ReplyErrorAndPassAway(res == 0 ? "" : strerror(-res));
127157
break;
128158
}
129159
}
130160
}
131161
}
132162

163+
void CheckClose() {
164+
char buf[8];
165+
for (;;) {
166+
bool read = false, write = false;
167+
ssize_t res = TSocketImpl::Recv(&buf, 0, read, write);
168+
if (res > 0) {
169+
return ReplyErrorAndPassAway("Unexpected data received");
170+
} else if (-res == EINTR) {
171+
continue;
172+
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
173+
if (PollerToken) {
174+
if (!read && !write) {
175+
read = true;
176+
}
177+
if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) {
178+
continue;
179+
}
180+
}
181+
return;
182+
} else {
183+
return ReplyErrorAndPassAway(res == 0 ? "" : strerror(-res));
184+
}
185+
}
186+
}
187+
133188
void PullInput() {
134189
for (;;) {
135190
if (Response == nullptr) {
@@ -165,7 +220,7 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
165220
if (Response->IsDone() && Response->IsReady()) {
166221
return ReplyAndPassAway();
167222
}
168-
return ReplyErrorAndPassAway(res == 0 ? "ConnectionClosed" : strerror(-res));
223+
return ReplyErrorAndPassAway(res == 0 ? "" : strerror(-res));
169224
}
170225
}
171226
}
@@ -247,20 +302,8 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
247302
}
248303

249304
void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
250-
Request = std::move(event->Get()->Request);
251-
Destination = Request->GetDestination();
252-
TSocketImpl::SetHost(TString(Request->Host));
253-
ALOG_DEBUG(HttpLog, GetSocketName() << "resolving " << TSocketImpl::Host);
254-
Request->Timer.Reset();
255-
RequestOwner = event->Sender;
256-
Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(TSocketImpl::Host));
257-
if (event->Get()->Timeout) {
258-
ConnectionTimeout = event->Get()->Timeout;
259-
}
260-
AllowConnectionReuse = event->Get()->AllowConnectionReuse;
261-
Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
262-
LastActivity = NActors::TActivationContext::Now();
263-
TBase::Become(&TOutgoingConnectionActor::StateResolving);
305+
InitiateRequest(event);
306+
PerformRequest();
264307
}
265308

266309
void HandleConnected(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
@@ -284,8 +327,12 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
284327
if (event->Get()->Write && RequestOwner) {
285328
FlushOutput();
286329
}
287-
if (event->Get()->Read && RequestOwner) {
288-
PullInput();
330+
if (event->Get()->Read) {
331+
if (RequestOwner) {
332+
PullInput();
333+
} else {
334+
CheckClose();
335+
}
289336
}
290337
}
291338

@@ -311,13 +358,6 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
311358
}
312359
}
313360

314-
STATEFN(StateWaiting) {
315-
switch (ev->GetTypeRewrite()) {
316-
hFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting);
317-
cFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
318-
}
319-
}
320-
321361
STATEFN(StateResolving) {
322362
switch (ev->GetTypeRewrite()) {
323363
hFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving);
@@ -349,11 +389,11 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
349389
}
350390
};
351391

352-
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure) {
353-
if (secure) {
354-
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner);
392+
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& event) {
393+
if (event->Get()->Request->Secure) {
394+
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, event);
355395
} else {
356-
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner);
396+
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, event);
357397
}
358398
}
359399

ydb/library/actors/http/http_static.cpp

+15-7
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandl
1616
const TFsPath URL;
1717
const TFsPath FilePath;
1818
const TFsPath ResourcePath;
19-
const TFsPath Index;
19+
TUrlAdapter UrlAdapter;
2020

21-
THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index)
21+
THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, TUrlAdapter&& urlAdapter)
2222
: TBase(&THttpStaticContentHandler::StateWork)
2323
, URL(url)
2424
, FilePath(filePath)
2525
, ResourcePath(resourcePath)
26-
, Index(index)
26+
, UrlAdapter(std::move(urlAdapter))
2727
{}
2828

2929
static constexpr char ActorName[] = "HTTP_STATIC_ACTOR";
@@ -47,13 +47,13 @@ class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandl
4747
Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
4848
return;
4949
}
50-
if (url.GetPath().EndsWith('/') && Index.IsDefined()) {
51-
url /= Index;
50+
if (UrlAdapter) {
51+
UrlAdapter(url);
5252
}
5353
url = url.RelativeTo(URL);
5454
try {
5555
// TODO: caching?
56-
TString contentType = mimetypeByExt(url.GetExtension().c_str());
56+
TString contentType = mimetypeByExt(url.GetName().c_str());
5757
TString data;
5858
TFileStat filestat;
5959
TFsPath resourcename(ResourcePath / url);
@@ -90,8 +90,16 @@ class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandl
9090
}
9191
};
9292

93+
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, TUrlAdapter&& urlAdapter) {
94+
return new THttpStaticContentHandler(url, filePath, resourcePath, std::move(urlAdapter));
95+
}
96+
9397
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index) {
94-
return new THttpStaticContentHandler(url, filePath, resourcePath, index);
98+
return CreateHttpStaticContentHandler(url, filePath, resourcePath, [index](TFsPath& url) {
99+
if (url.GetPath().EndsWith('/') && index) {
100+
url /= index;
101+
}
102+
});
95103
}
96104

97105
}

ydb/library/actors/http/http_static.h

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace NHttp {
66

7+
using TUrlAdapter = std::function<void(TFsPath&)>;
78
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index = TString());
9+
NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, TUrlAdapter&& urlAdapter);
810

911
}

0 commit comments

Comments
 (0)