Skip to content

Commit 82e7109

Browse files
authored
tcp: towards pluggable upstreams (envoyproxy#13331)
Commit Message: Refactoring TCP code to match HTTP code in preparation for pluggable TCP upstreams. Risk Level: High - fairly major refactor Testing: existing tests pass Docs Changes: n/a Release Notes: n/a Signed-off-by: Alyssa Wilk <[email protected]>
1 parent cb7691c commit 82e7109

File tree

4 files changed

+212
-104
lines changed

4 files changed

+212
-104
lines changed

source/common/tcp_proxy/tcp_proxy.cc

+30-65
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ Filter::~Filter() {
225225
access_log->log(nullptr, nullptr, nullptr, getStreamInfo());
226226
}
227227

228-
ASSERT(upstream_handle_ == nullptr);
228+
ASSERT(generic_conn_pool_ == nullptr);
229229
ASSERT(upstream_ == nullptr);
230230
}
231231

@@ -442,24 +442,17 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
442442

443443
bool Filter::maybeTunnel(const std::string& cluster_name) {
444444
if (!config_->tunnelingConfig()) {
445-
Tcp::ConnectionPool::Instance* conn_pool = cluster_manager_.tcpConnPoolForCluster(
446-
cluster_name, Upstream::ResourcePriority::Default, this);
447-
if (conn_pool) {
445+
generic_conn_pool_ =
446+
std::make_unique<TcpConnPool>(cluster_name, cluster_manager_, this, *upstream_callbacks_);
447+
if (generic_conn_pool_->valid()) {
448448
connecting_ = true;
449449
connect_attempts_++;
450-
451-
// Given this function is reentrant, make sure we only reset the upstream_handle_ if given a
452-
// valid connection handle. If newConnection fails inline it may result in attempting to
453-
// select a new host, and a recursive call to initializeUpstreamConnection. In this case the
454-
// first call to newConnection will return null and the inner call will persist.
455-
Tcp::ConnectionPool::Cancellable* handle = conn_pool->newConnection(*this);
456-
if (handle) {
457-
ASSERT(upstream_handle_.get() == nullptr);
458-
upstream_handle_ = std::make_shared<TcpConnectionHandle>(handle);
459-
}
450+
generic_conn_pool_->newStream(this);
460451
// Because we never return open connections to the pool, this either has a handle waiting on
461452
// connection completion, or onPoolFailure has been invoked. Either way, stop iteration.
462453
return true;
454+
} else {
455+
generic_conn_pool_.reset();
463456
}
464457
} else {
465458
auto* cluster = cluster_manager_.get(cluster_name);
@@ -474,28 +467,23 @@ bool Filter::maybeTunnel(const std::string& cluster_name) {
474467
"http2_protocol_options on the cluster.");
475468
return false;
476469
}
477-
Http::ConnectionPool::Instance* conn_pool = cluster_manager_.httpConnPoolForCluster(
478-
cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, this);
479-
if (conn_pool) {
480-
upstream_ = std::make_unique<HttpUpstream>(*upstream_callbacks_,
481-
config_->tunnelingConfig()->hostname());
482-
HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get());
483-
Http::ConnectionPool::Cancellable* cancellable =
484-
conn_pool->newStream(http_upstream->responseDecoder(), *this);
485-
if (cancellable) {
486-
ASSERT(upstream_handle_.get() == nullptr);
487-
upstream_handle_ = std::make_shared<HttpConnectionHandle>(cancellable);
488-
}
470+
471+
generic_conn_pool_ = std::make_unique<HttpConnPool>(cluster_name, cluster_manager_, this,
472+
config_->tunnelingConfig()->hostname(),
473+
*upstream_callbacks_);
474+
if (generic_conn_pool_->valid()) {
475+
generic_conn_pool_->newStream(this);
489476
return true;
477+
} else {
478+
generic_conn_pool_.reset();
490479
}
491480
}
492481

493482
return false;
494483
}
495-
void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason,
496-
Upstream::HostDescriptionConstSharedPtr host) {
497-
upstream_handle_.reset();
498-
484+
void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
485+
Upstream::HostDescriptionConstSharedPtr host) {
486+
generic_conn_pool_.reset();
499487
read_callbacks_->upstreamHost(host);
500488
getStreamInfo().onUpstreamHostSelected(host);
501489

@@ -518,44 +506,22 @@ void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason,
518506
}
519507
}
520508

