Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Invalidate session check if associated session is deleted #22227

Merged
merged 3 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.21.0 (March 17, 2025)
* Enhancement: Added support for Consul Session to update the state of a Health Check, allowing for more dynamic and responsive health monitoring within the Consul ecosystem. This feature enables sessions to directly influence health check statuses, improving the overall reliability and accuracy of service health assessments.

## 1.20.3 (February 13, 2025)

SECURITY:
Expand Down
35 changes: 32 additions & 3 deletions agent/consul/state/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package state

import (
"fmt"
"github.com/hashicorp/consul/api"
"reflect"
"strings"
"time"
Expand Down Expand Up @@ -219,7 +220,7 @@ func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error {
// future.

// Call the session creation
if err := sessionCreateTxn(tx, idx, sess); err != nil {
if err := s.sessionCreateTxn(tx, idx, sess); err != nil {
return err
}

Expand All @@ -229,7 +230,7 @@ func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error {
// sessionCreateTxn is the inner method used for creating session entries in
// an open transaction. Any health checks registered with the session will be
// checked for failing status. Returns any error encountered.
func sessionCreateTxn(tx WriteTxn, idx uint64, sess *structs.Session) error {
func (s *Store) sessionCreateTxn(tx WriteTxn, idx uint64, sess *structs.Session) error {
// Check that we have a session ID
if sess.ID == "" {
return ErrMissingSessionID
Expand Down Expand Up @@ -270,7 +271,7 @@ func sessionCreateTxn(tx WriteTxn, idx uint64, sess *structs.Session) error {
return fmt.Errorf("failed inserting session: %s", err)
}

return nil
return s.updateSessionCheck(tx, idx, sess, api.HealthPassing)
}

// SessionGet is used to retrieve an active session from the state store.
Expand Down Expand Up @@ -448,5 +449,33 @@ func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entM
}
}

// session invalidating the health-checks
return s.updateSessionCheck(tx, idx, session, api.HealthCritical)
}

// updateSessionCheck propagates a session's state to the health checks that
// track it. It scans every check registered on session.Node (within the
// session's enterprise meta) and, for each check whose Type is "session" and
// whose Definition.SessionName equals session.Name, writes back a clone with
// Status set to checkState and an Output message describing the session.
//
// It returns an error if the check lookup fails or if persisting any updated
// check through ensureCheckTxn fails; otherwise it returns nil.
func (s *Store) updateSessionCheck(tx WriteTxn, idx uint64, session *structs.Session, checkState string) error {
	// Walk all checks that belong to the session's node.
	nodeChecks, err := tx.Get(tableChecks, indexNode, Query{Value: session.Node, EnterpriseMeta: session.EnterpriseMeta})
	if err != nil {
		return fmt.Errorf("failed check lookup: %s", err)
	}

	for raw := nodeChecks.Next(); raw != nil; raw = nodeChecks.Next() {
		hc := raw.(*structs.HealthCheck)

		// Only session-type checks bound to this session's name are affected.
		if hc.Type != "session" || hc.Definition.SessionName != session.Name {
			continue
		}

		// Anything other than passing is reported as an invalid session.
		output := fmt.Sprintf("Session '%s' is invalid", session.ID)
		if checkState == api.HealthPassing {
			output = fmt.Sprintf("Session '%s' in force", session.ID)
		}

		// Never mutate the stored object in place; update a copy instead.
		modified := hc.Clone()
		modified.Status = checkState
		modified.Output = output

		if err := s.ensureCheckTxn(tx, idx, true, modified); err != nil {
			return err
		}
	}
	return nil
}
7 changes: 4 additions & 3 deletions agent/consul/state/session_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ func validateSessionChecksTxn(tx ReadTxn, session *structs.Session) error {
}

// Verify that the check is not in critical state
status := check.(*structs.HealthCheck).Status
if status == api.HealthCritical {
return fmt.Errorf("Check '%s' is in %s state", checkID, status)
healthCheck := check.(*structs.HealthCheck)
// Checks of type "session" are exempt from this rule: they are expected to be critical until their session exists, so a critical session check must not block session creation.
if healthCheck.Status == api.HealthCritical && healthCheck.Type != "session" {
return fmt.Errorf("Check '%s' is in %s state", checkID, healthCheck.Status)
}
}
return nil
Expand Down
144 changes: 144 additions & 0 deletions agent/consul/state/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,3 +961,147 @@ func TestStateStore_Session_Invalidate_PreparedQuery_Delete(t *testing.T) {
t.Fatalf("bad: %v", q2)
}
}

// TestHealthCheck_SessionRegistrationFail verifies that creating a session is
// rejected when the session references a check that is in critical state and
// is not of type "session".
func TestHealthCheck_SessionRegistrationFail(t *testing.T) {
	s := testStateStore(t)

	var check *structs.HealthCheck
	// Set up a node carrying a critical TTL check.
	testRegisterNode(t, s, 1, "foo-node")
	testRegisterCheckCustom(t, s, 1, "foo", func(chk *structs.HealthCheck) {
		chk.Node = "foo-node"
		chk.Type = "ttl"
		chk.Status = api.HealthCritical
		chk.Definition = structs.HealthCheckDefinition{
			SessionName: "test-session",
		}
		check = chk
	})

	// No session has been created yet, so the sessions index must still be 0.
	if idx := s.maxIndex("sessions"); idx != 0 {
		t.Fatalf("bad index: %d", idx)
	}

	// Build a session that depends on the critical check.
	sess := &structs.Session{
		ID:     testUUID(),
		Node:   "foo-node",
		Name:   "test-session",
		Checks: make([]types.CheckID, 0),
	}

	sess.Checks = append(sess.Checks, check.CheckID)
	// Sanity check: the check is critical before the session is created.
	assertHealthCheckStatus(t, s, sess, check.CheckID, api.HealthCritical)

	if err := s.SessionCreate(2, sess); err == nil {
		// expecting error: Check 'foo' is in critical state
		t.Fatalf("expected error, got nil")
	}
}

// TestHealthCheck_SessionRegistrationAllow verifies that a session can be
// created even though a check it references is critical, provided that check
// is of type "session".
func TestHealthCheck_SessionRegistrationAllow(t *testing.T) {
	const (
		nodeName    = "foo-node"
		sessionName = "test-session"
	)

	store := testStateStore(t)

	// Register the node together with a critical check of type "session".
	var registered *structs.HealthCheck
	testRegisterNode(t, store, 1, nodeName)
	testRegisterCheckCustom(t, store, 1, "foo", func(chk *structs.HealthCheck) {
		chk.Node = nodeName
		chk.Type = "session"
		chk.Status = api.HealthCritical
		chk.Definition = structs.HealthCheckDefinition{SessionName: sessionName}
		registered = chk
	})

	// No session exists yet, so the sessions index must still be 0.
	if got := store.maxIndex("sessions"); got != 0 {
		t.Fatalf("bad index: %d", got)
	}

	// The new session explicitly depends on the critical session check.
	sess := &structs.Session{
		ID:     testUUID(),
		Node:   nodeName,
		Name:   sessionName,
		Checks: []types.CheckID{registered.CheckID},
	}

	// The check starts out critical.
	assertHealthCheckStatus(t, store, sess, registered.CheckID, api.HealthCritical)

	err := store.SessionCreate(2, sess)
	if err != nil {
		t.Fatalf("The system shall allow session to be created ignoring the session check is critical. err: %s", err)
	}
}

// TestHealthCheck_Session follows a check of type "session" through the
// session lifecycle: critical before the session exists, passing once the
// session is created, and critical again after the session is destroyed.
func TestHealthCheck_Session(t *testing.T) {
	const (
		nodeName    = "foo-node"
		sessionName = "test-session"
	)

	store := testStateStore(t)

	// Register the node together with a critical check of type "session".
	var registered *structs.HealthCheck
	testRegisterNode(t, store, 1, nodeName)
	testRegisterCheckCustom(t, store, 1, "foo", func(chk *structs.HealthCheck) {
		chk.Node = nodeName
		chk.Type = "session"
		chk.Status = api.HealthCritical
		chk.Definition = structs.HealthCheckDefinition{SessionName: sessionName}
		registered = chk
	})

	// No session exists yet, so the sessions index must still be 0.
	if got := store.maxIndex("sessions"); got != 0 {
		t.Fatalf("bad index: %d", got)
	}

	// The session is linked to the check by name only; it lists no checks.
	sess := &structs.Session{
		ID:   testUUID(),
		Node: nodeName,
		Name: sessionName,
	}

	// Before the session exists the check is critical.
	assertHealthCheckStatus(t, store, sess, registered.CheckID, api.HealthCritical)

	err := store.SessionCreate(2, sess)
	if err != nil {
		t.Fatalf("The system shall allow session to be created ignoring the session check is critical. err: %s", err)
	}

	// Creating the session flips the check to passing.
	assertHealthCheckStatus(t, store, sess, registered.CheckID, api.HealthPassing)

	// Destroy the session again.
	err = store.SessionDestroy(3, sess.ID, nil)
	if err != nil {
		t.Fatalf("err: %s", err)
	}

	// Destroying the session returns the check to critical.
	assertHealthCheckStatus(t, store, sess, registered.CheckID, api.HealthCritical)
}

// assertHealthCheckStatus fails the test unless the check identified by
// checkID, looked up among the checks of session.Node, has expectedStatus.
// It also fails the test when the lookup returns an error or when no check
// with that ID is found on the node.
func assertHealthCheckStatus(t *testing.T, s *Store, session *structs.Session, checkID types.CheckID, expectedStatus string) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	_, checks, err := s.NodeChecks(nil, session.Node, structs.DefaultEnterpriseMetaInPartition(""), structs.DefaultPeerKeyword)
	if err != nil {
		t.Fatalf("err: %s", err)
	}

	// Find the requested check and compare its status with the expectation.
	for _, c := range checks {
		if c.CheckID != checkID {
			continue
		}
		if c.Status != expectedStatus {
			t.Fatalf("check is expected to be %s but actually it is %s", expectedStatus, c.Status)
		}
		return
	}

	t.Fatalf("check %s, is not found", string(checkID))
}
23 changes: 23 additions & 0 deletions agent/consul/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,29 @@ func testRegisterCheckWithPartition(t *testing.T, s *Store, idx uint64,
}
}

