Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 106 additions & 13 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1519,19 +1521,33 @@ func (d *AuthenticatedGossiper) networkHandler(ctx context.Context) {
// Channel announcement signatures are amongst the only
// messages that we'll process serially.
case *lnwire.AnnounceSignatures1:
emittedAnnouncements, _ := d.processNetworkAnnouncement(
ctx, announcement,
)
log.Debugf("Processed network message %s, "+
"returned len(announcements)=%v",
announcement.msg.MsgType(),
len(emittedAnnouncements))

if emittedAnnouncements != nil {
announcements.AddMsgs(
emittedAnnouncements...,
// Process in an anonymous function so we can
// recover from any panics without crashing the
// main networkHandler goroutine. We pass nil
// for jobID since AnnounceSignatures bypass the
// validation barrier.
func() {
defer d.finalizeGossipProcessing(
ctx, "processing",
announcement, nil,
)
}

//nolint:ll
emittedAnnouncements, _ := d.processNetworkAnnouncement(
ctx, announcement,
)
log.Debugf("Processed network "+
"message %s, returned "+
"len(announcements)=%v",
announcement.msg.MsgType(),
len(emittedAnnouncements))

if emittedAnnouncements != nil {
announcements.AddMsgs(
emittedAnnouncements...,
)
}
}()
continue
}

Expand Down Expand Up @@ -1614,7 +1630,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context,
nMsg *networkMsg, deDuped *deDupedAnnouncements, jobID JobID) {

defer d.wg.Done()
defer d.vb.CompleteJob()
defer d.finalizeGossipProcessing(ctx, "processing", nMsg, &jobID)

// We should only broadcast this message forward if it originated from
// us or it wasn't received as part of our initial historical sync.
Expand Down Expand Up @@ -1670,6 +1686,83 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context,
}
}

// finalizeGossipProcessing handles cleanup for gossip message processing,
// including job completion and panic recovery. It guards gossip goroutines
// against panics to keep the daemon alive. On panic, it logs the error,
// signals dependents, and reports back to the caller if possible.
//
// NOTE: This function MUST be called via defer to recover from panics.
func (d *AuthenticatedGossiper) finalizeGossipProcessing(logCtx context.Context,
ctxStr string, nMsg *networkMsg, jobID *JobID) {

// Always complete the job when provided, regardless of panic state.
// This ensures job slots are returned even if callers forget or
// misordering occurs.
if jobID != nil {
d.vb.CompleteJob()
}

r := recover()
if r == nil {
return
}

msgType := "unknown"
if nMsg != nil && nMsg.msg != nil {
msgType = nMsg.msg.MsgType().String()
}

var peerPub string
if nMsg != nil && nMsg.peer != nil {
peerPub = route.NewVertex(nMsg.peer.IdentityKey()).String()
} else {
peerPub = "unknown"
}

log.ErrorS(logCtx, "Panic during gossip message processing",
fmt.Errorf("%v", r),
slog.String("context", ctxStr),
slog.String("msg_type", msgType),
slog.String("peer", peerPub),
)
// Truncate the stack trace to avoid filling up disk space if an
// attacker repeatedly triggers panics.
const maxStackSize = 8192
stack := debug.Stack()
if len(stack) > maxStackSize {
stack = stack[:maxStackSize]
}
log.DebugS(logCtx, "Panic stack trace",
slog.String("stack", string(stack)),
)

// Signal any dependents waiting on this message so they don't block
// forever.
if nMsg != nil && nMsg.msg != nil && jobID != nil {
if err := d.vb.SignalDependents(
nMsg.msg, *jobID,
); err != nil {
log.ErrorS(logCtx, "SignalDependents after panic failed",
err,
slog.String("msg_type", nMsg.msg.MsgType().String()),
)
}
}

// Send an error back to the caller if possible.
if nMsg != nil && nMsg.err != nil {
select {
case nMsg.err <- fmt.Errorf("panic while %s gossip "+
"message %s: %v", ctxStr, msgType, r):
default:
log.WarnS(logCtx, "Unable to send panic error, "+
"error channel blocked", nil,
slog.String("msg_type", msgType),
)
}
}
}

// TODO(roasbeef): d/c peers that send updates not on our chain

// InitSyncState is called by outside sub-systems when a connection is
Expand Down
Loading
Loading