@@ -1046,7 +1046,7 @@ uint32_t PassiveStream::setDead_UNLOCKED(end_stream_status_t status,
10461046 LOG (logLevel, " %s (vb %" PRId16 " ) Setting stream to dead"
10471047 " state, last_seqno is %" PRIu64 " , totalBytes is %" PRIu32 " ,"
10481048 " droppedBytes is %" PRIu32 " , status is %s" ,
1049- consumer->logHeader (), vb_, last_seqno, totalBytes,
1049+ consumer->logHeader (), vb_, last_seqno. load () , totalBytes,
10501050 unackedBytes, getEndStreamStatusStr (status));
10511051 return totalBytes;
10521052}
@@ -1094,7 +1094,7 @@ void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
10941094 start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
10951095
10961096 LockHolder lh (streamMutex);
1097- last_seqno = start_seqno;
1097+ last_seqno. store ( start_seqno) ;
10981098 pushToReadyQ (new StreamRequest (vb_, new_opaque, flags_, start_seqno,
10991099 end_seqno_, vb_uuid_, snap_start_seqno_,
11001100 snap_end_seqno_));
@@ -1124,30 +1124,30 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
11241124 {
11251125 MutationResponse* m = static_cast <MutationResponse*>(resp);
11261126 uint64_t bySeqno = m->getBySeqno ();
1127- if (bySeqno <= last_seqno) {
1127+ if (bySeqno <= last_seqno. load () ) {
11281128 LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Erroneous (out of "
11291129 " sequence) mutation received, with opaque: %" PRIu32 " , its "
11301130 " seqno (%" PRIu64 " ) is not greater than last received seqno "
11311131 " (%" PRIu64 " ); Dropping mutation!" , consumer->logHeader (),
1132- vb_, opaque_, bySeqno, last_seqno);
1132+ vb_, opaque_, bySeqno, last_seqno. load () );
11331133 delete m;
11341134 return ENGINE_ERANGE;
11351135 }
1136- last_seqno = bySeqno;
1136+ last_seqno. store ( bySeqno) ;
11371137 break ;
11381138 }
11391139 case DCP_SNAPSHOT_MARKER:
11401140 {
11411141 SnapshotMarker* s = static_cast <SnapshotMarker*>(resp);
11421142 uint64_t snapStart = s->getStartSeqno ();
11431143 uint64_t snapEnd = s->getEndSeqno ();
1144- if (snapStart < last_seqno && snapEnd <= last_seqno) {
1144+ if (snapStart < last_seqno. load () && snapEnd <= last_seqno. load () ) {
11451145 LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Erroneous snapshot "
11461146 " marker received, with opaque: %" PRIu32 " , its start "
11471147 " (%" PRIu64 " ), and end (%" PRIu64 " ) are less than last "
11481148 " received seqno (%" PRIu64 " ); Dropping marker!" ,
11491149 consumer->logHeader (), vb_, opaque_, snapStart, snapEnd,
1150- last_seqno);
1150+ last_seqno. load () );
11511151 delete s;
11521152 return ENGINE_ERANGE;
11531153 }
@@ -1281,12 +1281,12 @@ ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
12811281 return ENGINE_NOT_MY_VBUCKET;
12821282 }
12831283
1284- if (mutation->getBySeqno () > cur_snapshot_end) {
1284+ if (mutation->getBySeqno () > cur_snapshot_end. load () ) {
12851285 LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Erroneous mutation [sequence "
12861286 " number (%" PRIu64 " ) greater than current snapshot end seqno "
12871287 " (%" PRIu64 " )] being processed; Dropping the mutation!" ,
12881288 consumer->logHeader (), vb_, mutation->getBySeqno (),
1289- cur_snapshot_end);
1289+ cur_snapshot_end. load () );
12901290 delete mutation;
12911291 return ENGINE_ERANGE;
12921292 }
@@ -1328,12 +1328,12 @@ ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
13281328 return ENGINE_NOT_MY_VBUCKET;
13291329 }
13301330
1331- if (deletion->getBySeqno () > cur_snapshot_end) {
1331+ if (deletion->getBySeqno () > cur_snapshot_end. load () ) {
13321332 LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Erroneous deletion [sequence "
13331333 " number (%" PRIu64 " ) greater than current snapshot end seqno "
13341334 " (%" PRIu64 " )] being processed; Dropping the deletion!" ,
13351335 consumer->logHeader (), vb_, deletion->getBySeqno (),
1336- cur_snapshot_end);
1336+ cur_snapshot_end. load () );
13371337 delete deletion;
13381338 return ENGINE_ERANGE;
13391339 }
@@ -1372,22 +1372,22 @@ ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
13721372void PassiveStream::processMarker (SnapshotMarker* marker) {
13731373 RCPtr<VBucket> vb = engine->getVBucket (vb_);
13741374
1375- cur_snapshot_start = marker->getStartSeqno ();
1376- cur_snapshot_end = marker->getEndSeqno ();
1377- cur_snapshot_type = ( marker->getFlags () & MARKER_FLAG_DISK) ? disk : memory;
1375+ cur_snapshot_start. store ( marker->getStartSeqno () );
1376+ cur_snapshot_end. store ( marker->getEndSeqno () );
1377+ cur_snapshot_type. store (( marker->getFlags () & MARKER_FLAG_DISK) ? disk : memory) ;
13781378
13791379 if (vb) {
13801380 if (marker->getFlags () & MARKER_FLAG_DISK && vb->getHighSeqno () == 0 ) {
13811381 vb->setBackfillPhase (true );
1382- vb->checkpointManager .setBackfillPhase (cur_snapshot_start,
1383- cur_snapshot_end);
1382+ vb->checkpointManager .setBackfillPhase (cur_snapshot_start. load () ,
1383+ cur_snapshot_end. load () );
13841384 } else {
13851385 if (marker->getFlags () & MARKER_FLAG_CHK ||
13861386 vb->checkpointManager .getOpenCheckpointId () == 0 ) {
1387- vb->checkpointManager .createSnapshot (cur_snapshot_start,
1388- cur_snapshot_end);
1387+ vb->checkpointManager .createSnapshot (cur_snapshot_start. load () ,
1388+ cur_snapshot_end. load () );
13891389 } else {
1390- vb->checkpointManager .updateCurrentSnapshotEnd (cur_snapshot_end);
1390+ vb->checkpointManager .updateCurrentSnapshotEnd (cur_snapshot_end. load () );
13911391 }
13921392 vb->setBackfillPhase (false );
13931393 }
@@ -1413,8 +1413,8 @@ void PassiveStream::processSetVBucketState(SetVBucketState* state) {
14131413}
14141414
14151415void PassiveStream::handleSnapshotEnd (RCPtr<VBucket>& vb, uint64_t byseqno) {
1416- if (byseqno == cur_snapshot_end) {
1417- if (cur_snapshot_type == disk && vb->isBackfillPhase ()) {
1416+ if (byseqno == cur_snapshot_end. load () ) {
1417+ if (cur_snapshot_type. load () == disk && vb->isBackfillPhase ()) {
14181418 vb->setBackfillPhase (false );
14191419 uint64_t id = vb->checkpointManager .getOpenCheckpointId () + 1 ;
14201420 vb->checkpointManager .checkAndAddNewCheckpoint (id, vb);
@@ -1438,7 +1438,7 @@ void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
14381438 }
14391439 cur_snapshot_ack = false ;
14401440 }
1441- cur_snapshot_type = none;
1441+ cur_snapshot_type. store ( none) ;
14421442 }
14431443}
14441444
@@ -1454,18 +1454,18 @@ void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
14541454 snprintf (buf, bsize, " %s:stream_%d_items_ready" , name_.c_str (), vb_);
14551455 add_casted_stat (buf, itemsReady ? " true" : " false" , add_stat, c);
14561456 snprintf (buf, bsize, " %s:stream_%d_last_received_seqno" , name_.c_str (), vb_);
1457- add_casted_stat (buf, last_seqno, add_stat, c);
1457+ add_casted_stat (buf, last_seqno. load () , add_stat, c);
14581458 snprintf (buf, bsize, " %s:stream_%d_ready_queue_memory" , name_.c_str (), vb_);
14591459 add_casted_stat (buf, getReadyQueueMemory (), add_stat, c);
14601460
14611461 snprintf (buf, bsize, " %s:stream_%d_cur_snapshot_type" , name_.c_str (), vb_);
1462- add_casted_stat (buf, snapshotTypeToString (cur_snapshot_type), add_stat, c);
1462+ add_casted_stat (buf, snapshotTypeToString (cur_snapshot_type. load () ), add_stat, c);
14631463
1464- if (cur_snapshot_type != none) {
1464+ if (cur_snapshot_type. load () != none) {
14651465 snprintf (buf, bsize, " %s:stream_%d_cur_snapshot_start" , name_.c_str (), vb_);
1466- add_casted_stat (buf, cur_snapshot_start, add_stat, c);
1466+ add_casted_stat (buf, cur_snapshot_start. load () , add_stat, c);
14671467 snprintf (buf, bsize, " %s:stream_%d_cur_snapshot_end" , name_.c_str (), vb_);
1468- add_casted_stat (buf, cur_snapshot_end, add_stat, c);
1468+ add_casted_stat (buf, cur_snapshot_end. load () , add_stat, c);
14691469 }
14701470}
14711471
0 commit comments