Skip to content

Commit fa59006

Browse files
authored
tcp: make it possible for TCP connections to be creatd by non-TCP pool) (envoyproxy#13889)
As part of envoyproxy#3431 making sure the ALPN pool can create raw TCP active clients by allowing them to be created by a generic connection pool. Signed-off-by: Alyssa Wilk <[email protected]>
1 parent 91638e6 commit fa59006

File tree

4 files changed

+50
-32
lines changed

4 files changed

+50
-32
lines changed

source/common/conn_pool/conn_pool_base.h

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
174174
const Network::TransportSocketOptionsSharedPtr& transportSocketOptions() {
175175
return transport_socket_options_;
176176
}
177+
bool hasPendingStreams() const { return !pending_streams_.empty(); }
177178

178179
protected:
179180
// Creates up to 3 connections, based on the prefetch ratio.

source/common/tcp/conn_pool.cc

+11-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
namespace Envoy {
1313
namespace Tcp {
1414

15-
ActiveTcpClient::ActiveTcpClient(ConnPoolImpl& parent, const Upstream::HostConstSharedPtr& host,
15+
ActiveTcpClient::ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent,
16+
const Upstream::HostConstSharedPtr& host,
1617
uint64_t concurrent_stream_limit)
1718
: Envoy::ConnectionPool::ActiveClient(parent, host->cluster().maxRequestsPerConnection(),
1819
concurrent_stream_limit),
@@ -24,6 +25,12 @@ ActiveTcpClient::ActiveTcpClient(ConnPoolImpl& parent, const Upstream::HostConst
2425
connection_->addConnectionCallbacks(*this);
2526
connection_->detectEarlyCloseWhenReadDisabled(false);
2627
connection_->addReadFilter(std::make_shared<ConnReadFilter>(*this));
28+
connection_->setConnectionStats({host->cluster().stats().upstream_cx_rx_bytes_total_,
29+
host->cluster().stats().upstream_cx_rx_bytes_buffered_,
30+
host->cluster().stats().upstream_cx_tx_bytes_total_,
31+
host->cluster().stats().upstream_cx_tx_bytes_buffered_,
32+
&host->cluster().stats().bind_errors_, nullptr});
33+
2734
connection_->connect();
2835
}
2936

@@ -37,13 +44,12 @@ ActiveTcpClient::~ActiveTcpClient() {
3744
parent_.onStreamClosed(*this, true);
3845
parent_.checkForDrained();
3946
}
40-
parent_.onConnDestroyed();
4147
}
4248

4349
void ActiveTcpClient::clearCallbacks() {
44-
if (state_ == Envoy::ConnectionPool::ActiveClient::State::BUSY ||
45-
state_ == Envoy::ConnectionPool::ActiveClient::State::DRAINING) {
46-
parent_.onConnReleased(*this);
50+
if (state_ == Envoy::ConnectionPool::ActiveClient::State::BUSY && parent_.hasPendingStreams()) {
51+
auto* pool = &parent_;
52+
pool->dispatcher().post([pool]() -> void { pool->onUpstreamReady(); });
4753
}
4854
callbacks_ = nullptr;
4955
tcp_connection_data_ = nullptr;

source/common/tcp/conn_pool.h

+6-20
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
8484
Network::ClientConnection& connection_;
8585
};
8686

87-
ActiveTcpClient(ConnPoolImpl& parent, const Upstream::HostConstSharedPtr& host,
88-
uint64_t concurrent_stream_limit);
87+
ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent,
88+
const Upstream::HostConstSharedPtr& host, uint64_t concurrent_stream_limit);
8989
~ActiveTcpClient() override;
9090

9191
// Override the default's of Envoy::ConnectionPool::ActiveClient for class-specific functions.
@@ -106,9 +106,9 @@ class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
106106
close();
107107
}
108108
}
109-
void clearCallbacks();
109+
virtual void clearCallbacks();
110110

111-
ConnPoolImpl& parent_;
111+
Envoy::ConnectionPool::ConnPoolImplBase& parent_;
112112
ConnectionPool::UpstreamCallbacks* callbacks_{};
113113
Network::ClientConnectionPtr connection_;
114114
ConnectionPool::ConnectionStatePtr connection_state_;
@@ -123,11 +123,7 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
123123
const Network::ConnectionSocket::OptionsSharedPtr& options,
124124
Network::TransportSocketOptionsSharedPtr transport_socket_options)
125125
: Envoy::ConnectionPool::ConnPoolImplBase(host, priority, dispatcher, options,
126-
transport_socket_options),
127-
upstream_ready_cb_(dispatcher.createSchedulableCallback([this]() {
128-
upstream_ready_enabled_ = false;
129-
onUpstreamReady();
130-
})) {}
126+
transport_socket_options) {}
131127
~ConnPoolImpl() override { destructAllConnections(); }
132128

133129
void addDrainedCallback(DrainedCb cb) override { addDrainedCallbackImpl(cb); }
@@ -196,18 +192,8 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
196192
}
197193