521-
void Filter::onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host,
522-
const Network::Address::InstanceConstSharedPtr& local_address,
523-
Ssl::ConnectionInfoConstSharedPtr ssl_info) {
524-
upstream_handle_.reset();
509+
void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info,
510+
std::unique_ptr<GenericUpstream>&& upstream,
511+
Upstream::HostDescriptionConstSharedPtr& host,
512+
const Network::Address::InstanceConstSharedPtr& local_address,
513+
Ssl::ConnectionInfoConstSharedPtr ssl_info) {
514+
upstream_ = std::move(upstream);
515+
generic_conn_pool_.reset();
525516
read_callbacks_->upstreamHost(host);
526517
getStreamInfo().onUpstreamHostSelected(host);
527518
getStreamInfo().setUpstreamLocalAddress(local_address);
528519
getStreamInfo().setUpstreamSslConnection(ssl_info);
529520
onUpstreamConnection();
530521
read_callbacks_->continueReading();
531-
}
532-
533-
void Filter::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
534-
Upstream::HostDescriptionConstSharedPtr host) {
535-
Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
536-
537-
upstream_ = std::make_unique<TcpUpstream>(std::move(conn_data), *upstream_callbacks_);
538-
onPoolReadyBase(host, latched_data->connection().localAddress(),
539-
latched_data->connection().streamInfo().downstreamSslConnection());
540-
read_callbacks_->connection().streamInfo().setUpstreamFilterState(
541-
latched_data->connection().streamInfo().filterState());
542-
}
543-
544-
void Filter::onPoolFailure(ConnectionPool::PoolFailureReason failure, absl::string_view,
545-
Upstream::HostDescriptionConstSharedPtr host) {
546-
onPoolFailure(failure, host);
547-
}
548-
549-
void Filter::onPoolReady(Http::RequestEncoder& request_encoder,
550-
Upstream::HostDescriptionConstSharedPtr host,
551-
const StreamInfo::StreamInfo& info) {
552-
Http::RequestEncoder* latched_encoder = &request_encoder;
553-
HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get());
554-
http_upstream->setRequestEncoder(request_encoder,
555-
host->transportSocketFactory().implementsSecureTransport());
556-
557-
onPoolReadyBase(host, latched_encoder->getStream().connectionLocalAddress(),
558-
info.downstreamSslConnection());
522+
if (info) {
523+
read_callbacks_->connection().streamInfo().setUpstreamFilterState(info->filterState());
524+
}
559525
}
560526

561527
const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() {
@@ -624,12 +590,11 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
624590
disableIdleTimer();
625591
}
626592
}
627-
if (upstream_handle_) {
593+
if (generic_conn_pool_) {
628594
if (event == Network::ConnectionEvent::LocalClose ||
629595
event == Network::ConnectionEvent::RemoteClose) {
630596
// Cancel the conn pool request and close any excess pending requests.
631-
upstream_handle_->cancel();
632-
upstream_handle_.reset();
597+
generic_conn_pool_.reset();
633598
}
634599
}
635600
}

source/common/tcp_proxy/tcp_proxy.h

+15-21
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,8 @@ class PerConnectionCluster : public StreamInfo::FilterState::Object {
242242
*/
243243
class Filter : public Network::ReadFilter,
244244
public Upstream::LoadBalancerContextBase,
245-
Tcp::ConnectionPool::Callbacks,
246-
public Http::ConnectionPool::Callbacks,
247-
protected Logger::Loggable<Logger::Id::filter> {
245+
protected Logger::Loggable<Logger::Id::filter>,
246+
public GenericConnectionPoolCallbacks {
248247
public:
249248
Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager);
250249
~Filter() override;
@@ -254,23 +253,13 @@ class Filter : public Network::ReadFilter,
254253
Network::FilterStatus onNewConnection() override;
255254
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
256255

257-
// Tcp::ConnectionPool::Callbacks
258-
void onPoolFailure(ConnectionPool::PoolFailureReason reason,
259-
Upstream::HostDescriptionConstSharedPtr host) override;
260-
void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
261-
Upstream::HostDescriptionConstSharedPtr host) override;
262-
263-
// Http::ConnectionPool::Callbacks,
264-
void onPoolFailure(ConnectionPool::PoolFailureReason reason,
265-
absl::string_view transport_failure_reason,
266-
Upstream::HostDescriptionConstSharedPtr host) override;
267-
void onPoolReady(Http::RequestEncoder& request_encoder,
268-
Upstream::HostDescriptionConstSharedPtr host,
269-
const StreamInfo::StreamInfo& info) override;
270-
271-
void onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host,
272-
const Network::Address::InstanceConstSharedPtr& local_address,
273-
Ssl::ConnectionInfoConstSharedPtr ssl_info);
256+
// GenericConnectionPoolCallbacks
257+
void onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr<GenericUpstream>&& upstream,
258+
Upstream::HostDescriptionConstSharedPtr& host,
259+
const Network::Address::InstanceConstSharedPtr& local_address,
260+
Ssl::ConnectionInfoConstSharedPtr ssl_info) override;
261+
void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
262+
Upstream::HostDescriptionConstSharedPtr host) override;
274263

275264
// Upstream::LoadBalancerContext
276265
const Router::MetadataMatchCriteria* metadataMatchCriteria() override;
@@ -375,10 +364,15 @@ class Filter : public Network::ReadFilter,
375364
Event::TimerPtr idle_timer_;
376365
Event::TimerPtr connection_duration_timer_;
377366

