Skip to content

Commit 40c58bd

Browse files
daverigbychiyoung
authored andcommitted
MB-19252: Fix data race on Stream::readyQueueMemory
As detected by TSan: WARNING: ThreadSanitizer: data race (pid=17244) Read of size 8 at 0x7d480000b370 by main thread (mutexes: write M24165, write M969, read M24121): #0 Stream::getReadyQueueMemory() /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:234 (ep.so+0x00000028f51e) #1 ActiveStream::addStats(void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:563 (ep.so+0x00000029452f) #2 DcpProducer::addStats(void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/daver/repos/couchbase/server/ep-engine/src/dcp-producer.cc:551 (ep.so+0x00000027f1a0) #3 ConnStatBuilder::operator()(SingleThreadedRCPtr<ConnHandler>&) /home/daver/repos/couchbase/server/ep-engine/src/ep_engine.cc:3696 (ep.so+0x000000182d54) Previous write of size 8 at 0x7d480000b370 by thread T16 (mutexes: write M24143): #0 Stream::pushToReadyQ(DcpResponse*) /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:211 (ep.so+0x00000028f4a6) #1 ActiveStream::backfillReceived(Item*) /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:407 (ep.so+0x00000028d6e5) #2 CacheCallback::callback(CacheLookup&) /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:87 (ep.so+0x00000028d4b3) #3 CouchKVStore::recordDbDump(_db*, _docinfo*, void*) /home/daver/repos/couchbase/server/ep-engine/src/couch-kvstore/couch-kvstore.cc:1563 (ep.so+0x00000031dec5) See also: http://review.couchbase.org/54314 which originally fixed this issue in watson; however it also fixed a couple of other issues in the same patch. Change-Id: Iae6a34403394e54c9d7213a7c2703be761e7dc0f Reviewed-on: http://review.couchbase.org/62972 Well-Formed: buildbot <[email protected]> Reviewed-by: Will Gardner <[email protected]> Tested-by: buildbot <[email protected]>
1 parent be344d1 commit 40c58bd

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

src/dcp-stream.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ void Stream::pushToReadyQ(DcpResponse* resp)
208208
{
209209
if (resp) {
210210
readyQ.push(resp);
211-
readyQueueMemory += resp->getMessageSize();
211+
readyQueueMemory.fetch_add(resp->getMessageSize(),
212+
memory_order_relaxed);
212213
}
213214
}
214215

@@ -218,20 +219,20 @@ void Stream::popFromReadyQ(void)
218219
uint32_t respSize = readyQ.front()->getMessageSize();
219220
readyQ.pop();
220221
/* Decrement the readyQ size */
221-
if ((readyQueueMemory - respSize) <= readyQueueMemory) {
222-
readyQueueMemory -= respSize;
222+
if (respSize <= readyQueueMemory.load(memory_order_relaxed)) {
223+
readyQueueMemory.fetch_sub(respSize, memory_order_relaxed);
223224
} else {
224225
LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
225226
"underflow, likely wrong stat calculation! curr size: %llu;"
226-
"new size: %d", name_.c_str(), getVBucket(), readyQueueMemory,
227+
"new size: %d", name_.c_str(), getVBucket(), readyQueueMemory.load(),
227228
respSize);
228-
readyQueueMemory = 0;
229+
readyQueueMemory.store(0, memory_order_relaxed);
229230
}
230231
}
231232
}
232233

233234
uint64_t Stream::getReadyQueueMemory() {
234-
return readyQueueMemory;
235+
return readyQueueMemory.load(memory_order_relaxed);
235236
}
236237

237238
const char * Stream::stateName(stream_state_t st) const {

src/dcp-stream.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,11 @@ class Stream : public RCValue {
154154
const static uint64_t dcpMaxSeqno;
155155

156156
private:
157-
/* This tracks the memory occupied by elements in the readyQ */
158-
uint64_t readyQueueMemory;
157+
/* readyQueueMemory tracks the memory occupied by elements
158+
* in the readyQ. It is an atomic because otherwise
159+
getReadyQueueMemory would need to acquire streamMutex.
160+
*/
161+
AtomicValue <uint64_t> readyQueueMemory;
159162
};
160163

161164
typedef RCPtr<Stream> stream_t;

0 commit comments

Comments
 (0)