@@ -421,7 +421,7 @@ void ActiveStream::completeBackfill() {
421421 LockHolder lh (streamMutex);
422422 LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Backfill complete, %d items read"
423423 " from disk, last seqno read: %ld" , producer->logHeader (), vb_,
424- itemsFromBackfill, lastReadSeqno);
424+ itemsFromBackfill, lastReadSeqno. load () );
425425 }
426426
427427 isBackfillTaskRunning.store (false );
@@ -455,7 +455,7 @@ void ActiveStream::setVBucketStateAckRecieved() {
455455 RCPtr<VBucket> vbucket = engine->getVBucket (vb_);
456456 LOG (EXTENSION_LOG_WARNING, " %s (vb %" PRIu16 " ) Vbucket marked as "
457457 " dead, last sent seqno: %" PRIu64 " , high seqno: %" PRIu64 " " ,
458- producer->logHeader (), vb_, lastSentSeqno,
458+ producer->logHeader (), vb_, lastSentSeqno. load () ,
459459 vbucket->getHighSeqno ());
460460 } else {
461461 LOG (EXTENSION_LOG_INFO, " %s (vb %" PRIu16 " ) Receive ack for set "
@@ -487,8 +487,8 @@ DcpResponse* ActiveStream::backfillPhase() {
487487 }
488488
489489 if (!isBackfillTaskRunning && readyQ.empty ()) {
490- backfillRemaining = 0 ;
491- if (lastReadSeqno >= end_seqno_) {
490+ backfillRemaining. store ( 0 , memory_order_relaxed) ;
491+ if (lastReadSeqno. load () >= end_seqno_) {
492492 endStream (END_STREAM_OK);
493493 } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
494494 transitionState (STREAM_TAKEOVER_SEND);
@@ -507,7 +507,7 @@ DcpResponse* ActiveStream::backfillPhase() {
507507}
508508
509509DcpResponse* ActiveStream::inMemoryPhase () {
510- if (lastSentSeqno >= end_seqno_) {
510+ if (lastSentSeqno. load () >= end_seqno_) {
511511 endStream (END_STREAM_OK);
512512 } else if (readyQ.empty ()) {
513513 if (nextCheckpointItem ()) {
@@ -556,9 +556,9 @@ void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
556556 snprintf (buffer, bsize, " %s:stream_%d_memory" , name_.c_str (), vb_);
557557 add_casted_stat (buffer, itemsFromMemory, add_stat, c);
558558 snprintf (buffer, bsize, " %s:stream_%d_last_sent_seqno" , name_.c_str (), vb_);
559- add_casted_stat (buffer, lastSentSeqno, add_stat, c);
559+ add_casted_stat (buffer, lastSentSeqno. load () , add_stat, c);
560560 snprintf (buffer, bsize, " %s:stream_%d_last_read_seqno" , name_.c_str (), vb_);
561- add_casted_stat (buffer, lastReadSeqno, add_stat, c);
561+ add_casted_stat (buffer, lastReadSeqno. load () , add_stat, c);
562562 snprintf (buffer, bsize, " %s:stream_%d_ready_queue_memory" , name_.c_str (), vb_);
563563 add_casted_stat (buffer, getReadyQueueMemory (), add_stat, c);
564564 snprintf (buffer, bsize, " %s:stream_%d_items_ready" , name_.c_str (), vb_);
@@ -830,7 +830,8 @@ void ActiveStream::endStream(end_stream_status_t reason) {
830830 LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Stream closing, %llu items sent"
831831 " from disk, %llu items sent from memory, %llu was last seqno sent"
832832 " %s is the reason" , producer->logHeader (), vb_, itemsFromBackfill,
833- itemsFromMemory, lastSentSeqno, getEndStreamStatusStr (reason));
833+ itemsFromMemory.load (), lastSentSeqno.load (),
834+ getEndStreamStatusStr (reason));
834835 }
835836}
836837
@@ -988,12 +989,12 @@ size_t ActiveStream::getItemsRemaining() {
988989 uint64_t high_seqno = vbucket->getHighSeqno ();
989990
990991 if (end_seqno_ < high_seqno) {
991- if (end_seqno_ > lastSentSeqno) {
992- return (end_seqno_ - lastSentSeqno);
992+ if (end_seqno_ > lastSentSeqno. load () ) {
993+ return (end_seqno_ - lastSentSeqno. load () );
993994 }
994995 } else {
995- if (high_seqno > lastSentSeqno) {
996- return (high_seqno - lastSentSeqno);
996+ if (high_seqno > lastSentSeqno. load () ) {
997+ return (high_seqno - lastSentSeqno. load () );
997998 }
998999 }
9991000
0 commit comments