378-
std::shared_ptr<ConnectionHandle> upstream_handle_;
379367
std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
380368
// read filter.
369+
// The upstream handle (either TCP or HTTP). This is set in onGenericPoolReady and should persist
370+
// until either the upstream or downstream connection is terminated.
381371
std::unique_ptr<GenericUpstream> upstream_;
372+
// The connection pool used to set up |upstream_|.
373+
// This will be non-null from when an upstream connection is attempted until
374+
// it either succeeds or fails.
375+
std::unique_ptr<GenericConnPool> generic_conn_pool_;
382376
RouteConstSharedPtr route_;
383377
Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
384378
Network::TransportSocketOptionsSharedPtr transport_socket_options_;

source/common/tcp_proxy/upstream.cc

+96
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "common/tcp_proxy/upstream.h"
22

3+
#include "envoy/upstream/cluster_manager.h"
4+
35
#include "common/http/header_map_impl.h"
46
#include "common/http/headers.h"
57
#include "common/http/utility.h"
@@ -152,5 +154,99 @@ void HttpUpstream::doneWriting() {
152154
}
153155
}
154156

157+
TcpConnPool::TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
158+
Upstream::LoadBalancerContext* context,
159+
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
160+
: upstream_callbacks_(upstream_callbacks) {
161+
conn_pool_ = cluster_manager.tcpConnPoolForCluster(cluster_name,
162+
Upstream::ResourcePriority::Default, context);
163+
}
164+
165+
TcpConnPool::~TcpConnPool() {
166+
if (upstream_handle_ != nullptr) {
167+
upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess);
168+
}
169+
}
170+
171+
bool TcpConnPool::valid() const { return conn_pool_ != nullptr; }
172+
173+
void TcpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
174+
callbacks_ = callbacks;
175+
// Given this function is reentrant, make sure we only reset the upstream_handle_ if given a
176+
// valid connection handle. If newConnection fails inline it may result in attempting to
177+
// select a new host, and a recursive call to initializeUpstreamConnection. In this case the
178+
// first call to newConnection will return null and the inner call will persist.
179+
Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(*this);
180+
if (handle) {
181+
ASSERT(upstream_handle_ == nullptr);
182+
upstream_handle_ = handle;
183+
}
184+
}
185+
186+
void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
187+
Upstream::HostDescriptionConstSharedPtr host) {
188+
upstream_handle_ = nullptr;
189+
callbacks_->onGenericPoolFailure(reason, host);
190+
}
191+
192+
void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
193+
Upstream::HostDescriptionConstSharedPtr host) {
194+
upstream_handle_ = nullptr;
195+
Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
196+
Network::Connection& connection = conn_data->connection();
197+
198+
auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
199+
callbacks_->onGenericPoolReady(&connection.streamInfo(), std::move(upstream), host,
200+
latched_data->connection().localAddress(),
201+
latched_data->connection().streamInfo().downstreamSslConnection());
202+
}
203+
204+
HttpConnPool::HttpConnPool(const std::string& cluster_name,
205+
Upstream::ClusterManager& cluster_manager,
206+
Upstream::LoadBalancerContext* context, std::string hostname,
207+
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
208+
: hostname_(hostname), upstream_callbacks_(upstream_callbacks) {
209+
conn_pool_ = cluster_manager.httpConnPoolForCluster(
210+
cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, context);
211+
}
212+
213+
HttpConnPool::~HttpConnPool() {
214+
if (upstream_handle_ != nullptr) {
215+
// Because HTTP connections are generally shorter lived and have a higher probability of use
216+
// before going idle, they are closed with Default rather than CloseExcess.
217+
upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
218+
}
219+
}
220+
221+
bool HttpConnPool::valid() const { return conn_pool_ != nullptr; }
222+
223+
void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
224+
callbacks_ = callbacks;
225+
upstream_ = std::make_unique<HttpUpstream>(upstream_callbacks_, hostname_);
226+
Tcp::ConnectionPool::Cancellable* handle =
227+
conn_pool_->newStream(upstream_->responseDecoder(), *this);
228+
if (handle != nullptr) {
229+
upstream_handle_ = handle;
230+
}
231+
}
232+
233+
void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view,
234+
Upstream::HostDescriptionConstSharedPtr host) {
235+
upstream_handle_ = nullptr;
236+
callbacks_->onGenericPoolFailure(reason, host);
237+
}
238+
239+
void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
240+
Upstream::HostDescriptionConstSharedPtr host,
241+
const StreamInfo::StreamInfo& info) {
242+
upstream_handle_ = nullptr;
243+
Http::RequestEncoder* latched_encoder = &request_encoder;
244+
upstream_->setRequestEncoder(request_encoder,
245+
host->transportSocketFactory().implementsSecureTransport());
246+
callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host,
247+
latched_encoder->getStream().connectionLocalAddress(),
248+
info.downstreamSslConnection());
249+
}
250+
155251
} // namespace TcpProxy
156252
} // namespace Envoy

0 commit comments

Comments
 (0)