Skip to content

Commit 8488506

Browse files
authored
tcp_proxy: explicitly supporting upstream-write-first semantics in the new pool (envoyproxy#14919)
Additional Description: Both the old and the new TCP proxy pools register for "unexpected data" and disconnect when receiving it. It appears that in the old pool the timing generally works out because the connection gets assigned right away, but the new pool does not win this race deterministically. This is fixed by read-disabling the upstream connection until it is assigned, so that any data the upstream writes first is preserved and delivered once a downstream connection is associated. Risk Level: Medium (new pool is still off by default, but affects ALPN as well) Testing: new integration tests Docs Changes: n/a Release Notes: n/a Signed-off-by: Alyssa Wilk <[email protected]>
1 parent 79fdcba commit 8488506

File tree

8 files changed

+222
-4
lines changed

8 files changed

+222
-4
lines changed

source/common/conn_pool/conn_pool_base.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
197197
const char* spaces = spacesForLevel(indent_level);
198198
os << spaces << "ConnPoolImplBase " << this << DUMP_MEMBER(ready_clients_.size())
199199
<< DUMP_MEMBER(busy_clients_.size()) << DUMP_MEMBER(connecting_clients_.size())
200-
<< DUMP_MEMBER(connecting_stream_capacity_) << DUMP_MEMBER(num_active_streams_);
200+
<< DUMP_MEMBER(connecting_stream_capacity_) << DUMP_MEMBER(num_active_streams_)
201+
<< DUMP_MEMBER(pending_streams_.size());
201202
}
202203

203204
friend std::ostream& operator<<(std::ostream& os, const ConnPoolImplBase& s) {
@@ -206,7 +207,6 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
206207
}
207208

208209
protected:
209-
// Creates up to 3 connections, based on the preconnect ratio.
210210
virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {}
211211

