Skip to content

Commit bd32509

Browse files
authored
CPP-914 Fix: WaitForHandler::on_set() called after error/timeout
1 parent d227f6e commit bd32509

File tree

2 files changed

+118
-49
lines changed

2 files changed

+118
-49
lines changed

src/wait_for_handler.cpp

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ void WaitForCallback::on_chain_write(Connection* connection) { handler_->start(c
5151
}}} // namespace datastax::internal::core
5252

5353
void WaitForCallback::on_chain_set() {
54+
if (handler_->is_finished_) return;
55+
5456
if (handler_->on_set(Ptr(this))) {
5557
handler_->finish();
5658
} else {
@@ -59,6 +61,8 @@ void WaitForCallback::on_chain_set() {
5961
}
6062

6163
void WaitForCallback::on_chain_error(CassError code, const String& message) {
64+
if (handler_->is_finished_) return;
65+
6266
OStringStream ss;
6367
ss << message << " (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0') << code
6468
<< ")";
@@ -67,6 +71,8 @@ void WaitForCallback::on_chain_error(CassError code, const String& message) {
6771
}
6872

6973
void WaitForCallback::on_chain_timeout() {
74+
if (handler_->is_finished_) return;
75+
7076
handler_->on_error(WaitForHandler::WAIT_FOR_ERROR_REQUEST_TIMEOUT, "Request timed out");
7177
handler_->schedule();
7278
}
@@ -102,26 +108,26 @@ void WaitForHandler::start(Connection* connection) {
102108
}
103109

104110
void WaitForHandler::schedule() {
105-
if (connection_ && !is_finished_) { // Don't schedule a retry if the handler is finished.
106-
retry_timer_.start(connection_->loop(), retry_wait_time_ms_,
107-
bind_callback(&WaitForHandler::on_retry_timeout, this));
108-
}
111+
assert(!is_finished_ && "This shouldn't be called if the handler is finished");
112+
retry_timer_.start(connection_->loop(), retry_wait_time_ms_,
113+
bind_callback(&WaitForHandler::on_retry_timeout, this));
109114
}
110115

111116
void WaitForHandler::finish() {
112-
if (!is_finished_) {
113-
is_finished_ = true;
114-
request_handler_->set_response(current_host_, response_);
115-
if (connection_) {
116-
connection_.reset();
117-
retry_timer_.stop();
118-
timer_.stop();
119-
dec_ref();
120-
}
117+
assert(!is_finished_ && "This shouldn't be called more than once");
118+
is_finished_ = true;
119+
request_handler_->set_response(current_host_, response_);
120+
if (connection_) {
121+
connection_.reset();
122+
retry_timer_.stop();
123+
timer_.stop();
124+
dec_ref();
121125
}
122126
}
123127

124128
void WaitForHandler::on_retry_timeout(Timer* timer) {
129+
if (is_finished_) return;
130+
125131
if (connection_->is_closing()) {
126132
on_error(WaitForHandler::WAIT_FOR_ERROR_CONNECTION_CLOSED, "Connection closed");
127133
finish();
@@ -133,6 +139,8 @@ void WaitForHandler::on_retry_timeout(Timer* timer) {
133139
}
134140

135141
void WaitForHandler::on_timeout(Timer* timer) {
142+
if (is_finished_) return;
143+
136144
on_error(WaitForHandler::WAIT_FOR_ERROR_TIMEOUT, "Timed out");
137145
finish();
138146
}

tests/src/unit/tests/test_wait_for_handler.cpp

Lines changed: 97 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,25 @@
2222
#include "timer.hpp"
2323
#include "wait_for_handler.hpp"
2424

25+
#include <ostream>
26+
2527
using namespace datastax::internal;
2628
using namespace datastax::internal::core;
2729

30+
typedef Vector<WaitForHandler::WaitForError> Errors;
31+
32+
namespace std {
33+
34+
std::ostream& operator<<(std::ostream& os, const Errors& errors) {
35+
for (Errors::const_iterator it = errors.begin(); it != errors.end(); ++it) {
36+
if (it != errors.begin()) os << ", ";
37+
os << *it;
38+
}
39+
return os;
40+
}
41+
42+
} // namespace std
43+
2844
class WaitForHandlerUnitTest : public LoopTest {
2945
public:
3046
class TestWaitForHandler : public WaitForHandler {
@@ -35,54 +51,70 @@ class WaitForHandlerUnitTest : public LoopTest {
3551
: WaitForHandler(
3652
RequestHandler::Ptr(new RequestHandler(QueryRequest::Ptr(new QueryRequest("")),
3753
ResponseFuture::Ptr(new ResponseFuture()))),
38-
Host::Ptr(new Host(Address())), Response::Ptr(), max_wait_time, retry_wait_time) {}
39-
40-
virtual RequestCallback::Ptr callback() = 0;
54+
Host::Ptr(new Host(Address())), Response::Ptr(), max_wait_time, retry_wait_time)
55+
, count_on_set_(0)
56+
, count_on_error_(0)
57+
, is_idempotent_(false) {}
58+
59+
Ptr with_is_idempotent(bool is_idempotent) {
60+
is_idempotent_ = is_idempotent;
61+
return Ptr(this);
62+
}
4163

42-
private:
43-
virtual bool on_set(const ChainedRequestCallback::Ptr& callback) {
44-
return false; // Never complete
64+
Ptr with_expected_error(WaitForHandler::WaitForError error) {
65+
expected_.push_back(error);
66+
return Ptr(this);
4567
}
46-
};
4768

48-
class RegularQueryHandler : public TestWaitForHandler {
49-
public:
5069
virtual RequestCallback::Ptr callback() {
5170
WaitforRequestVec requests;
52-
requests.push_back(make_request("local", "SELECT * FROM system.local WHERE key='local'"));
53-
requests.push_back(make_request("peers", "SELECT * FROM system.peers"));
71+
QueryRequest::Ptr table1_request(new QueryRequest("SELECT * FROM test.table1"));
72+
QueryRequest::Ptr table2_request(new QueryRequest("SELECT * FROM test.table2"));
73+
table1_request->set_is_idempotent(is_idempotent_);
74+
table2_request->set_is_idempotent(is_idempotent_);
75+
requests.push_back(WaitForRequest("table1", table1_request));
76+
requests.push_back(WaitForRequest("table2", table2_request));
5477
return WaitForHandler::callback(requests);
5578
}
5679

57-
private:
58-
virtual void on_error(WaitForError code, const String& message) {
59-
EXPECT_TRUE(WAIT_FOR_ERROR_CONNECTION_CLOSED == code || WAIT_FOR_ERROR_REQUEST_ERROR == code);
60-
}
61-
};
80+
int count_on_set() const { return count_on_set_; }
6281

63-
class IdempotentQueryHandler : public TestWaitForHandler {
64-
public:
65-
virtual RequestCallback::Ptr callback() {
66-
WaitforRequestVec requests;
67-
QueryRequest::Ptr local_request(
68-
new QueryRequest("SELECT * FROM system.local WHERE key='local'"));
69-
QueryRequest::Ptr peers_request(new QueryRequest("SELECT * FROM system.peers"));
70-
local_request->set_is_idempotent(true);
71-
peers_request->set_is_idempotent(true);
72-
requests.push_back(WaitForRequest("local", local_request));
73-
requests.push_back(WaitForRequest("peers", peers_request));
74-
return WaitForHandler::callback(requests);
82+
protected:
83+
virtual bool on_set(const ChainedRequestCallback::Ptr& callback) {
84+
EXPECT_EQ(0, count_on_error_); // Set shouldn't be called after an error
85+
count_on_set_++;
86+
return false; // Never complete
7587
}
7688

77-
private:
7889
virtual void on_error(WaitForError code, const String& message) {
79-
EXPECT_TRUE(WAIT_FOR_ERROR_CONNECTION_CLOSED == code ||
80-
WAIT_FOR_ERROR_REQUEST_TIMEOUT == code);
90+
ASSERT_NE(0, expected_.size());
91+
bool found_expected = false;
92+
for (Errors::const_iterator it = expected_.begin(), end = expected_.end();
93+
!found_expected && it != end; ++it) {
94+
if (*it == code) {
95+
found_expected = true;
96+
}
97+
}
98+
EXPECT_TRUE(found_expected) << "Expected error codes [ " << expected_
99+
<< " ], but received error " << code;
100+
count_on_error_++;
81101
}
102+
103+
private:
104+
Errors expected_;
105+
int count_on_set_;
106+
int count_on_error_;
107+
bool is_idempotent_;
82108
};
83109

84110
void run(const TestWaitForHandler::Ptr& handler, uint64_t timeout = 0) {
85-
mockssandra::SimpleCluster cluster(simple());
111+
mockssandra::SimpleRequestHandlerBuilder builder;
112+
run(handler, builder, timeout);
113+
}
114+
115+
void run(const TestWaitForHandler::Ptr& handler,
116+
mockssandra::SimpleRequestHandlerBuilder& builder, uint64_t timeout = 0) {
117+
mockssandra::SimpleCluster cluster(builder.build());
86118
ASSERT_EQ(cluster.start_all(), 0);
87119

88120
handler_ = handler;
@@ -140,17 +172,46 @@ class WaitForHandlerUnitTest : public LoopTest {
140172
};
141173

142174
TEST_F(WaitForHandlerUnitTest, CloseImmediatelyWhileWaiting) {
143-
run(TestWaitForHandler::Ptr(new RegularQueryHandler()));
175+
run(TestWaitForHandler::Ptr(new TestWaitForHandler())
176+
->with_expected_error(WaitForHandler::WAIT_FOR_ERROR_REQUEST_ERROR)
177+
->with_expected_error(WaitForHandler::WAIT_FOR_ERROR_CONNECTION_CLOSED));
144178
}
145179

146180
TEST_F(WaitForHandlerUnitTest, CloseAfterTimeoutWhileWaiting) {
147-
run(TestWaitForHandler::Ptr(new RegularQueryHandler()), 500);
181+
run(TestWaitForHandler::Ptr(new TestWaitForHandler())
182+
->with_expected_error(WaitForHandler::WAIT_FOR_ERROR_REQUEST_ERROR)
183+
->with_expected_error(WaitForHandler::WAIT_FOR_ERROR_CONNECTION_CLOSED),
184+
500);
148185
}
149186

150187
TEST_F(WaitForHandlerUnitTest, CloseIdempotentImmediatelyWhileWaiting) {
151-
run(TestWaitForHandler::Ptr(new IdempotentQueryHandler()));
188+
run(TestWaitForHandler::Ptr(new TestWaitForHandler())
189+
->with_is_idempotent(true)
190+
->with_expected_error(WaitForHandler::WAIT_FOR_ERROR_REQUEST_TIMEOUT)
191+
->with_expected_error(WaitForHandler::WAIT_FOR_ERROR_CONNECTION_CLOSED));
152192
}
153193

154194
TEST_F(WaitForHandlerUnitTest, CloseIdempotentAfterTimeoutWhileWaiting) {
155-
run(TestWaitForHandler::Ptr(new IdempotentQueryHandler()), 500);
195+
run(TestWaitForHandler::Ptr(new TestWaitForHandler())
196+
->with_is_idempotent(true)
197+
->with_expected_error(WaitForHandler::WAIT_FOR_ERROR_REQUEST_TIMEOUT)
198+
->with_expected_error(WaitForHandler::WAIT_FOR_ERROR_CONNECTION_CLOSED),
199+
500);
200+
}
201+
202+
TEST_F(WaitForHandlerUnitTest, EnsureOnSetNotCalledAfterTimeout) {
203+
TestWaitForHandler::Ptr handler(
204+
new TestWaitForHandler(1)); // Timeout handler before query returns
205+
206+
// Make sure the query doesn't complete before the handler times out
207+
mockssandra::SimpleRequestHandlerBuilder builder;
208+
builder.on(mockssandra::OPCODE_QUERY)
209+
.system_local()
210+
.system_peers()
211+
.wait(200)
212+
.empty_rows_result(1);
213+
214+
run(handler->with_expected_error(WaitForHandler::WAIT_FOR_ERROR_TIMEOUT), builder, 500);
215+
216+
EXPECT_EQ(0, handler->count_on_set()); // Ensure on_set() never called
156217
}

0 commit comments

Comments
 (0)