198194
// These two functions exist for testing parity between old and new Tcp Connection Pools.
199-
virtual void onConnReleased(Envoy::ConnectionPool::ActiveClient& client) {
200-
if (client.state_ == Envoy::ConnectionPool::ActiveClient::State::BUSY) {
201-
if (!pending_streams_.empty() && !upstream_ready_enabled_) {
202-
upstream_ready_cb_->scheduleCallbackCurrentIteration();
203-
}
204-
}
205-
}
195+
virtual void onConnReleased(Envoy::ConnectionPool::ActiveClient&) {}
206196
virtual void onConnDestroyed() {}
207-
208-
protected:
209-
Event::SchedulableCallbackPtr upstream_ready_cb_;
210-
bool upstream_ready_enabled_{};
211197
};
212198

213199
} // namespace Tcp

test/common/tcp/conn_pool_test.cc

+32-7
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,22 @@ struct ConnPoolCallbacks : public Tcp::ConnectionPool::Callbacks {
6767
Upstream::HostDescriptionConstSharedPtr host_;
6868
};
6969

70+
class TestActiveTcpClient : public ActiveTcpClient {
71+
public:
72+
using ActiveTcpClient::ActiveTcpClient;
73+
74+
~TestActiveTcpClient() override { parent().onConnDestroyed(); }
75+
void clearCallbacks() override {
76+
if (state_ == Envoy::ConnectionPool::ActiveClient::State::BUSY ||
77+
state_ == Envoy::ConnectionPool::ActiveClient::State::DRAINING) {
78+
parent().onConnReleased(*this);
79+
}
80+
ActiveTcpClient::clearCallbacks();
81+
}
82+
83+
Envoy::Tcp::ConnPoolImpl& parent() { return *static_cast<ConnPoolImpl*>(&parent_); }
84+
};
85+
7086
/**
7187
* A wrapper around a ConnectionPoolImpl which tracks when the bridge between
7288
* the pool and the consumer of the connection is released and destroyed.
@@ -143,7 +159,13 @@ class ConnPoolBase : public Tcp::ConnectionPool::Instance {
143159
parent_.onConnReleasedForTest();
144160
}
145161

162+
Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override {
163+
return std::make_unique<TestActiveTcpClient>(
164+
*this, Envoy::ConnectionPool::ConnPoolImplBase::host(), 1);
165+
}
166+
146167
void onConnDestroyed() override { parent_.onConnDestroyedForTest(); }
168+
Event::PostCb post_cb_;
147169
ConnPoolBase& parent_;
148170
};
149171

@@ -202,12 +224,11 @@ void ConnPoolBase::expectEnableUpstreamReady(bool run) {
202224
if (!test_new_connection_pool_) {
203225
dynamic_cast<OriginalConnPoolImplForTest*>(conn_pool_.get())->expectEnableUpstreamReady(run);
204226
} else {
205-
if (!run) {
206-
EXPECT_CALL(*mock_upstream_ready_cb_, scheduleCallbackCurrentIteration())
207-
.Times(1)
208-
.RetiresOnSaturation();
227+
Event::PostCb& post_cb = dynamic_cast<ConnPoolImplForTest*>(conn_pool_.get())->post_cb_;
228+
if (run) {
229+
post_cb();
209230
} else {
210-
mock_upstream_ready_cb_->invokeCallback();
231+
EXPECT_CALL(mock_dispatcher_, post(_)).WillOnce(testing::SaveArg<0>(&post_cb));
211232
}
212233
}
213234
}
@@ -219,7 +240,9 @@ class TcpConnPoolImplTest : public testing::TestWithParam<bool> {
219240
public:
220241
TcpConnPoolImplTest()
221242
: test_new_connection_pool_(GetParam()),
222-
upstream_ready_cb_(new NiceMock<Event::MockSchedulableCallback>(&dispatcher_)),
243+
upstream_ready_cb_(test_new_connection_pool_
244+
? nullptr
245+
: new NiceMock<Event::MockSchedulableCallback>(&dispatcher_)),
223246
host_(Upstream::makeTestHost(cluster_, "tcp://127.0.0.1:9000")),
224247
conn_pool_(dispatcher_, host_, upstream_ready_cb_, test_new_connection_pool_) {}
225248

@@ -244,7 +267,9 @@ class TcpConnPoolImplDestructorTest : public testing::TestWithParam<bool> {
244267
public:
245268
TcpConnPoolImplDestructorTest()
246269
: test_new_connection_pool_(GetParam()),
247-
upstream_ready_cb_(new NiceMock<Event::MockSchedulableCallback>(&dispatcher_)) {
270+
upstream_ready_cb_(test_new_connection_pool_
271+
? nullptr
272+
: new NiceMock<Event::MockSchedulableCallback>(&dispatcher_)) {
248273
host_ = Upstream::makeTestHost(cluster_, "tcp://127.0.0.1:9000");
249274
if (test_new_connection_pool_) {
250275
conn_pool_ = std::make_unique<ConnPoolImpl>(

0 commit comments

Comments
 (0)