@@ -445,108 +445,113 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
445
445
446
446
// isStreamHealthy will determine if the stream is up to date or very close.
447
447
// For R1 it will make sure the stream is present on this server.
448
- func (js * jetStream ) isStreamHealthy (acc * Account , sa * streamAssignment ) bool {
448
+ func (js * jetStream ) isStreamHealthy (acc * Account , sa * streamAssignment ) error {
449
449
js .mu .RLock ()
450
450
s , cc := js .srv , js .cluster
451
451
if cc == nil {
452
452
// Non-clustered mode
453
453
js .mu .RUnlock ()
454
- return true
454
+ return nil
455
455
}
456
-
457
- // Pull the group out.
458
- rg := sa .Group
459
- if rg == nil {
456
+ if sa == nil || sa .Group == nil {
460
457
js .mu .RUnlock ()
461
- return false
458
+ return fmt . Errorf ( "stream assignment or group missing" )
462
459
}
463
-
464
460
streamName := sa .Config .Name
465
- node := rg .node
461
+ node := sa . Group .node
466
462
js .mu .RUnlock ()
467
463
468
464
// First lookup stream and make sure its there.
469
465
mset , err := acc .lookupStream (streamName )
470
466
if err != nil {
471
- return false
467
+ return fmt . Errorf ( "stream not found" )
472
468
}
473
469
474
- // If R1 we are good.
475
- if node == nil {
476
- return true
477
- }
470
+ switch {
471
+ case mset .cfg .Replicas <= 1 :
472
+ return nil // No further checks for R=1 streams
478
473
479
- // Here we are a replicated stream.
480
- // First make sure our monitor routine is running.
481
- if ! mset .isMonitorRunning () {
482
- return false
483
- }
474
+ case node == nil :
475
+ return fmt .Errorf ("group node missing" )
484
476
485
- if node .Healthy () {
486
- // Check if we are processing a snapshot and are catching up.
487
- if ! mset .isCatchingUp () {
488
- return true
489
- }
490
- } else { // node != nil
491
- if node != mset .raftNode () {
492
- s .Warnf ("Detected stream cluster node skew '%s > %s'" , acc .GetName (), streamName )
493
- node .Delete ()
494
- mset .resetClusteredState (nil )
495
- }
477
+ case ! mset .isMonitorRunning ():
478
+ return fmt .Errorf ("monitor goroutine not running" )
479
+
480
+ case ! node .Healthy ():
481
+ return fmt .Errorf ("group node unhealthy" )
482
+
483
+ case mset .isCatchingUp ():
484
+ return fmt .Errorf ("stream catching up" )
485
+
486
+ case node != mset .raftNode ():
487
+ s .Warnf ("Detected stream cluster node skew '%s > %s'" , acc .GetName (), streamName )
488
+ node .Delete ()
489
+ mset .resetClusteredState (nil )
490
+ return fmt .Errorf ("cluster node skew detected" )
491
+
492
+ default :
493
+ return nil
496
494
}
497
- return false
498
495
}
499
496
500
497
// isConsumerHealthy will determine if the consumer is up to date.
501
498
// For R1 it will make sure the consunmer is present on this server.
502
- func (js * jetStream ) isConsumerHealthy (mset * stream , consumer string , ca * consumerAssignment ) bool {
499
+ func (js * jetStream ) isConsumerHealthy (mset * stream , consumer string , ca * consumerAssignment ) error {
503
500
if mset == nil {
504
- return false
501
+ return fmt . Errorf ( "stream missing" )
505
502
}
506
-
507
503
js .mu .RLock ()
508
- cc := js .cluster
504
+ s , cc := js . srv , js .cluster
509
505
if cc == nil {
510
506
// Non-clustered mode
511
507
js .mu .RUnlock ()
512
- return true
508
+ return nil
513
509
}
514
- // These are required.
515
510
if ca == nil || ca .Group == nil {
516
511
js .mu .RUnlock ()
517
- return false
512
+ return fmt . Errorf ( "consumer assignment or group missing" )
518
513
}
519
- s := js .srv
520
- // Capture RAFT node from assignment.
521
514
node := ca .Group .node
522
515
js .mu .RUnlock ()
523
516
524
517
// Check if not running at all.
525
518
o := mset .lookupConsumer (consumer )
526
519
if o == nil {
527
- return false
520
+ return fmt . Errorf ( "consumer not found" )
528
521
}
529
522
530
- // Check RAFT node state.
531
- if node == nil || node .Healthy () {
532
- return true
533
- } else if node != nil {
534
- if node != o .raftNode () {
535
- mset .mu .RLock ()
536
- accName , streamName := mset .acc .GetName (), mset .cfg .Name
537
- mset .mu .RUnlock ()
538
- s .Warnf ("Detected consumer cluster node skew '%s > %s > %s'" , accName , streamName , consumer )
539
- node .Delete ()
540
- o .deleteWithoutAdvisory ()
523
+ rc , _ := o .replica ()
524
+ switch {
525
+ case rc <= 1 :
526
+ return nil // No further checks for R=1 consumers
541
527
542
- // When we try to restart we nil out the node and reprocess the consumer assignment.
543
- js .mu .Lock ()
544
- ca .Group .node = nil
545
- js .mu .Unlock ()
546
- js .processConsumerAssignment (ca )
547
- }
528
+ case node == nil :
529
+ return fmt .Errorf ("group node missing" )
530
+
531
+ case ! o .isMonitorRunning ():
532
+ return fmt .Errorf ("monitor goroutine not running" )
533
+
534
+ case ! node .Healthy ():
535
+ return fmt .Errorf ("group node unhealthy" )
536
+
537
+ case node != mset .raftNode ():
538
+ mset .mu .RLock ()
539
+ accName , streamName := mset .acc .GetName (), mset .cfg .Name
540
+ mset .mu .RUnlock ()
541
+ s .Warnf ("Detected consumer cluster node skew '%s > %s > %s'" , accName , streamName , consumer )
542
+ node .Delete ()
543
+ o .deleteWithoutAdvisory ()
544
+
545
+ // When we try to restart we nil out the node and reprocess the consumer assignment.
546
+ js .mu .Lock ()
547
+ ca .Group .node = nil
548
+ js .mu .Unlock ()
549
+ js .processConsumerAssignment (ca )
550
+ return fmt .Errorf ("cluster node skew detected" )
551
+
552
+ default :
553
+ return nil
548
554
}
549
- return false
550
555
}
551
556
552
557
// subjectsOverlap checks all existing stream assignments for the account cross-cluster for subject overlap
0 commit comments