
fix(blocksync,p2p,service): fix concurrency bugs behind chronic CI flakes#1340

Open
lklimek wants to merge 7 commits into v1.6-dev from fix/ci-flaky-tests

@lklimek lklimek commented Jun 13, 2026

Collaborator

Issue being fixed or feature implemented

Why this PR exists

  • Problem: Three recurring CI "Test" failures long treated as flakes are actually real concurrency bugs in shared primitives.
  • What breaks without it: TestReactor_SyncTime wedges in ~10% of runs (permanent sync stall at height 1); internal/consensus panics with "possible deadlock when sending disconnected broadcast" on slow runners; TestNodeNewSeedNode fails because Wait() returns before IsRunning() flips to false. Net: chronically red, untrustworthy CI.

What was done?

7 commits:

  • workerpool: serialize Start/Stop/Reset/Run under one lifecycle mutex — fixes a race where a Stop interleaving Run's Reset→Start left the pool "running" with zero workers; synchronizer.runHandler exits on ErrWorkerPoolStopped.
  • blocksync: drive runHandler off the synchronizer's own cancelable context. This fixes the goroutine start-race that wedged TestReactor_SyncTime (a handler spawned in OnStart could exit at birth because BaseService.Start sets running=1 only after OnStart returns) and the stop-time producer-goroutine leak (a caught-up produceJob never observed ErrWorkerPoolStopped). Also adds a deterministic monitor-interval test hook for the rate assertion.
  • p2p: key the PeerManager broadcast deadlock-guard on a new ErrBroadcastDeadlock sentinel — stops panicking on benign caller-ctx expiry while actually catching the real capacity-exhaustion deadlock it previously missed.
  • service: BaseService.Stop clears running=0 before cancel() so Wait() can't return while IsRunning() is still true.
  • docs: document the workerpool Send/Receive-vs-lifecycle concurrency invariant.

How Has This Been Tested?

-race results: TestReactor_SyncTime 0/50 failures (was 5/50), TestNodeNewSeedNode 0/200 failures, TestReactor_NoBlockResponse 15/15 passing, new TestStopReleasesHandlers (leaktest, red-first/green-after), full internal/p2p suite green with 0 deadlock panics; go build ./..., go vet, gofmt, golangci-lint clean. Adversarially reviewed in two rounds; the first caught a goroutine-leak regression, now fixed and locked down by the leaktest.

Breaking Changes

None — internal concurrency fixes, no API/behavior change for callers.

Notes / follow-ups (pre-existing, not introduced here)

  • Latent (now documented, not enforced): workerpool Send/Receive must not run concurrently with Reset/Run/Stop.
  • BaseService cannot truly restart (never resets quit); a second Start on a stopped instance no-ops. Pre-existing.

Checklist:

  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have added or updated relevant unit/integration/functional/e2e tests
  • I have made corresponding changes to the documentation

For repository code-owners and collaborators only

  • I have assigned this pull request to a milestone

🤖 Co-authored by Claudius the Magnificent AI Agent

lklimek and others added 7 commits June 13, 2026 01:24
…r busy-spin

Reset() and Start() toggled the `stopped` atomic as separate, unsynchronized
operations, and workers are spawned only inside Start(). A Stop() interleaving
between the Reset->Start pair in Run() could make Start() early-return (the old
`if p.stopped.Swap(false) { return }`), leaving the pool with OPEN channels but
ZERO workers: jobs enqueue but never execute, every Receive() blocks forever, and
blocksync wedges.

- Add a single lifecycleMtx that serializes Reset/Start/Stop/Run so the
  Reset->Start pair in Run is atomic with respect to Stop. The mutex (the
  go-deadlock-aliased sync.Mutex already used in this file) is held only around
  state transitions; Stop's blocking waits (worker shutdown, wg.Wait) cannot
  self-deadlock because workers acquire no pool lock. Cold path (once per sync
  session), so tracking overhead is negligible.
- Remove Start's early-return; the invariant is now: after Run/Start returns the
  pool is either stopped OR has exactly initPoolSize live workers, never zero.
- synchronizer.runHandler now EXITS on ErrWorkerPoolStopped instead of logging
  and re-looping, killing the busy-spin / log-flood once the pool is stopping.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
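
The lifecycle serialization described in this commit can be illustrated with a minimal sketch. It is not the real WorkerPool: the `Pool` type, its fields other than `lifecycleMtx`, and the `State` helper are hypothetical, and the standard `sync.Mutex` stands in for the go-deadlock alias. It only shows why holding one mutex across the Reset and Start steps prevents a Stop from leaving the pool "running" with zero workers.

```go
package main

import (
	"fmt"
	"sync"
)

// Pool is a hypothetical, stripped-down stand-in for the real WorkerPool.
type Pool struct {
	lifecycleMtx sync.Mutex // serializes Run, Stop and the state reads below
	stopped      bool       // guarded by lifecycleMtx
	live         int        // workers spawned by the last start; guarded by lifecycleMtx
	size         int
	jobCh        chan func()
	doneCh       chan struct{}
	wg           sync.WaitGroup
}

// NewPool returns a pool in the stopped state; Run brings it up.
func NewPool(size int) *Pool { return &Pool{size: size, stopped: true} }

// Run performs Reset then Start as one step that Stop cannot interleave.
func (p *Pool) Run() {
	p.lifecycleMtx.Lock()
	defer p.lifecycleMtx.Unlock()
	if p.stopped { // reset: fresh channels for a fresh set of workers
		p.stopped = false
		p.jobCh = make(chan func(), p.size)
		p.doneCh = make(chan struct{})
	}
	if p.live > 0 { // already started
		return
	}
	for i := 0; i < p.size; i++ {
		p.wg.Add(1)
		go p.worker(p.jobCh, p.doneCh)
	}
	p.live = p.size
}

// worker takes no pool lock, so Stop may wait for it while holding lifecycleMtx.
func (p *Pool) worker(jobs <-chan func(), done <-chan struct{}) {
	defer p.wg.Done()
	for {
		select {
		case <-done:
			return
		case job := <-jobs:
			job()
		}
	}
}

// Stop signals the workers and waits for all of them to exit.
func (p *Pool) Stop() {
	p.lifecycleMtx.Lock()
	defer p.lifecycleMtx.Unlock()
	if p.stopped {
		return
	}
	p.stopped = true
	close(p.doneCh)
	p.wg.Wait()
	p.live = 0
}

// State returns the stopped flag and live worker count as one consistent snapshot.
func (p *Pool) State() (stopped bool, live int) {
	p.lifecycleMtx.Lock()
	defer p.lifecycleMtx.Unlock()
	return p.stopped, p.live
}

func main() {
	p := NewPool(4)
	for i := 0; i < 100; i++ {
		var wg sync.WaitGroup
		wg.Add(2)
		go func() { defer wg.Done(); p.Run() }()
		go func() { defer wg.Done(); p.Stop() }()
		wg.Wait()
		if stopped, live := p.State(); !stopped && live != 4 {
			panic("pool is running without its full set of workers")
		}
		p.Stop() // return to a known state for the next round
	}
	fmt.Println("invariant held in every round")
}
```

Whichever of Run or Stop wins the race, the pool ends up either stopped or with its full worker count, which is the invariant the commit message states.
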
…expiry

The deadlock guards in PeerManager.Ready and Disconnected panicked when broadcast
returned an error matching context.DeadlineExceeded. But broadcast's only
DeadlineExceeded source is the caller's ctx.Err() — a benign expiry (e.g. a slow
CI runner blowing a test's WithTimeout). The REAL deadlock path (a subscriber
failing to drain within broadcastTimeout while the PeerManager mutex is held)
returned a plain fmt.Errorf the guard never matched. So the guard both
false-positived on benign expiry AND missed the case it was meant to catch.

- Introduce the errors.Is-able sentinel ErrBroadcastDeadlock and wrap it into the
  capacity-exhaustion error.
- Both guards now panic ONLY on ErrBroadcastDeadlock; caller ctx
  cancellation/expiry is logged at debug and skipped, no panic.
- Add regression tests: benign canceled-ctx broadcast does not panic; the
  sentinel is matchable and distinct from context.DeadlineExceeded.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
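
As a rough illustration of the sentinel approach above, the sketch below separates a benign caller-context expiry from a subscriber that never drains. Only the name `ErrBroadcastDeadlock` comes from the commit; the `broadcast` signature, `shouldPanic`, and `demoErrors` are assumptions made for the example and do not mirror the real PeerManager code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ErrBroadcastDeadlock marks the capacity-exhaustion case (name from the PR;
// the message text is made up for this sketch).
var ErrBroadcastDeadlock = errors.New("possible broadcast deadlock")

// broadcast is a hypothetical, simplified delivery to a single subscriber.
func broadcast(ctx context.Context, sub chan<- string, update string, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case sub <- update:
		return nil
	case <-ctx.Done():
		// Benign: the caller's own context was canceled or expired.
		return ctx.Err()
	case <-timer.C:
		// The subscriber failed to drain in time: wrap the sentinel.
		return fmt.Errorf("subscriber did not drain within %s: %w", timeout, ErrBroadcastDeadlock)
	}
}

// shouldPanic is the guard decision: only the sentinel counts as a deadlock.
func shouldPanic(err error) bool {
	return errors.Is(err, ErrBroadcastDeadlock)
}

// demoErrors produces one error of each kind against a channel nobody reads.
func demoErrors() (benign, deadlock error) {
	full := make(chan string) // unbuffered and never read, so sends always block
	expired, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
	defer cancel()
	benign = broadcast(expired, full, "ready", time.Minute)
	deadlock = broadcast(context.Background(), full, "ready", 10*time.Millisecond)
	return benign, deadlock
}

func main() {
	benign, deadlock := demoErrors()
	fmt.Println("benign expiry trips guard:", shouldPanic(benign))
	fmt.Println("capacity exhaustion trips guard:", shouldPanic(deadlock))
}
```

Matching on a dedicated sentinel rather than on context.DeadlineExceeded keeps the guard independent of how the caller configured its context.
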
Stop() called bs.cancel() — which closes the quit channel that Wait() blocks on —
BEFORE atomic.StoreUint32(&bs.running, 0). A goroutine in Wait() could therefore
unblock while IsRunning() still reported true, violating the documented contract
that "Wait blocks until the service is stopped".

Swap the two statements so running is set to 0 before cancel(), guaranteeing
IsRunning() is false by the time Wait() returns. OnStop() ordering is unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
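
The ordering argument can be sketched in isolation. The `Service` type below is hypothetical and far smaller than BaseService (it uses a plain channel instead of a context and has no OnStop); it only shows that clearing the running flag before closing the channel Wait() blocks on guarantees a waiter never observes IsRunning() == true after Wait() returns.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Service is a hypothetical miniature of a start/stop service.
type Service struct {
	running uint32
	quit    chan struct{}
}

// NewService returns a service that is already "running".
func NewService() *Service {
	return &Service{running: 1, quit: make(chan struct{})}
}

func (s *Service) IsRunning() bool { return atomic.LoadUint32(&s.running) == 1 }

// Wait blocks until the service is stopped.
func (s *Service) Wait() { <-s.quit }

// Stop clears the running flag first and only then releases waiters.
func (s *Service) Stop() {
	if !atomic.CompareAndSwapUint32(&s.running, 1, 0) {
		return // already stopped
	}
	close(s.quit)
}

// observedRunningAfterWait reports whether any waiter saw IsRunning() == true
// after Wait() returned.
func observedRunningAfterWait(rounds int) bool {
	for i := 0; i < rounds; i++ {
		s := NewService()
		seen := make(chan bool)
		go func() {
			s.Wait()
			seen <- s.IsRunning()
		}()
		s.Stop()
		if <-seen {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println("waiter saw running after Wait:", observedRunningAfterWait(1000))
}
```

Because the atomic store precedes the close, and the close precedes the receive in Wait(), the flag is already 0 by the time any waiter wakes.
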
…-race wedge

OnStart spawns the producer/consumer goroutines (go runHandler ...) but
BaseService.Start only sets running=1 AFTER OnStart returns. A handler goroutine
scheduled in that window read IsRunning()==false and exited at birth, freezing
s.height at 1 forever — the dominant cause of TestReactor_SyncTime flakiness
("expected node to be partially synced" never satisfied). This is the start-side
twin of the BaseService.Stop ordering bug.

Loop on ctx.Err() == nil instead of s.IsRunning(). The context is already live
when OnStart runs, so there is no dependency on the running-flag timing. The two
exits remain correct and leak-free: full-reactor cancellation ends the loop via
ctx.Err(), and a manual synchronizer.Stop() ends it via the kept
ErrWorkerPoolStopped early-return (Send/Receive observe the stopped pool).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The assertion getLastSyncRate() > 0.001 raced the sync-rate filter: with the
default monitorInterval=100 the rate is only computed once the syncing node
reaches height ~101, so a slow run could blow the 10s window before the rate
became non-zero — a separate, rarer flake from the start-race wedge.

Add a WithSynchronizerOptions reactor option that forwards OptionFuncs to the
synchronizer built in OnStart, and use it in the test to set
WithMonitorInterval(10) so the rate computes after ~10 blocks. This makes the
assertion deterministic rather than widening the Eventually window.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
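
A minimal sketch of the option-forwarding pattern described here follows. The names WithSynchronizerOptions, WithMonitorInterval, and OptionFunc appear in the commit message; every signature, the `Reactor` and `Synchronizer` shapes, and `ReactorOption` are assumptions for illustration only.

```go
package main

import "fmt"

// Synchronizer is a hypothetical stand-in holding just the tunable of interest.
type Synchronizer struct {
	monitorInterval int
}

// OptionFunc configures a Synchronizer (signature assumed).
type OptionFunc func(*Synchronizer)

// WithMonitorInterval overrides how many blocks pass between rate computations.
func WithMonitorInterval(n int) OptionFunc {
	return func(s *Synchronizer) { s.monitorInterval = n }
}

// NewSynchronizer applies options on top of the default interval of 100.
func NewSynchronizer(opts ...OptionFunc) *Synchronizer {
	s := &Synchronizer{monitorInterval: 100}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// Reactor stores synchronizer options until OnStart builds the synchronizer.
type Reactor struct {
	syncOpts []OptionFunc
	sync     *Synchronizer
}

// ReactorOption configures a Reactor (hypothetical name).
type ReactorOption func(*Reactor)

// WithSynchronizerOptions forwards OptionFuncs to the synchronizer built later.
func WithSynchronizerOptions(opts ...OptionFunc) ReactorOption {
	return func(r *Reactor) { r.syncOpts = append(r.syncOpts, opts...) }
}

func NewReactor(opts ...ReactorOption) *Reactor {
	r := &Reactor{}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// OnStart constructs the synchronizer with whatever options were forwarded.
func (r *Reactor) OnStart() {
	r.sync = NewSynchronizer(r.syncOpts...)
}

func main() {
	prod := NewReactor()
	prod.OnStart()
	test := NewReactor(WithSynchronizerOptions(WithMonitorInterval(10)))
	test.OnStart()
	fmt.Println(prod.sync.monitorInterval, test.sync.monitorInterval)
}
```

Forwarding options this way lets a test shorten the interval without changing the default seen by production callers.
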
The previous fix looped runHandler on the EXTERNAL reactor context, but OnStop
stops the worker pool without cancelling that context. On the common
switch-to-consensus path, poolRoutine calls synchronizer.Stop() while the parent
context is still live: consumeJobResult exits (Receive -> ErrWorkerPoolStopped),
but a caught-up produceJob keeps sleeping 50ms and returning nil — it never calls
Send, never sees ErrWorkerPoolStopped, and ctx.Err() stays nil. The producer
goroutine and its timer leaked for the entire consensus phase.

- OnStart derives s.ctx, s.cancel = context.WithCancel(ctx) and runs the worker
  pool + handlers under s.ctx. It is created live before the goroutines spawn, so
  the original goroutine start-race stays fixed without consulting IsRunning().
- OnStop calls s.cancel() before workerPool.Stop(), so cancellation releases both
  handlers even when the caller's context is still live.
- produceJob's idle wait now selects on ctx.Done() so a caught-up producer
  observes cancellation promptly instead of blocking on a bare sleep.

Add TestStopReleasesHandlers (leaktest) reproducing the exact leak: Stop() with a
live parent context and a caught-up job generator must release the handlers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
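
The shape of this fix can be sketched without the worker pool at all. In the hypothetical `Synchronizer` below, Start derives a private context, the producer loops on `ctx.Err() == nil`, and the idle wait selects on `ctx.Done()`. The WaitGroup in Stop and the `stopReleasesHandler` helper are additions for the example, not something the commit describes.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Synchronizer is a hypothetical miniature with a single producer goroutine.
type Synchronizer struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// Start derives a private context that is live before the goroutine spawns.
func (s *Synchronizer) Start(parent context.Context) {
	ctx, cancel := context.WithCancel(parent)
	s.cancel = cancel
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		for ctx.Err() == nil { // no dependency on a running flag
			s.produceJob(ctx)
		}
	}()
}

// produceJob models the caught-up path: nothing to send, so idle briefly,
// but wake immediately on cancellation instead of finishing a bare sleep.
func (s *Synchronizer) produceJob(ctx context.Context) {
	timer := time.NewTimer(50 * time.Millisecond)
	defer timer.Stop()
	select {
	case <-ctx.Done():
	case <-timer.C:
	}
}

// Stop cancels only the private context and waits for the producer to exit.
func (s *Synchronizer) Stop() {
	s.cancel()
	s.wg.Wait()
}

// stopReleasesHandler reports whether Stop returned promptly and whether the
// parent context was still live afterwards.
func stopReleasesHandler() (stoppedInTime, parentAlive bool) {
	parent, cancelParent := context.WithCancel(context.Background())
	defer cancelParent()
	s := &Synchronizer{}
	s.Start(parent)
	done := make(chan struct{})
	go func() {
		s.Stop()
		close(done)
	}()
	select {
	case <-done:
		stoppedInTime = true
	case <-time.After(2 * time.Second):
	}
	return stoppedInTime, parent.Err() == nil
}

func main() {
	stopped, alive := stopReleasesHandler()
	fmt.Println("handler released:", stopped, "parent still live:", alive)
}
```

Owning the cancel function means Stop no longer depends on the caller tearing down its context, which is the situation on the switch-to-consensus path described above.
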
…riant

Send and Receive read jobCh/resultCh/doneCh locklessly while Reset/Run/Stop
reassign or close them under lifecycleMtx. This is safe today only because
callers serialize the lifecycle against in-flight Send/Receive (service
OnStart/OnStop). Document the invariant on WorkerPool so future callers do not
introduce a data race by overlapping them.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

coderabbitai Bot commented Jun 13, 2026

Contributor

Warning

Review limit reached

@lklimek, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 57 minutes and 26 seconds. Learn how PR review limits work.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 48885a17-fbb8-483b-b5ad-e2481a6dfaad

📥 Commits

Reviewing files that changed from the base of the PR and between 58912eb and d125f00.

📒 Files selected for processing (8)
  • internal/blocksync/reactor.go
  • internal/blocksync/reactor_test.go
  • internal/blocksync/synchronizer.go
  • internal/blocksync/synchronizer_test.go
  • internal/p2p/peermanager.go
  • internal/p2p/peermanager_test.go
  • libs/service/service.go
  • libs/workerpool/worker_pool.go

@lklimek lklimek requested a review from Copilot June 13, 2026 19:35
@lklimek lklimek marked this pull request as ready for review June 13, 2026 19:35
@lklimek lklimek requested a review from QuantumExplorer as a code owner June 13, 2026 19:35

Copilot AI left a comment

Contributor

Pull request overview

This PR addresses long-standing CI “flake” failures by fixing concurrency/lifecycle races across the workerpool, blocksync synchronizer/reactor, PeerManager broadcast deadlock detection, and BaseService shutdown ordering.

Changes:

  • Workerpool: serialize lifecycle transitions (Start/Run/Reset/Stop) under a dedicated mutex and document concurrency invariants.
  • Blocksync: drive handler goroutines from a synchronizer-owned cancelable context; make producer/consumer handlers exit cleanly on workerpool stop; add leak test and determinism hooks for monitor interval.
  • P2P/service: add an errors.Is()-matchable broadcast-deadlock sentinel and adjust BaseService.Stop ordering so Wait()/IsRunning() become consistent.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| libs/workerpool/worker_pool.go | Adds lifecycle mutex + helpers to prevent Stop/Run interleavings; documents invariants. |
| libs/service/service.go | Ensures running=0 is stored before cancel so Wait() and IsRunning() align. |
| internal/p2p/peermanager.go | Introduces ErrBroadcastDeadlock sentinel and wraps broadcast timeout errors. |
| internal/p2p/peermanager_test.go | Adds coverage for benign ctx expiry vs deadlock sentinel behavior. |
| internal/blocksync/synchronizer.go | Switches handlers to synchronizer-owned context; exits loops on pool stop; refactors handler signatures. |
| internal/blocksync/synchronizer_test.go | Adds leaktest to lock down Stop releasing handler goroutines. |
| internal/blocksync/reactor.go | Adds reactor option forwarding to synchronizer construction. |
| internal/blocksync/reactor_test.go | Makes sync-rate assertion deterministic via monitor interval option. |
Comments suppressed due to low confidence (1)

libs/workerpool/worker_pool.go:282

  • resetLocked assumes p.workers is already allocated and indexed up to initPoolSize. If Stop() is called before the first Start()/Run() (so workers were never initialized), stopped becomes true and a subsequent Reset()/Run() will panic with an index-out-of-range/nil slice when writing p.workers[i]. Reset should be safe regardless of whether workers were previously started.
// resetLocked re-initializes channels and workers. The caller must hold lifecycleMtx.
func (p *WorkerPool) resetLocked() {
	if !p.stopped.Swap(false) {
		return
	}
	p.doneCh = make(chan struct{})
	p.jobCh = make(chan *Job, p.initPoolSize)
	p.resultCh = make(chan Result, p.initPoolSize*2)
	for i := 0; i < p.initPoolSize; i++ {
		p.workers[i] = newWorker(i, p.jobCh, p.resultCh, p.doneCh, p.logger)
	}
}

Comment on lines 188 to +201
 	s.jobProgressCounter.Add(1)
 	job, err := s.jobGen.nextJob(ctx)
 	if err != nil {
 		s.logger.Error("cannot create a next job", "error", err)
-		return
+		return nil
 	}
 	err = s.workerPool.Send(ctx, job)
 	if err != nil {
+		if errors.Is(err, workerpool.ErrWorkerPoolStopped) {
+			return err
+		}
 		s.logger.Error("cannot add a job to worker-pool", "error", err)
 	}
+	return nil
Comment on lines +1696 to +1710
	// A subscriber that never drains its channel.
	sub := peerManager.Subscribe(ctx, "p2p")
	_ = sub

	added, err := peerManager.Add(a)
	require.NoError(t, err)
	require.True(t, added)
	require.NoError(t, peerManager.Accepted(a.NodeID))

	// Fill the subscription buffer exactly to capacity with non-blocking buffered
	// sends. Capacity is the unexported broadcastSubscriptionChannelCapacity (3);
	// Accepted does not broadcast, so three Ready calls fill the channel.
	for i := 0; i < 3; i++ {
		peerManager.Ready(ctx, a.NodeID, nil)
	}
Comment on lines +390 to +402
	checkLeaks := leaktest.CheckTimeout(suite.T(), 5*time.Second)

	suite.Require().NoError(sync.Start(ctx))
	suite.Require().Eventually(sync.IsRunning, time.Second, 5*time.Millisecond)
	// Give the idling producer a few iterations before stopping.
	time.Sleep(50 * time.Millisecond)

	sync.Stop()

	// With the parent ctx still live, both handler goroutines must have exited.
	suite.Require().NoError(ctx.Err())
	checkLeaks()
}

2 participants