Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions services/p2p/HandleWebsocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ func (cm *clientChannelMap) remove(ch chan []byte) {
delete(cm.channels, ch)
}

// maxConcurrentBroadcasts bounds how many broadcast goroutines may be in
// flight at once, so a notification burst across a large set of connected
// clients cannot exhaust goroutines or timers. It is declared as a var rather
// than a const so tests can shrink it; it is deliberately not wired into
// settings because it is an internal resource ceiling, not a behavioural knob.
var maxConcurrentBroadcasts = 256

func (cm *clientChannelMap) broadcast(data []byte, logger ulogger.Logger) {
// Get a snapshot of channels under the lock
cm.RLock()
Expand All @@ -113,11 +119,15 @@ func (cm *clientChannelMap) broadcast(data []byte, logger ulogger.Logger) {

// Send to all channels in parallel without holding the lock
// This prevents O(N) delay accumulation from blocking clients
sem := make(chan struct{}, maxConcurrentBroadcasts)

var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
sem <- struct{}{} // blocks if pool is full — caps in-flight goroutines
go func(ch chan []byte) {
defer wg.Done()
defer func() { <-sem }()
timer := time.NewTimer(time.Second)
defer func() {
// Ensure timer resources are released promptly when the send succeeds.
Expand Down
43 changes: 43 additions & 0 deletions services/p2p/HandleWebsocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,3 +654,46 @@ broadcastComplete:
// Don't fail the test - the important part is demonstrating the DoS vulnerability is fixed
}
}

// TestBroadcast_BoundedPool verifies the broadcast goroutine pool caps in-flight goroutines.
//
// It overrides maxConcurrentBroadcasts to a small value, then submits 4x that many
// unresponsive (unbuffered, unread) channels. Every channel hits the 1s send-timeout.
// With the cap, total wall-clock time is ceil(channels/poolSize) * 1s; without it,
// all timeouts run concurrently and total wall-clock is ~1s. The lower bound asserts
// the semaphore actually serialises work.
func TestBroadcast_BoundedPool(t *testing.T) {
	originalPoolSize := maxConcurrentBroadcasts
	defer func() { maxConcurrentBroadcasts = originalPoolSize }()
	maxConcurrentBroadcasts = 2

	cm := newClientChannelMap()

	const numChannels = 8
	channels := make([]chan []byte, numChannels)

	for i := 0; i < numChannels; i++ {
		// Unbuffered and never drained: every send must hit the 1s timeout.
		channels[i] = make(chan []byte)
		cm.add(channels[i])
	}

	require.Equal(t, numChannels, cm.count(), "All channels should be registered")

	logger := &ulogger.TestLogger{}

	startTime := time.Now()
	cm.broadcast([]byte("test"), logger)
	elapsed := time.Since(startTime)

	// Ceiling division: the bounded pool needs at least ceil(numChannels/poolSize)
	// one-second timeout rounds. Plain integer division would floor and understate
	// the lower bound if the constants ever stop dividing evenly, contradicting the
	// ceil() contract documented above.
	batches := (numChannels + maxConcurrentBroadcasts - 1) / maxConcurrentBroadcasts
	expectedMin := time.Duration(batches) * time.Second
	expectedMax := expectedMin + 2*time.Second

	require.GreaterOrEqual(t, elapsed, expectedMin,
		"Broadcast finished too quickly (%v); pool of %d should have serialised %d unresponsive channels into batches taking ~%v",
		elapsed, maxConcurrentBroadcasts, numChannels, expectedMin)
	require.LessOrEqual(t, elapsed, expectedMax,
		"Broadcast took too long (%v); expected at most %v", elapsed, expectedMax)

	require.Equal(t, 0, cm.count(), "All timed-out channels should be removed")

	t.Logf("Broadcast of %d unresponsive channels with pool=%d completed in %v (expected %v..%v)",
		numChannels, maxConcurrentBroadcasts, elapsed, expectedMin, expectedMax)
}
Loading