diff --git a/services/p2p/HandleWebsocket.go b/services/p2p/HandleWebsocket.go index f9b7d32c0..92a25def0 100644 --- a/services/p2p/HandleWebsocket.go +++ b/services/p2p/HandleWebsocket.go @@ -97,6 +97,12 @@ func (cm *clientChannelMap) remove(ch chan []byte) { delete(cm.channels, ch) } +// maxConcurrentBroadcasts caps the number of in-flight broadcast goroutines so a +// notification burst with many connected clients can't exhaust goroutines/timers. +// Declared as a var (not const) so tests can override it; not exposed to settings +// because the cap is an internal resource ceiling, not a behavioural knob. +var maxConcurrentBroadcasts = 256 + func (cm *clientChannelMap) broadcast(data []byte, logger ulogger.Logger) { // Get a snapshot of channels under the lock cm.RLock() @@ -113,11 +119,15 @@ func (cm *clientChannelMap) broadcast(data []byte, logger ulogger.Logger) { // Send to all channels in parallel without holding the lock // This prevents O(N) delay accumulation from blocking clients + sem := make(chan struct{}, maxConcurrentBroadcasts) + var wg sync.WaitGroup for _, ch := range channels { wg.Add(1) + sem <- struct{}{} // blocks if pool is full — caps in-flight goroutines go func(ch chan []byte) { defer wg.Done() + defer func() { <-sem }() timer := time.NewTimer(time.Second) defer func() { // Ensure timer resources are released promptly when the send succeeds. diff --git a/services/p2p/HandleWebsocket_test.go b/services/p2p/HandleWebsocket_test.go index 4c8444f3f..6bc36f404 100644 --- a/services/p2p/HandleWebsocket_test.go +++ b/services/p2p/HandleWebsocket_test.go @@ -654,3 +654,46 @@ broadcastComplete: // Don't fail the test - the important part is demonstrating the DoS vulnerability is fixed } } + +// TestBroadcast_BoundedPool verifies the broadcast goroutine pool caps in-flight goroutines. +// It overrides maxConcurrentBroadcasts to a small value, then submits 4x that many unresponsive +// (unbuffered, unread) channels. Every channel hits the 1s send-timeout. With the cap, total +// wall-clock time is ceil(channels/poolSize) * 1s; without it, all timeouts run concurrently +// and total wall-clock is ~1s. The lower bound asserts the semaphore actually serialises work. +func TestBroadcast_BoundedPool(t *testing.T) { + originalPoolSize := maxConcurrentBroadcasts + defer func() { maxConcurrentBroadcasts = originalPoolSize }() + maxConcurrentBroadcasts = 2 + + cm := newClientChannelMap() + + const numChannels = 8 + channels := make([]chan []byte, numChannels) + + for i := 0; i < numChannels; i++ { + channels[i] = make(chan []byte) + cm.add(channels[i]) + } + + require.Equal(t, numChannels, cm.count(), "All channels should be registered") + + logger := &ulogger.TestLogger{} + + startTime := time.Now() + cm.broadcast([]byte("test"), logger) + elapsed := time.Since(startTime) + + expectedMin := time.Duration(numChannels/maxConcurrentBroadcasts) * time.Second + expectedMax := expectedMin + 2*time.Second + + require.GreaterOrEqual(t, elapsed, expectedMin, + "Broadcast finished too quickly (%v); pool of %d should have serialised %d unresponsive channels into batches taking ~%v", + elapsed, maxConcurrentBroadcasts, numChannels, expectedMin) + require.LessOrEqual(t, elapsed, expectedMax, + "Broadcast took too long (%v); expected at most %v", elapsed, expectedMax) + + require.Equal(t, 0, cm.count(), "All timed-out channels should be removed") + + t.Logf("Broadcast of %d unresponsive channels with pool=%d completed in %v (expected %v..%v)", + numChannels, maxConcurrentBroadcasts, elapsed, expectedMin, expectedMax) +}