Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Descriptive stream & consumer health errors #6416

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 64 additions & 59 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
@@ -445,108 +445,113 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {

// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) error {
js.mu.RLock()
s, cc := js.srv, js.cluster
if cc == nil {
// Non-clustered mode
js.mu.RUnlock()
return true
return nil
}

// Pull the group out.
rg := sa.Group
if rg == nil {
if sa == nil || sa.Group == nil {
js.mu.RUnlock()
return false
return errors.New("stream assignment or group missing")
}

streamName := sa.Config.Name
node := rg.node
node := sa.Group.node
js.mu.RUnlock()

// First look up the stream and make sure it's there.
mset, err := acc.lookupStream(streamName)
if err != nil {
return false
return errors.New("stream not found")
}

// If R1 we are good.
if node == nil {
return true
}
switch {
case mset.cfg.Replicas <= 1:
return nil // No further checks for R=1 streams

// Here we are a replicated stream.
// First make sure our monitor routine is running.
if !mset.isMonitorRunning() {
return false
}
case node == nil:
return errors.New("group node missing")

if node.Healthy() {
// Check if we are processing a snapshot and are catching up.
if !mset.isCatchingUp() {
return true
}
} else { // node != nil
if node != mset.raftNode() {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
}
case node != mset.raftNode():
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
return errors.New("cluster node skew detected")

case !mset.isMonitorRunning():
return errors.New("monitor goroutine not running")

case !node.Healthy():
return errors.New("group node unhealthy")

case mset.isCatchingUp():
return errors.New("stream catching up")

default:
return nil
}
return false
}

// isConsumerHealthy will determine if the consumer is up to date.
// For R1 it will make sure the consumer is present on this server.
func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consumerAssignment) bool {
func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consumerAssignment) error {
if mset == nil {
return false
return errors.New("stream missing")
}

js.mu.RLock()
cc := js.cluster
s, cc := js.srv, js.cluster
if cc == nil {
// Non-clustered mode
js.mu.RUnlock()
return true
return nil
}
// These are required.
if ca == nil || ca.Group == nil {
js.mu.RUnlock()
return false
return errors.New("consumer assignment or group missing")
}
s := js.srv
// Capture RAFT node from assignment.
node := ca.Group.node
js.mu.RUnlock()

// Check if not running at all.
o := mset.lookupConsumer(consumer)
if o == nil {
return false
return errors.New("consumer not found")
}

// Check RAFT node state.
if node == nil || node.Healthy() {
return true
} else if node != nil {
if node != o.raftNode() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()
s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer)
node.Delete()
o.deleteWithoutAdvisory()
rc, _ := o.replica()
switch {
case rc <= 1:
return nil // No further checks for R=1 consumers

// When we try to restart we nil out the node and reprocess the consumer assignment.
js.mu.Lock()
ca.Group.node = nil
js.mu.Unlock()
js.processConsumerAssignment(ca)
}
case node == nil:
return errors.New("group node missing")

case node != o.raftNode():
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()
s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer)
node.Delete()
o.deleteWithoutAdvisory()

// When we try to restart we nil out the node and reprocess the consumer assignment.
js.mu.Lock()
ca.Group.node = nil
js.mu.Unlock()
js.processConsumerAssignment(ca)
return errors.New("cluster node skew detected")

case !o.isMonitorRunning():
return errors.New("monitor goroutine not running")

case !node.Healthy():
return errors.New("group node unhealthy")

default:
return nil
}
return false
}

// subjectsOverlap checks all existing stream assignments for the account cross-cluster for subject overlap
7 changes: 4 additions & 3 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
@@ -7263,9 +7263,10 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
}

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
Retention: nats.InterestPolicy, // Replicated consumers by default
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
12 changes: 6 additions & 6 deletions server/monitor.go
Original file line number Diff line number Diff line change
@@ -3690,35 +3690,35 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {

for stream, sa := range asa {
// Make sure we can look up
if !js.isStreamHealthy(acc, sa) {
if err := js.isStreamHealthy(acc, sa); err != nil {
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream)
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current: %s", accName, stream, err)
return health
}
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorStream,
Account: accName,
Stream: stream,
Error: fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream),
Error: fmt.Sprintf("JetStream stream '%s > %s' is not current: %s", accName, stream, err),
})
continue
}
mset, _ := acc.lookupStream(stream)
// Now check consumers.
for consumer, ca := range sa.consumers {
if !js.isConsumerHealthy(mset, consumer, ca) {
if err := js.isConsumerHealthy(mset, consumer, ca); err != nil {
if !details {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current: %s", acc, stream, consumer, err)
return health
}
health.Errors = append(health.Errors, HealthzError{
Type: HealthzErrorConsumer,
Account: accName,
Stream: stream,
Consumer: consumer,
Error: fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer),
Error: fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current: %s", acc, stream, consumer, err),
})
}
}