212212
enum class ConnectionResult {

source/common/http/mixed_conn_pool.cc

+4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ void HttpConnPoolImplMixed::onConnected(Envoy::ConnectionPool::ActiveClient& cli
5050

5151
Upstream::Host::CreateConnectionData data{std::move(tcp_client->connection_),
5252
client.real_host_description_};
53+
// As this connection comes from the tcp connection pool, it will be
54+
// read-disabled to handle TCP traffic where upstream sends data first. Undo
55+
// this as it is not necessary for HTTP/HTTPS.
56+
data.connection_->readDisable(false);
5357
data.connection_->removeConnectionCallbacks(*tcp_client);
5458
data.connection_->removeReadFilter(tcp_client->read_filter_handle_);
5559
dispatcher_.deferredDelete(client.removeFromList(owningList(client.state_)));

source/common/tcp/conn_pool.cc

+7-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ ActiveTcpClient::ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent
2424
real_host_description_ = data.host_description_;
2525
connection_ = std::move(data.connection_);
2626
connection_->addConnectionCallbacks(*this);
27-
connection_->detectEarlyCloseWhenReadDisabled(false);
2827
read_filter_handle_ = std::make_shared<ConnReadFilter>(*this);
2928
connection_->addReadFilter(read_filter_handle_);
3029
connection_->setConnectionStats({host->cluster().stats().upstream_cx_rx_bytes_total_,
@@ -63,6 +62,13 @@ void ActiveTcpClient::clearCallbacks() {
6362
}
6463

6564
void ActiveTcpClient::onEvent(Network::ConnectionEvent event) {
65+
// If this is a newly established TCP connection, readDisable. This is to handle a race condition
66+
// for TCP for protocols like MySQL where the upstream writes first, and the data needs to be
67+
// preserved until a downstream connection is associated.
68+
// This is also necessary for prefetch to be used with such protocols.
69+
if (event == Network::ConnectionEvent::Connected) {
70+
connection_->readDisable(true);
71+
}
6672
Envoy::ConnectionPool::ActiveClient::onEvent(event);
6773
if (callbacks_) {
6874
// Do not pass the Connected event to any session which registered during onEvent above.

source/common/tcp/conn_pool.h

+17
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,21 @@ class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
9494
void onAboveWriteBufferHighWatermark() override { callbacks_->onAboveWriteBufferHighWatermark(); }
9595
void onBelowWriteBufferLowWatermark() override { callbacks_->onBelowWriteBufferLowWatermark(); }
9696

97+
// Undoes the readDisable(true) applied in onEvent(Connected). Now that a connection has been
98+
// associated with this client, reads are re-enabled so any pending upstream data can drain.
99+
void readEnableIfNew() {
100+
// Within Envoy, this function is expected to be called only once per
101+
// ActiveTcpClient. Other users of the TcpConnPool may recycle TCP connections,
102+
// so associated_before_ guards against read-enabling more than once.
103+
if (!associated_before_) {
104+
associated_before_ = true;
105+
connection_->readDisable(false);
106+
// While here, turn off early-close detection so that the connection
107+
// proxies all TCP data before picking up a FIN.
108+
connection_->detectEarlyCloseWhenReadDisabled(false);
109+
}
110+
}
111+
97112
absl::optional<Http::Protocol> protocol() const override { return {}; }
98113
void close() override { connection_->close(Network::ConnectionCloseType::NoFlush); }
99114
uint32_t numActiveStreams() const override { return callbacks_ ? 1 : 0; }
@@ -115,6 +130,7 @@ class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
115130
Network::ClientConnectionPtr connection_;
116131
ConnectionPool::ConnectionStatePtr connection_state_;
117132
TcpConnectionData* tcp_connection_data_{};
133+
bool associated_before_{};
118134
};
119135

120136
class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
@@ -180,6 +196,7 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
180196
void onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
181197
Envoy::ConnectionPool::AttachContext& context) override {
182198
ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
199+
tcp_client->readEnableIfNew();
183200
auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
184201
std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
185202
std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);

test/common/tcp/conn_pool_test.cc

+1
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ class TcpConnPoolImplDestructorTest : public Event::TestUsingSimulatedTime,
321321
EXPECT_CALL(*connection_, setConnectionStats(_));
322322
EXPECT_CALL(*connection_, streamInfo()).Times(2);
323323
EXPECT_CALL(*connection_, id()).Times(AnyNumber());
324+
EXPECT_CALL(*connection_, readDisable(_)).Times(AnyNumber());
324325

325326
connect_timer_ = new NiceMock<Event::MockTimer>(&dispatcher_);
326327
EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _, _)).WillOnce(Return(connection_));

test/integration/fake_upstream.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect
633633

634634
return runOnDispatcherThreadAndWait([&]() {
635635
absl::MutexLock lock(&lock_);
636-
connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem());
636+
connection = makeRawConnection(consumeConnection(), timeSystem());
637637
connection->initialize();
638638
// Skip enableHalfClose if the connection is already disconnected.
639639
if (connection->connected()) {

test/integration/fake_upstream.h

+6
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,12 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
579579
return socket_->addressProvider().localAddress();
580580
}
581581

582+
// Factory for the FakeRawConnection handed out by waitForRawConnection(). It is virtual so
// that a subclass can substitute its own FakeRawConnection-derived type; this default
// implementation simply constructs a plain FakeRawConnection from the two arguments.
virtual std::unique_ptr<FakeRawConnection>
583+
makeRawConnection(SharedConnectionWrapper& shared_connection,
584+
Event::TestTimeSystem& time_system) {
585+
return std::make_unique<FakeRawConnection>(shared_connection, time_system);
586+
}
587+
582588
// Wait for one of the upstreams to receive a connection
583589
ABSL_MUST_USE_RESULT
584590
static testing::AssertionResult

test/integration/tcp_proxy_integration_test.cc

+184
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ using testing::NiceMock;
3232

3333
namespace Envoy {
3434

35+
// Builds the parameter list for tests that only run one pool flavor: one
// TcpProxyIntegrationTestParams entry per IP version enabled for this test run.
// NOTE(review): the second field is always false; judging by the function name this presumably
// selects the new connection pool - confirm against the TcpProxyIntegrationTestParams definition.
std::vector<TcpProxyIntegrationTestParams> newPoolTestParams() {
36+
std::vector<TcpProxyIntegrationTestParams> ret;
37+
38+
for (auto ip_version : TestEnvironment::getIpVersionsForTest()) {
39+
ret.push_back(TcpProxyIntegrationTestParams{ip_version, false});
40+
}
41+
return ret;
42+
}
43+
3544
std::vector<TcpProxyIntegrationTestParams> getProtocolTestParams() {
3645
std::vector<TcpProxyIntegrationTestParams> ret;
3746

@@ -1297,4 +1306,179 @@ TEST_P(TcpProxySslIntegrationTest, UpstreamHalfClose) {
12971306
ASSERT_TRUE(fake_upstream_connection_->waitForHalfClose());
12981307
}
12991308

1309+
// Fake upstream that imitates a MySQL-style protocol in which the upstream sends data
1310+
// immediately after a connection is established, before the peer has written anything.
1311+
class FakeMysqlUpstream : public FakeUpstream {
1312+
using FakeUpstream::FakeUpstream;
1313+
1314+
// Writes the single byte "P" to the new connection first, then delegates to
// FakeUpstream::createNetworkFilterChain and returns its result.
bool createNetworkFilterChain(Network::Connection& connection,
1315+
const std::vector<Network::FilterFactoryCb>& cb) override {
1316+
Buffer::OwnedImpl to_write("P");
1317+
// The write happens before the base class sets up the filter chain, so the data is
// sent as early as this hook allows.
connection.write(to_write, false);
1318+
return FakeUpstream::createNetworkFilterChain(connection, cb);
1319+
}
1320+
};
1321+
1322+
// TCP proxy integration fixture whose fake upstreams are FakeMysqlUpstream instances, i.e.
// upstreams that write to a connection as soon as it is accepted. It also provides helpers
// for enabling preconnect on cluster 0 and for waiting on raw upstream connections.
class MysqlIntegrationTest : public TcpProxyIntegrationTest {
1323+
public:
1324+
// Creates fake_upstreams_count_ FakeMysqlUpstream instances, using a TLS transport socket
// factory when upstream_tls_ is set and a raw-buffer factory otherwise.
void createUpstreams() override {
1325+
for (uint32_t i = 0; i < fake_upstreams_count_; ++i) {
1326+
Network::TransportSocketFactoryPtr factory =
1327+
upstream_tls_ ? createUpstreamTlsContext()
1328+
: Network::Test::createRawBufferSocketFactory();
1329+
auto endpoint = upstream_address_fn_(i);
1330+
fake_upstreams_.emplace_back(
1331+
new FakeMysqlUpstream(std::move(factory), endpoint, upstreamConfig()));
1332+
}
1333+
}
1334+
1335+
// Polls the fake upstreams listed in upstream_indices round-robin, 5ms at a time, until one
// of them yields a raw connection, which is stored in fake_upstream_connection.
// Returns the position within upstream_indices (not the fake_upstreams_ index itself) of the
// upstream that produced the connection. If connection_wait_timeout elapses first, the
// RELEASE_ASSERT below fires.
absl::optional<uint64_t>
1336+
waitForNextUpstreamConnection(const std::vector<uint64_t>& upstream_indices,
1337+
std::chrono::milliseconds connection_wait_timeout,
1338+
FakeRawConnectionPtr& fake_upstream_connection) {
1339+
AssertionResult result = AssertionFailure();
1340+
int upstream_index = 0;
1341+
Event::TestTimeSystem::RealTimeBound bound(connection_wait_timeout);
1342+
// Loop over the upstreams until the call times out or an upstream produces a raw connection.
1343+
while (!result) {
1344+
// Wrap around so the upstreams keep being polled in order.
upstream_index = upstream_index % upstream_indices.size();
1345+
result = fake_upstreams_[upstream_indices[upstream_index]]->waitForRawConnection(
1346+
fake_upstream_connection, std::chrono::milliseconds(5));
1347+
if (result) {
1348+
return upstream_index;
1349+
} else if (!bound.withinBound()) {
1350+
// NOTE(review): RELEASE_ASSERT(0, ...) presumably terminates the process, which would
// make the break and the code after the loop purely defensive - confirm.
RELEASE_ASSERT(0, "Timed out waiting for new connection.");
1351+
break;
1352+
}
1353+
++upstream_index;
1354+
}
1355+
RELEASE_ASSERT(result, result.message());
1356+
return {};
1357+
}
1358+
1359+
// Registers a config modifier that sets predictive_preconnect_ratio to 1.5 in the
// preconnect policy of cluster 0. Call this before initialize().
void globalPreconnect() {
1360+
config_helper_.addConfigModifier(
1361+
[&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void {
1362+
bootstrap.mutable_static_resources()
1363+
->mutable_clusters(0)
1364+
->mutable_preconnect_policy()
1365+
->mutable_predictive_preconnect_ratio()
1366+
->set_value(1.5);
1367+
});
1368+
}
1369+
1370+
// Registers a config modifier that sets per_upstream_preconnect_ratio to 1.5 in the
// preconnect policy of cluster 0. Call this before initialize().
void perUpstreamPreconnect() {
1371+
config_helper_.addConfigModifier(
1372+
[&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void {
1373+
bootstrap.mutable_static_resources()
1374+
->mutable_clusters(0)
1375+
->mutable_preconnect_policy()
1376+
->mutable_per_upstream_preconnect_ratio()
1377+
->set_value(1.5);
1378+
});
1379+
}
1380+
1381+
// Shared body of the Preconnect and PreconnectWithTls tests; defined out of line.
void testPreconnect();
1382+
};
1383+
1384+
// Verifies that FakeMysqlUpstream works as advertised and that the data the upstream
1385+
// writes first makes it to the downstream client.
1386+
TEST_P(MysqlIntegrationTest, UpstreamWritesFirst) {
1387+
globalPreconnect();
1388+
initialize();
1389+
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
1390+
FakeRawConnectionPtr fake_upstream_connection;
1391+
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
1392+
1393+
// The "P" written by the fake upstream on connect must arrive at the client before the
// client has sent anything.
tcp_client->waitForData("P", false);
1394+
1395+
// Downstream-to-upstream data still flows afterwards.
ASSERT_TRUE(tcp_client->write("F"));
1396+
ASSERT_TRUE(fake_upstream_connection->waitForData(1));
1397+
1398+
// Half-close from the upstream side, then from the client, and wait for teardown.
ASSERT_TRUE(fake_upstream_connection->write("", true));
1399+
tcp_client->waitForHalfClose();
1400+
ASSERT_TRUE(tcp_client->write("", true));
1401+
ASSERT_TRUE(fake_upstream_connection->waitForHalfClose());
1402+
ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
1403+
}
1404+
1405+
// Make sure that disconnect detection still works while the connection is
1406+
// read-disabled.
1407+
// Early close notification does not work on OSX, so the test is compiled out there.
1408+
#if !defined(__APPLE__)
1409+
TEST_P(MysqlIntegrationTest, DisconnectDetected) {
1410+
// Switch to per-upstream preconnect.
1411+
perUpstreamPreconnect();
1412+
initialize();
1413+
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));
1414+
FakeRawConnectionPtr fake_upstream_connection;
1415+
FakeRawConnectionPtr fake_upstream_connection1;
1416+
// The needed connection.
1417+
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
1418+
// The prefetched connection.
1419+
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection1));
1420+
1421+
// Close the prefetched connection from the upstream side and wait until the proxy
// records at least one destroyed upstream connection.
1422+
ASSERT_TRUE(fake_upstream_connection1->close());
1423+
test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_destroy", 1);
1424+
1425+
tcp_client->close();
1426+
}
1427+
#endif
1428+
1429+
// Shared preconnect scenario: with a predictive preconnect ratio of 1.5 and five loopback
// endpoints, opens ten downstream connections and expects an upstream connection for each one
// plus an extra (preconnected) upstream connection on every other iteration, with no upstream
// connection destroyed along the way.
void MysqlIntegrationTest::testPreconnect() {
1430+
globalPreconnect();
1431+
enableHalfClose(false);
1432+
// Replace cluster 0's endpoints with five loopback endpoints, one per locality entry.
config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
1433+
auto* cluster = bootstrap.mutable_static_resources()->mutable_clusters(0);
1434+
auto* load_assignment = cluster->mutable_load_assignment();
1435+
load_assignment->clear_endpoints();
1436+
for (int i = 0; i < 5; ++i) {
1437+
auto locality = load_assignment->add_endpoints();
1438+
locality->add_lb_endpoints()->mutable_endpoint()->MergeFrom(
1439+
ConfigHelper::buildEndpoint(Network::Test::getLoopbackAddressString(version_)));
1440+
}
1441+
});
1442+
1443+
setUpstreamCount(5);
1444+
initialize();
1445+
int num_clients = 10;
1446+
1447+
// Slots for 10 clients and 15 upstream connections: one per client plus one extra for each
// of the 5 even-numbered iterations below.
// NOTE(review): assumes both Ptr types are smart pointers, so {N} creates N empty slots
// rather than a single element - confirm.
std::vector<IntegrationTcpClientPtr> clients{10};
1448+
std::vector<FakeRawConnectionPtr> fake_connections{15};
1449+
1450+
// Next free slot in fake_connections.
int upstream_index = 0;
1451+
for (int i = 0; i < num_clients; ++i) {
1452+
// Start a new downstream connection and wait for its upstream connection.
1453+
clients[i] = makeTcpConnection(lookupPort("tcp_proxy"));
1454+
waitForNextUpstreamConnection(std::vector<uint64_t>({0, 1, 2, 3, 4}),
1455+
TestUtility::DefaultTimeout, fake_connections[upstream_index]);
1456+
++upstream_index;
1457+
1458+
// For every other connection, an extra connection should be preconnected.
1459+
if (i % 2 == 0) {
1460+
waitForNextUpstreamConnection(std::vector<uint64_t>({0, 1, 2, 3, 4}),
1461+
TestUtility::DefaultTimeout, fake_connections[upstream_index]);
1462+
++upstream_index;
1463+
}
1464+
}
1465+
// None of the upstream connections may have been torn down, even though every fake
// upstream wrote data before a downstream connection was attached.
EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.upstream_cx_destroy")->value());
1466+
// Clean up.
1467+
for (int i = 0; i < num_clients; ++i) {
1468+
clients[i]->close();
1469+
}
1470+
}
1471+
1472+
// Runs the shared preconnect scenario without changing the upstream transport configuration.
TEST_P(MysqlIntegrationTest, Preconnect) { testPreconnect(); }
1473+
1474+
// Runs the shared preconnect scenario with TLS enabled on the upstream side: upstream_tls_
// makes createUpstreams() use a TLS transport socket factory, and configureUpstreamTls()
// adjusts the cluster configuration to match.
TEST_P(MysqlIntegrationTest, PreconnectWithTls) {
1475+
upstream_tls_ = true;
1476+
setUpstreamProtocol(FakeHttpConnection::Type::HTTP1);
1477+
config_helper_.configureUpstreamTls();
1478+
testPreconnect();
1479+
}
1480+
1481+
// Instantiate the MysqlIntegrationTest suite once per entry returned by newPoolTestParams(),
// naming each instance via protocolTestParamsToString.
INSTANTIATE_TEST_SUITE_P(TcpProxyIntegrationTestParams, MysqlIntegrationTest,
1482+
testing::ValuesIn(newPoolTestParams()), protocolTestParamsToString);
1483+
13001484
} // namespace Envoy

0 commit comments

Comments
 (0)