Skip to content

Commit a102c78

Browse files
authored
CPP-711 - Fix memory leak in WaitForHandler (#217)
1 parent 869095f commit a102c78

File tree

2 files changed

+166
-5
lines changed

2 files changed

+166
-5
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
Copyright (c) DataStax, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
#include "loop_test.hpp"
18+
19+
#include "connector.hpp"
20+
#include "query_request.hpp"
21+
#include "request_handler.hpp"
22+
#include "wait_for_handler.hpp"
23+
#include "timer.hpp"
24+
25+
using namespace cass;
26+
27+
class WaitForHandlerUnitTest : public LoopTest {
28+
public:
29+
class TestWaitForHandler : public WaitForHandler {
30+
public:
31+
typedef SharedRefPtr<TestWaitForHandler> Ptr;
32+
33+
TestWaitForHandler(uint64_t max_wait_time = 2000,
34+
uint64_t retry_wait_time = 200)
35+
: WaitForHandler(RequestHandler::Ptr(
36+
Memory::allocate<RequestHandler>(
37+
QueryRequest::Ptr(Memory::allocate<QueryRequest>("")),
38+
ResponseFuture::Ptr(Memory::allocate<ResponseFuture>()))),
39+
Host::Ptr(Memory::allocate<Host>(Address())),
40+
Response::Ptr(), max_wait_time, retry_wait_time) { }
41+
42+
virtual RequestCallback::Ptr callback() = 0;
43+
44+
private:
45+
virtual bool on_set(const ChainedRequestCallback::Ptr& callback) {
46+
return false; // Never complete
47+
}
48+
};
49+
50+
class RegularQueryHandler : public TestWaitForHandler {
51+
public:
52+
virtual RequestCallback::Ptr callback() {
53+
WaitforRequestVec requests;
54+
requests.push_back(make_request("local", "SELECT * FROM system.local WHERE key='local'"));
55+
requests.push_back(make_request("peers", "SELECT * FROM system.peers"));
56+
return WaitForHandler::callback(requests);
57+
}
58+
59+
private:
60+
virtual void on_error(WaitForError code, const String& message) {
61+
EXPECT_TRUE(WAIT_FOR_ERROR_CONNECTION_CLOSED == code ||
62+
WAIT_FOR_ERROR_REQUEST_ERROR == code);
63+
}
64+
};
65+
66+
class IdempotentQueryHandler : public TestWaitForHandler {
67+
public:
68+
virtual RequestCallback::Ptr callback() {
69+
WaitforRequestVec requests;
70+
QueryRequest::Ptr local_request(Memory::allocate<QueryRequest>("SELECT * FROM system.local WHERE key='local'"));
71+
QueryRequest::Ptr peers_request(Memory::allocate<QueryRequest>("SELECT * FROM system.peers"));
72+
local_request->set_is_idempotent(true);
73+
peers_request->set_is_idempotent(true);
74+
requests.push_back(WaitForRequest("local", local_request));
75+
requests.push_back(WaitForRequest("peers", peers_request));
76+
return WaitForHandler::callback(requests);
77+
}
78+
79+
private:
80+
virtual void on_error(WaitForError code, const String& message) {
81+
EXPECT_TRUE(WAIT_FOR_ERROR_CONNECTION_CLOSED == code ||
82+
WAIT_FOR_ERROR_REQUEST_TIMEOUT == code);
83+
}
84+
};
85+
86+
void run(const TestWaitForHandler::Ptr& handler, uint64_t timeout = 0) {
87+
mockssandra::SimpleCluster cluster(simple());
88+
ASSERT_EQ(cluster.start_all(), 0);
89+
90+
handler_ = handler;
91+
timeout_ = timeout;
92+
93+
Connector::Ptr connector(
94+
Memory::allocate<Connector>(Address("127.0.0.1", PORT),
95+
PROTOCOL_VERSION,
96+
bind_callback(&WaitForHandlerUnitTest::on_connected, this)));
97+
connector->connect(loop());
98+
99+
uv_run(loop(), UV_RUN_DEFAULT);
100+
}
101+
102+
private:
103+
struct CloseConnectionHandler {
104+
CloseConnectionHandler(const Connection::Ptr& connection)
105+
: connection(connection) { }
106+
107+
void on_timeout(Timer* timer) {
108+
connection->close();
109+
Memory::deallocate(this);
110+
}
111+
112+
void start(uint64_t timeout) {
113+
timer.start(connection->loop(),
114+
timeout,
115+
bind_callback(&CloseConnectionHandler::on_timeout, this));
116+
}
117+
118+
Timer timer;
119+
Connection::Ptr connection;
120+
};
121+
122+
static void close(const Connection::Ptr& connection, uint64_t timeout) {
123+
CloseConnectionHandler* handler(Memory::allocate<CloseConnectionHandler>(connection));
124+
handler->start(timeout);
125+
}
126+
127+
void on_connected(Connector* connector) {
128+
if (connector->is_ok()) {
129+
Connection::Ptr connection(connector->release_connection());
130+
connection->write_and_flush(handler_->callback());
131+
if (timeout_ > 0) {
132+
close(connection, timeout_);
133+
} else {
134+
connection->close();
135+
}
136+
} else {
137+
ASSERT_TRUE(false) << "Connection had a failure: "
138+
<< connector->error_message();
139+
}
140+
}
141+
142+
private:
143+
TestWaitForHandler::Ptr handler_;
144+
uint64_t timeout_;
145+
};
146+
147+
TEST_F(WaitForHandlerUnitTest, CloseImmediatelyWhileWaiting) {
148+
run(TestWaitForHandler::Ptr(Memory::allocate<RegularQueryHandler>()));
149+
}
150+
151+
TEST_F(WaitForHandlerUnitTest, CloseAfterTimeoutWhileWaiting) {
152+
run(TestWaitForHandler::Ptr(Memory::allocate<RegularQueryHandler>()), 500);
153+
}
154+
155+
TEST_F(WaitForHandlerUnitTest, CloseIdempotentImmediatelyWhileWaiting) {
156+
run(TestWaitForHandler::Ptr(Memory::allocate<IdempotentQueryHandler>()));
157+
}
158+
159+
TEST_F(WaitForHandlerUnitTest, CloseIdempotentAfterTimeoutWhileWaiting) {
160+
run(TestWaitForHandler::Ptr(Memory::allocate<IdempotentQueryHandler>()), 500);
161+
}

cpp-driver/src/wait_for_handler.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ void WaitForCallback::on_chain_write(Connection* connection) {
4848
}
4949

5050
void WaitForCallback::on_chain_set() {
51-
assert(handler_->connection_);
5251
if (handler_->on_set(Ptr(this))) {
5352
handler_->finish();
5453
} else {
@@ -58,8 +57,8 @@ void WaitForCallback::on_chain_set() {
5857

5958
void WaitForCallback::on_chain_error(CassError code, const String& message) {
6059
OStringStream ss;
61-
ss << message
62-
<< "' (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0') << code << ")";
60+
ss << message
61+
<< " (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0') << code << ")";
6362
handler_->on_error(WaitForHandler::WAIT_FOR_ERROR_REQUEST_ERROR, ss.str());
6463
handler_->finish();
6564
}
@@ -90,7 +89,7 @@ WaitForHandler::WaitForRequest WaitForHandler::make_request(const String& key, c
9089
}
9190

9291
void WaitForHandler::start(Connection* connection) {
93-
if (!connection_) { // Start only once
92+
if (!connection_ && !is_finished_) { // Start only once
9493
inc_ref(); // Reference for the event loop
9594
connection_.reset(connection);
9695
timer_.start(connection_->loop(), max_wait_time_ms_,
@@ -99,7 +98,7 @@ void WaitForHandler::start(Connection* connection) {
9998
}
10099

101100
void WaitForHandler::schedule() {
102-
if (!is_finished_) { // Don't schedule a retry if the handler is finished.
101+
if (connection_ && !is_finished_) { // Don't schedule a retry if the handler is finished.
103102
retry_timer_.start(connection_->loop(), retry_wait_time_ms_,
104103
bind_callback(&WaitForHandler::on_retry_timeout, this));
105104
}
@@ -110,6 +109,7 @@ void WaitForHandler::finish() {
110109
is_finished_ = true;
111110
request_handler_->set_response(current_host_, response_);
112111
if (connection_) {
112+
connection_.reset();
113113
retry_timer_.stop();
114114
timer_.stop();
115115
dec_ref();

0 commit comments

Comments
 (0)