// testRegisterCheckCustom registers a health check with the given ID at index
// idx. The check starts with only CheckID and the default enterprise meta set;
// the update callback is invoked before registration so the caller can fill in
// any other fields (node, type, status, definition, ...). After registering,
// the check is read back from the checks table and the test fails if it cannot
// be found under the expected ID.
func testRegisterCheckCustom(t *testing.T, s *Store, idx uint64, checkID types.CheckID, update func(chk *structs.HealthCheck)) {
	chk := &structs.HealthCheck{
		CheckID:        checkID,
		EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(""),
	}

	// Let the caller customize the check before it is stored.
	update(chk)

	if err := s.EnsureCheck(idx, chk); err != nil {
		t.Fatalf("err: %s", err)
	}

	// Read the check back in a read-only transaction to confirm it was stored.
	readTxn := s.db.Txn(false)
	defer readTxn.Abort()

	lookup := NodeCheckQuery{Node: chk.Node, CheckID: string(checkID), EnterpriseMeta: chk.EnterpriseMeta}
	raw, err := readTxn.First(tableChecks, indexID, lookup)
	if err != nil {
		t.Fatalf("err: %s", err)
	}

	stored, ok := raw.(*structs.HealthCheck)
	if !ok || stored.CheckID != checkID {
		t.Fatalf("bad check: %#v", stored)
	}
}

