diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 96a8c047ae3..976f2ccfd44 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -5,6 +5,8 @@ import ( "context" "errors" "fmt" + "log/slog" + "runtime/debug" "strings" "sync" "sync/atomic" @@ -1519,19 +1521,33 @@ func (d *AuthenticatedGossiper) networkHandler(ctx context.Context) { // Channel announcement signatures are amongst the only // messages that we'll process serially. case *lnwire.AnnounceSignatures1: - emittedAnnouncements, _ := d.processNetworkAnnouncement( - ctx, announcement, - ) - log.Debugf("Processed network message %s, "+ - "returned len(announcements)=%v", - announcement.msg.MsgType(), - len(emittedAnnouncements)) - - if emittedAnnouncements != nil { - announcements.AddMsgs( - emittedAnnouncements..., + // Process in an anonymous function so we can + // recover from any panics without crashing the + // main networkHandler goroutine. We pass nil + // for jobID since AnnounceSignatures bypass the + // validation barrier. + func() { + defer d.finalizeGossipProcessing( + ctx, "processing", + announcement, nil, ) - } + + //nolint:ll + emittedAnnouncements, _ := d.processNetworkAnnouncement( + ctx, announcement, + ) + log.Debugf("Processed network "+ + "message %s, returned "+ + "len(announcements)=%v", + announcement.msg.MsgType(), + len(emittedAnnouncements)) + + if emittedAnnouncements != nil { + announcements.AddMsgs( + emittedAnnouncements..., + ) + } + }() continue } @@ -1614,7 +1630,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context, nMsg *networkMsg, deDuped *deDupedAnnouncements, jobID JobID) { defer d.wg.Done() - defer d.vb.CompleteJob() + defer d.finalizeGossipProcessing(ctx, "processing", nMsg, &jobID) // We should only broadcast this message forward if it originated from // us or it wasn't received as part of our initial historical sync. @@ -1670,6 +1686,83 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context, } } +// finalizeGossipProcessing handles cleanup for gossip message processing, +// including job completion and panic recovery. It guards gossip goroutines +// against panics to keep the daemon alive. On panic, it logs the error, +// signals dependents, and reports back to the caller if possible. +// +// NOTE: This function MUST be called via defer to recover from panics. +func (d *AuthenticatedGossiper) finalizeGossipProcessing(logCtx context.Context, + ctxStr string, nMsg *networkMsg, jobID *JobID) { + + // Always complete the job when provided, regardless of panic state. + // This ensures job slots are returned even if callers forget or + // misordering occurs. + if jobID != nil { + d.vb.CompleteJob() + } + + r := recover() + if r == nil { + return + } + + msgType := "unknown" + if nMsg != nil && nMsg.msg != nil { + msgType = nMsg.msg.MsgType().String() + } + + var peerPub string + if nMsg != nil && nMsg.peer != nil { + peerPub = route.NewVertex(nMsg.peer.IdentityKey()).String() + } else { + peerPub = "unknown" + } + + log.ErrorS(logCtx, "Panic during gossip message processing", + fmt.Errorf("%v", r), + slog.String("context", ctxStr), + slog.String("msg_type", msgType), + slog.String("peer", peerPub), + ) + // Truncate the stack trace to avoid filling up disk space if an + // attacker repeatedly triggers panics. + const maxStackSize = 8192 + stack := debug.Stack() + if len(stack) > maxStackSize { + stack = stack[:maxStackSize] + } + log.DebugS(logCtx, "Panic stack trace", + slog.String("stack", string(stack)), + ) + + // Signal any dependents waiting on this message so they don't block + // forever. + if nMsg != nil && nMsg.msg != nil && jobID != nil { + if err := d.vb.SignalDependents( + nMsg.msg, *jobID, + ); err != nil { + log.ErrorS(logCtx, "SignalDependents after panic failed", + err, + slog.String("msg_type", nMsg.msg.MsgType().String()), + ) + } + } + + // Send an error back to the caller if possible. + if nMsg != nil && nMsg.err != nil { + select { + case nMsg.err <- fmt.Errorf("panic while %s gossip "+ + "message %s: %v", ctxStr, msgType, r): + default: + log.WarnS(logCtx, "Unable to send panic error, "+ + "error channel blocked", nil, + slog.String("msg_type", msgType), + ) + } + } +} + // TODO(roasbeef): d/c peers that send updates not on our chain // InitSyncState is called by outside sub-systems when a connection is diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 01faf5561c4..42e071629a0 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -4855,3 +4855,312 @@ func assertChanChainRejection(t *testing.T, ctx *testCtx, require.NoError(t, err) require.True(t, isZombie, "edge should be marked as zombie") } + +// TestRecoverGossipPanic tests that the finalizeGossipProcessing function +// correctly handles panics in gossip goroutines by recovering, logging, and +// sending errors back to callers. +func TestRecoverGossipPanic(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + setupMsg func() (*networkMsg, chan error) + checkError bool + }{ + { + name: "panic with full message context", + setupMsg: func() (*networkMsg, chan error) { + errChan := make(chan error, 1) + return &networkMsg{ + msg: &lnwire.ChannelUpdate1{ + Timestamp: testTimestamp, + }, + peer: &mockPeer{ + remoteKeyPub1, nil, nil, + atomic.Bool{}, + }, + err: errChan, + }, errChan + }, + checkError: true, + }, + { + name: "panic with nil message", + setupMsg: func() (*networkMsg, chan error) { + errChan := make(chan error, 1) + return &networkMsg{ + msg: nil, + peer: nil, + err: errChan, + }, errChan + }, + checkError: true, + }, + { + name: "panic with nil error channel", + setupMsg: func() (*networkMsg, chan error) { + return &networkMsg{ + msg: &lnwire.ChannelUpdate1{ + Timestamp: testTimestamp, + }, + peer: nil, + err: nil, + }, nil + }, + checkError: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx, err := createTestCtx(t, proofMatureDelta, false) + require.NoError(t, err) + + nMsg, errChan := tc.setupMsg() + + // Initialize a proper job so CompleteJob has a slot + // to return. + var jobIDRef *JobID + if nMsg.msg != nil { + job, err := ctx.gossiper.vb.InitJobDependencies( + nMsg.msg, + ) + require.NoError(t, err) + jobIDRef = &job + } + + // Create a function that will panic and then recover. + panicked := make(chan struct{}) + go func() { + defer ctx.gossiper.finalizeGossipProcessing( + context.Background(), "testing", + nMsg, jobIDRef, + ) + defer close(panicked) + + panic("test panic") + }() + + // Wait for the goroutine to complete. + select { + case <-panicked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for panic recovery") + } + + // If we expect an error to be sent back, verify it. + if tc.checkError { + require.NotNil(t, errChan, "test expects "+ + "error but errChan is nil") + } + if tc.checkError && errChan != nil { + select { + case err := <-errChan: + require.Error(t, err) + require.Contains( + t, err.Error(), "panic while", + ) + require.Contains( + t, err.Error(), "test panic", + ) + case <-time.After(time.Second): + t.Fatal("timeout waiting for error") + } + } + }) + } +} + +// TestRecoverGossipPanicBlockedErrorChannel verifies that the panic recovery +// does not hang when the error channel is unbuffered and not being read from. +// The recovery should use a non-blocking send with a default case. +func TestRecoverGossipPanicBlockedErrorChannel(t *testing.T) { + t.Parallel() + + ctx, err := createTestCtx(t, proofMatureDelta, false) + require.NoError(t, err) + + // Create an UNBUFFERED channel and don't read from it. + errChan := make(chan error) + + nMsg := &networkMsg{ + msg: &lnwire.ChannelUpdate1{Timestamp: testTimestamp}, + peer: &mockPeer{remoteKeyPub1, nil, nil, atomic.Bool{}}, + err: errChan, + } + + // Initialize a proper job so CompleteJob has a slot to return. + jobID, err := ctx.gossiper.vb.InitJobDependencies(nMsg.msg) + require.NoError(t, err) + + panicked := make(chan struct{}) + go func() { + defer ctx.gossiper.finalizeGossipProcessing( + context.Background(), "testing", nMsg, &jobID, + ) + defer close(panicked) + + panic("test panic") + }() + + // Should not hang - the default case should handle blocked channel. + select { + case <-panicked: + // Success - didn't hang. + case <-time.After(time.Second): + t.Fatal("panic recovery hung on blocked error channel") + } +} + +// TestRecoverGossipPanicSignalsDependents verifies that when a parent job +// panics during gossip processing, the panic recovery correctly signals +// dependent jobs via the validation barrier so they don't block forever. +func TestRecoverGossipPanicSignalsDependents(t *testing.T) { + t.Parallel() + + ctx, err := createTestCtx(t, proofMatureDelta, false) + require.NoError(t, err) + + // Create a channel announcement directly without mocks. We only need + // it to register with the validation barrier. + chanAnn := &lnwire.ChannelAnnouncement1{ + ShortChannelID: lnwire.NewShortChanIDFromInt(12345), + NodeID1: [33]byte{0x02}, + NodeID2: [33]byte{0x03}, + } + + // Register the channel announcement as a parent job. + parentJobID, err := ctx.gossiper.vb.InitJobDependencies(chanAnn) + require.NoError(t, err) + + // Create a channel update that depends on this channel announcement. + // Channel updates wait for their parent channel announcement. + chanUpdate := &lnwire.ChannelUpdate1{ + ShortChannelID: chanAnn.ShortChannelID, + Timestamp: testTimestamp, + } + + // Register the channel update as a child job. + childJobID, err := ctx.gossiper.vb.InitJobDependencies(chanUpdate) + require.NoError(t, err) + + // Start a goroutine that waits for the parent job to complete. + childDone := make(chan error, 1) + go func() { + err := ctx.gossiper.vb.WaitForParents(childJobID, chanUpdate) + childDone <- err + }() + + // Give the child goroutine time to start waiting. + time.Sleep(50 * time.Millisecond) + + // Now simulate the parent job panicking and recovering. + // The recovery should call SignalDependents. + errChan := make(chan error, 1) + nMsg := &networkMsg{ + msg: chanAnn, + peer: &mockPeer{ + remoteKeyPub1, nil, nil, atomic.Bool{}, + }, + err: errChan, + } + + panicked := make(chan struct{}) + go func() { + defer ctx.gossiper.finalizeGossipProcessing( + context.Background(), "testing", nMsg, &parentJobID, + ) + defer close(panicked) + + panic("parent job panic") + }() + + // Wait for the panic to be recovered. + select { + case <-panicked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for panic recovery") + } + + // Verify error was sent back on the parent's error channel. + select { + case err := <-errChan: + require.Error(t, err) + require.Contains(t, err.Error(), "panic while") + require.Contains(t, err.Error(), "parent job panic") + case <-time.After(time.Second): + t.Fatal("timeout waiting for error on parent") + } + + // The child job should now be unblocked because SignalDependents + // was called during panic recovery. + select { + case err := <-childDone: + // Child should complete without error (or with nil if + // parent jobs are now empty). + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("child job still blocked - SignalDependents " + + "did not unblock waiting jobs") + } + + // Clean up the child job. The parent job was already completed by + // finalizeGossipProcessing. + ctx.gossiper.vb.CompleteJob() +} + +// TestRecoverGossipPanicNilJobID verifies that panic recovery works correctly +// when jobID is nil (e.g., for AnnounceSignatures which bypass the validation +// barrier). +func TestRecoverGossipPanicNilJobID(t *testing.T) { + t.Parallel() + + ctx, err := createTestCtx(t, proofMatureDelta, false) + require.NoError(t, err) + + // Create an announce signatures message (these bypass validation + // barrier and thus have nil jobID in the recovery path). + annSigs := &lnwire.AnnounceSignatures1{ + ShortChannelID: lnwire.NewShortChanIDFromInt(12345), + } + + errChan := make(chan error, 1) + nMsg := &networkMsg{ + msg: annSigs, + peer: &mockPeer{ + remoteKeyPub1, nil, nil, atomic.Bool{}, + }, + err: errChan, + } + + // Call finalizeGossipProcessing with nil jobID (simulating the + // AnnounceSignatures serial processing path). + panicked := make(chan struct{}) + go func() { + defer ctx.gossiper.finalizeGossipProcessing( + context.Background(), "processing", nMsg, nil, + ) + defer close(panicked) + + panic("announce signatures panic") + }() + + // Wait for panic recovery. + select { + case <-panicked: + case <-time.After(time.Second): + t.Fatal("timeout waiting for panic recovery") + } + + // Verify error was sent back. + select { + case err := <-errChan: + require.Error(t, err) + require.Contains(t, err.Error(), "panic while") + require.Contains(t, err.Error(), "announce signatures panic") + case <-time.After(time.Second): + t.Fatal("timeout waiting for error") + } +} diff --git a/docs/release-notes/release-notes-0.20.1.md b/docs/release-notes/release-notes-0.20.1.md index 7cb46cff59c..d56ff26ad82 100644 --- a/docs/release-notes/release-notes-0.20.1.md +++ b/docs/release-notes/release-notes-0.20.1.md @@ -73,6 +73,13 @@ # Improvements ## Functional Updates +* [Added panic recovery](https://github.com/lightningnetwork/lnd/pull/10470) to + the gossiper's message processing goroutines. This increases the robustness + of the gossiper subsystem by allowing it to continue operating even if a + logic error causes a panic during message processing. The recovery mechanism + ensures dependencies are properly freed and logs the panic trace for + debugging. + ## RPC Updates * The `EstimateRouteFee` RPC now implements an [LSP detection