Skip to content

Commit 28bc450

Browse files
committed
Fix resource leak in session storage cleanup
Implements fix by calling Close() on sessions that implement io.Closer before deleting them from storage during cleanup. This prevents connection leaks when vMCP sessions with backend clients expire. Closes: #3871
1 parent 0791876 commit 28bc450

File tree

2 files changed

+341
-6
lines changed

2 files changed

+341
-6
lines changed

pkg/transport/session/storage_local.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package session
66
import (
77
"context"
88
"fmt"
9+
"io"
10+
"log/slog"
911
"sync"
1012
"time"
1113
)
@@ -66,9 +68,12 @@ func (s *LocalStorage) Delete(_ context.Context, id string) error {
6668

6769
// DeleteExpired removes all sessions that haven't been updated since the given time.
6870
func (s *LocalStorage) DeleteExpired(ctx context.Context, before time.Time) error {
69-
var toDelete []string
71+
var toDelete []struct {
72+
id string
73+
session Session
74+
}
7075

71-
// First pass: collect IDs of expired sessions
76+
// First pass: collect expired sessions
7277
s.sessions.Range(func(key, val any) bool {
7378
// Check for context cancellation
7479
select {
@@ -80,16 +85,44 @@ func (s *LocalStorage) DeleteExpired(ctx context.Context, before time.Time) erro
8085
if session, ok := val.(Session); ok {
8186
if session.UpdatedAt().Before(before) {
8287
if id, ok := key.(string); ok {
83-
toDelete = append(toDelete, id)
88+
toDelete = append(toDelete, struct {
89+
id string
90+
session Session
91+
}{id, session})
8492
}
8593
}
8694
}
8795
return true
8896
})
8997

90-
// Second pass: delete expired sessions
91-
for _, id := range toDelete {
92-
s.sessions.Delete(id)
98+
// Second pass: close and delete expired sessions
99+
for _, item := range toDelete {
100+
// Check for context cancellation before processing each session
101+
select {
102+
case <-ctx.Done():
103+
return ctx.Err()
104+
default:
105+
}
106+
107+
// Re-check expiration and use CompareAndDelete to handle race conditions:
108+
// - Session may have been touched via Manager.Get().Touch() and is no longer expired
109+
// - Session may have been replaced via Store/UpsertSession with a new object
110+
// Only proceed if the stored value is still the same session object and still expired
111+
if item.session.UpdatedAt().Before(before) {
112+
// CompareAndDelete ensures we only delete if the value hasn't been replaced
113+
if deleted := s.sessions.CompareAndDelete(item.id, item.session); deleted {
114+
// Successfully deleted - now close if implements io.Closer
115+
if closer, ok := item.session.(io.Closer); ok {
116+
if err := closer.Close(); err != nil {
117+
slog.Warn("failed to close session during cleanup",
118+
"session_id", item.id,
119+
"error", err)
120+
}
121+
}
122+
}
123+
// If CompareAndDelete returned false, the session was already replaced/deleted - skip it
124+
}
125+
// If re-check shows session is no longer expired (was touched), skip it
93126
}
94127

95128
return nil

pkg/transport/session/storage_test.go

Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package session
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910
"testing"
1011
"time"
@@ -13,6 +14,24 @@ import (
1314
"github.com/stretchr/testify/require"
1415
)
1516

17+
// mockClosableSession is a test session that implements io.Closer
18+
type mockClosableSession struct {
19+
*ProxySession
20+
closeCalled bool
21+
closeError error
22+
}
23+
24+
func newMockClosableSession(id string) *mockClosableSession {
25+
return &mockClosableSession{
26+
ProxySession: NewProxySession(id),
27+
}
28+
}
29+
30+
func (m *mockClosableSession) Close() error {
31+
m.closeCalled = true
32+
return m.closeError
33+
}
34+
1635
// TestLocalStorage tests the LocalStorage implementation
1736
func TestLocalStorage(t *testing.T) {
1837
t.Parallel()
@@ -302,6 +321,289 @@ func TestLocalStorage(t *testing.T) {
302321
// Should not error, just stop early
303322
assert.NoError(t, err)
304323
})
324+
325+
t.Run("DeleteExpired calls Close on io.Closer sessions", func(t *testing.T) {
326+
t.Parallel()
327+
storage := NewLocalStorage()
328+
defer storage.Close()
329+
330+
ctx := context.Background()
331+
332+
// Create a closable session (implements io.Closer)
333+
closableSession := newMockClosableSession("closable-session")
334+
closableSession.updated = time.Now().Add(-2 * time.Hour)
335+
336+
// Create a regular session (does not implement io.Closer)
337+
regularSession := NewProxySession("regular-session")
338+
regularSession.updated = time.Now().Add(-2 * time.Hour)
339+
340+
// Store both sessions
341+
err := storage.Store(ctx, closableSession)
342+
require.NoError(t, err)
343+
err = storage.Store(ctx, regularSession)
344+
require.NoError(t, err)
345+
346+
// Delete sessions older than 1 hour
347+
cutoff := time.Now().Add(-1 * time.Hour)
348+
err = storage.DeleteExpired(ctx, cutoff)
349+
require.NoError(t, err)
350+
351+
// Both sessions should be deleted
352+
_, err = storage.Load(ctx, "closable-session")
353+
assert.Equal(t, ErrSessionNotFound, err)
354+
_, err = storage.Load(ctx, "regular-session")
355+
assert.Equal(t, ErrSessionNotFound, err)
356+
357+
// Close() is called synchronously, so it should already be done
358+
assert.True(t, closableSession.closeCalled,
359+
"Close() should have been called on closable session")
360+
})
361+
362+
t.Run("DeleteExpired continues deletion even if Close fails", func(t *testing.T) {
363+
t.Parallel()
364+
storage := NewLocalStorage()
365+
defer storage.Close()
366+
367+
ctx := context.Background()
368+
369+
// Create a closable session that returns an error on Close()
370+
failingSession := newMockClosableSession("failing-session")
371+
failingSession.closeError = errors.New("close failed")
372+
failingSession.updated = time.Now().Add(-2 * time.Hour)
373+
374+
// Store the session
375+
err := storage.Store(ctx, failingSession)
376+
require.NoError(t, err)
377+
378+
// Delete expired sessions - should not fail even if Close() returns an error
379+
cutoff := time.Now().Add(-1 * time.Hour)
380+
err = storage.DeleteExpired(ctx, cutoff)
381+
require.NoError(t, err)
382+
383+
// Session should be deleted from storage even though Close() failed
384+
_, err = storage.Load(ctx, "failing-session")
385+
assert.Equal(t, ErrSessionNotFound, err)
386+
387+
// Close() is called synchronously, so it should already be done
388+
assert.True(t, failingSession.closeCalled,
389+
"Close() should have been called even though it returned an error")
390+
391+
// Note: We don't verify log output to maintain t.Parallel() compatibility.
392+
// The important behavior is that deletion continues even when Close() fails.
393+
})
394+
395+
t.Run("DeleteExpired handles non-io.Closer sessions without error", func(t *testing.T) {
396+
t.Parallel()
397+
storage := NewLocalStorage()
398+
defer storage.Close()
399+
400+
ctx := context.Background()
401+
402+
// Create multiple regular sessions (do not implement io.Closer)
403+
for i := 0; i < 5; i++ {
404+
session := NewProxySession(fmt.Sprintf("session-%d", i))
405+
session.updated = time.Now().Add(-2 * time.Hour)
406+
err := storage.Store(ctx, session)
407+
require.NoError(t, err)
408+
}
409+
410+
// Delete expired sessions
411+
cutoff := time.Now().Add(-1 * time.Hour)
412+
err := storage.DeleteExpired(ctx, cutoff)
413+
require.NoError(t, err)
414+
415+
// All sessions should be deleted
416+
for i := 0; i < 5; i++ {
417+
_, err := storage.Load(ctx, fmt.Sprintf("session-%d", i))
418+
assert.Equal(t, ErrSessionNotFound, err)
419+
}
420+
})
421+
422+
t.Run("DeleteExpired with mixed session types", func(t *testing.T) {
423+
t.Parallel()
424+
storage := NewLocalStorage()
425+
defer storage.Close()
426+
427+
ctx := context.Background()
428+
429+
// Create a mix of closable and regular expired sessions
430+
closable1 := newMockClosableSession("closable-1")
431+
closable1.updated = time.Now().Add(-2 * time.Hour)
432+
closable2 := newMockClosableSession("closable-2")
433+
closable2.updated = time.Now().Add(-2 * time.Hour)
434+
435+
regular1 := NewProxySession("regular-1")
436+
regular1.updated = time.Now().Add(-2 * time.Hour)
437+
regular2 := NewProxySession("regular-2")
438+
regular2.updated = time.Now().Add(-2 * time.Hour)
439+
440+
// Store all sessions
441+
err := storage.Store(ctx, closable1)
442+
require.NoError(t, err)
443+
err = storage.Store(ctx, closable2)
444+
require.NoError(t, err)
445+
err = storage.Store(ctx, regular1)
446+
require.NoError(t, err)
447+
err = storage.Store(ctx, regular2)
448+
require.NoError(t, err)
449+
450+
// Delete expired sessions
451+
cutoff := time.Now().Add(-1 * time.Hour)
452+
err = storage.DeleteExpired(ctx, cutoff)
453+
require.NoError(t, err)
454+
455+
// All sessions should be deleted
456+
_, err = storage.Load(ctx, "closable-1")
457+
assert.Equal(t, ErrSessionNotFound, err)
458+
_, err = storage.Load(ctx, "closable-2")
459+
assert.Equal(t, ErrSessionNotFound, err)
460+
_, err = storage.Load(ctx, "regular-1")
461+
assert.Equal(t, ErrSessionNotFound, err)
462+
_, err = storage.Load(ctx, "regular-2")
463+
assert.Equal(t, ErrSessionNotFound, err)
464+
465+
// Close() is called synchronously, so it should already be done
466+
assert.True(t, closable1.closeCalled,
467+
"Close() should have been called on closable-1")
468+
assert.True(t, closable2.closeCalled,
469+
"Close() should have been called on closable-2")
470+
})
471+
472+
t.Run("DeleteExpired respects context cancellation during deletion", func(t *testing.T) {
473+
t.Parallel()
474+
storage := NewLocalStorage()
475+
defer storage.Close()
476+
477+
ctx := context.Background()
478+
479+
// Create many expired sessions to increase chance of context check
480+
for i := 0; i < 10000; i++ {
481+
session := NewProxySession(fmt.Sprintf("session-%d", i))
482+
session.updated = time.Now().Add(-2 * time.Hour)
483+
err := storage.Store(ctx, session)
484+
require.NoError(t, err)
485+
}
486+
487+
// Create a context with a very short timeout
488+
timeoutCtx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
489+
defer cancel()
490+
491+
// Wait a bit to ensure context times out
492+
time.Sleep(10 * time.Millisecond)
493+
494+
// DeleteExpired should respect context timeout
495+
cutoff := time.Now().Add(-1 * time.Hour)
496+
err := storage.DeleteExpired(timeoutCtx, cutoff)
497+
498+
// With 10000 sessions, the context check should trigger during cleanup
499+
// If it completes too quickly, that's also acceptable behavior
500+
if err != nil {
501+
assert.Equal(t, context.DeadlineExceeded, err)
502+
// Some sessions deleted, but not all due to timeout
503+
remaining := storage.Count()
504+
assert.Greater(t, remaining, 0, "Some sessions should remain due to context timeout")
505+
}
506+
})
507+
508+
t.Run("DeleteExpired handles concurrent Touch() race condition", func(t *testing.T) {
509+
t.Parallel()
510+
storage := NewLocalStorage()
511+
defer storage.Close()
512+
513+
ctx := context.Background()
514+
515+
// Create an expired session
516+
session := NewProxySession("race-session")
517+
session.updated = time.Now().Add(-2 * time.Hour)
518+
err := storage.Store(ctx, session)
519+
require.NoError(t, err)
520+
521+
// Create many other expired sessions to slow down the deletion loop
522+
for i := 0; i < 1000; i++ {
523+
dummySession := NewProxySession(fmt.Sprintf("dummy-%d", i))
524+
dummySession.updated = time.Now().Add(-2 * time.Hour)
525+
err := storage.Store(ctx, dummySession)
526+
require.NoError(t, err)
527+
}
528+
529+
// Start DeleteExpired in a goroutine
530+
done := make(chan error, 1)
531+
go func() {
532+
cutoff := time.Now().Add(-1 * time.Hour)
533+
done <- storage.DeleteExpired(ctx, cutoff)
534+
}()
535+
536+
// Concurrently touch the session to make it non-expired
537+
// This simulates Manager.Get().Touch() being called during cleanup
538+
session.Touch()
539+
540+
// Wait for DeleteExpired to complete
541+
err = <-done
542+
require.NoError(t, err)
543+
544+
// The session should NOT be deleted because it was touched
545+
// (CompareAndDelete would fail due to updated timestamp or re-check would skip it)
546+
loaded, err := storage.Load(ctx, "race-session")
547+
if err == nil {
548+
// Session still exists - this is correct behavior
549+
assert.NotNil(t, loaded)
550+
assert.True(t, loaded.UpdatedAt().After(time.Now().Add(-1*time.Hour)),
551+
"Session should have recent timestamp after Touch()")
552+
}
553+
// Note: Due to timing, the session might still be deleted if Touch() happened
554+
// after the re-check but before CompareAndDelete. This is acceptable as the
555+
// important thing is we don't close a session that's been replaced.
556+
})
557+
558+
t.Run("DeleteExpired handles concurrent Store() replacement race condition", func(t *testing.T) {
559+
t.Parallel()
560+
storage := NewLocalStorage()
561+
defer storage.Close()
562+
563+
ctx := context.Background()
564+
565+
// Create an expired closable session
566+
oldSession := newMockClosableSession("replace-session")
567+
oldSession.updated = time.Now().Add(-2 * time.Hour)
568+
err := storage.Store(ctx, oldSession)
569+
require.NoError(t, err)
570+
571+
// Create many other expired sessions to slow down the deletion loop
572+
for i := 0; i < 1000; i++ {
573+
dummySession := NewProxySession(fmt.Sprintf("dummy-%d", i))
574+
dummySession.updated = time.Now().Add(-2 * time.Hour)
575+
err := storage.Store(ctx, dummySession)
576+
require.NoError(t, err)
577+
}
578+
579+
// Start DeleteExpired in a goroutine
580+
done := make(chan error, 1)
581+
go func() {
582+
cutoff := time.Now().Add(-1 * time.Hour)
583+
done <- storage.DeleteExpired(ctx, cutoff)
584+
}()
585+
586+
// Concurrently replace the session with a new one (same ID, different object)
587+
// This simulates UpsertSession being called during cleanup
588+
newSession := newMockClosableSession("replace-session")
589+
err = storage.Store(ctx, newSession)
590+
require.NoError(t, err)
591+
592+
// Wait for DeleteExpired to complete
593+
err = <-done
594+
require.NoError(t, err)
595+
596+
// The new session should still exist (CompareAndDelete prevents deleting it)
597+
loaded, err := storage.Load(ctx, "replace-session")
598+
require.NoError(t, err)
599+
assert.NotNil(t, loaded)
600+
601+
// The old session's Close() may or may not have been called depending on timing
602+
// The important thing is the new session is not closed
603+
// Since Close() is now synchronous, we can check immediately
604+
assert.False(t, newSession.closeCalled,
605+
"New session should not be closed (CompareAndDelete should prevent this)")
606+
})
305607
}
306608

307609
// TestManagerWithStorage tests the Manager with the Storage interface

0 commit comments

Comments
 (0)