// testRegisterSidecarProxy is a convenience wrapper that forwards all of its
// arguments unchanged to testRegisterSidecarProxyOpts.
func testRegisterSidecarProxy(t *testing.T, s *Store, idx uint64, nodeID string, targetServiceID string) {
	testRegisterSidecarProxyOpts(t, s, idx, nodeID, targetServiceID)
}
Expand Down
1 change: 1 addition & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,7 @@ type HealthCheckDefinition struct {
GRPCUseTLS bool `json:",omitempty"`
AliasNode string `json:",omitempty"`
AliasService string `json:",omitempty"`
SessionName string `json:",omitempty"`
TTL time.Duration `json:",omitempty"`
}

Expand Down
2 changes: 2 additions & 0 deletions api/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type HealthCheckDefinition struct {
IntervalDuration time.Duration `json:"-"`
TimeoutDuration time.Duration `json:"-"`
DeregisterCriticalServiceAfterDuration time.Duration `json:"-"`
// SessionName names the session this check tracks when the parent check's Type is `session`: the check is marked passing when that session is created and critical when it is destroyed.
SessionName string `json:",omitempty"`

// DEPRECATED in Consul 1.4.1. Use the above time.Duration fields instead.
Interval ReadableDuration
Expand Down
2 changes: 2 additions & 0 deletions proto/private/pbservice/healthcheck.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading