Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,19 @@ func (c *cachedNetworkMsg) Size() (uint64, error) {
// rejectCacheKey is the cache key that we'll use to track announcements we've
// recently rejected.
type rejectCacheKey struct {
pubkey [33]byte
chanID uint64
gossipVersion lnwire.GossipVersion
pubkey [33]byte
chanID uint64
}

// newRejectCacheKey returns a new cache key for the reject cache.
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
func newRejectCacheKey(v lnwire.GossipVersion, cid uint64,
pub [33]byte) rejectCacheKey {

k := rejectCacheKey{
chanID: cid,
pubkey: pub,
gossipVersion: v,
chanID: cid,
pubkey: pub,
}

return k
Expand Down Expand Up @@ -1688,8 +1692,15 @@ func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
peerPub [33]byte) bool {

// We only cache rejections for gossip messages. So if it is not
// a gossip message, we return false.
gMsg, ok := msg.(lnwire.GossipMessage)
if !ok {
return false
}

var scid uint64
switch m := msg.(type) {
switch m := gMsg.(type) {
case *lnwire.ChannelUpdate1:
scid = m.ShortChannelID.ToUint64()

Expand All @@ -1700,8 +1711,11 @@ func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
return false
}

_, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
return err != cache.ErrElementNotFound
_, err := d.recentRejects.Get(newRejectCacheKey(
gMsg.GossipVersion(), scid, peerPub,
))

return !errors.Is(err, cache.ErrElementNotFound)
}

// retransmitStaleAnns examines all outgoing channels that the source node is
Expand Down Expand Up @@ -2571,6 +2585,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
log.Errorf(err.Error())

key := newRejectCacheKey(
ann.GossipVersion(),
scid.ToUint64(),
sourceToPub(nMsg.source),
)
Expand All @@ -2588,6 +2603,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
log.Errorf(err.Error())

key := newRejectCacheKey(
ann.GossipVersion(),
scid.ToUint64(),
sourceToPub(nMsg.source),
)
Expand Down Expand Up @@ -2662,6 +2678,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
"%v", err)

key := newRejectCacheKey(
ann.GossipVersion(),
scid.ToUint64(),
sourceToPub(nMsg.source),
)
Expand Down Expand Up @@ -2741,6 +2758,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
errors.Is(err, ErrInvalidFundingOutput):

key := newRejectCacheKey(
ann.GossipVersion(),
scid.ToUint64(),
sourceToPub(nMsg.source),
)
Expand All @@ -2750,6 +2768,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,

case errors.Is(err, ErrChannelSpent):
key := newRejectCacheKey(
ann.GossipVersion(),
scid.ToUint64(),
sourceToPub(nMsg.source),
)
Expand All @@ -2776,6 +2795,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
// edge. We won't increase the ban score for the
// remote peer.
key := newRejectCacheKey(
ann.GossipVersion(),
scid.ToUint64(),
sourceToPub(nMsg.source),
)
Expand Down Expand Up @@ -2839,6 +2859,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
anns, rErr := d.processRejectedEdge(ctx, ann, proof)
if rErr != nil {
key := newRejectCacheKey(
ann.GossipVersion(),
scid.ToUint64(),
sourceToPub(nMsg.source),
)
Expand Down Expand Up @@ -2866,6 +2887,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,

// Otherwise, this is just a regular rejected edge.
key := newRejectCacheKey(
ann.GossipVersion(),
scid.ToUint64(),
sourceToPub(nMsg.source),
)
Expand Down Expand Up @@ -2998,6 +3020,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
log.Errorf(err.Error())

key := newRejectCacheKey(
upd.GossipVersion(),
upd.ShortChannelID.ToUint64(),
sourceToPub(nMsg.source),
)
Expand Down Expand Up @@ -3178,6 +3201,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
nMsg.err <- err

key := newRejectCacheKey(
upd.GossipVersion(),
upd.ShortChannelID.ToUint64(),
sourceToPub(nMsg.source),
)
Expand Down Expand Up @@ -3307,6 +3331,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
// Since we know the stored SCID in the graph, we'll
// cache that SCID.
key := newRejectCacheKey(
upd.GossipVersion(),
chanInfo.ChannelID,
sourceToPub(nMsg.source),
)
Expand Down
1 change: 1 addition & 0 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4717,6 +4717,7 @@ func TestChanAnnBanningNonChanPeer(t *testing.T) {

// Remove the scid from the reject cache.
key := newRejectCacheKey(
ca.GossipVersion(),
ca.ShortChannelID.ToUint64(),
sourceToPub(nodePeer2.IdentityKey()),
)
Expand Down
16 changes: 9 additions & 7 deletions graph/db/sql_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
id, err := sqlDB.GetNodeIDByPubKey(
ctx, sqlc.GetNodeIDByPubKeyParams{
PubKey: pub[:],
Version: int16(ProtocolV1),
Version: int16(lnwire.GossipVersion1),
},
)
if err != nil {
Expand All @@ -441,7 +441,9 @@ func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
// from the SQL database and checking that the expected DB ID and
// pub key are returned. We don't need to do a whole node comparison
// here, as this was already done in the previous migration step.
srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
srcNodes, err := sqlDB.GetSourceNodesByVersion(
ctx, int16(lnwire.GossipVersion1),
)
if err != nil {
return fmt.Errorf("could not get source nodes from SQL "+
"store: %w", err)
Expand Down Expand Up @@ -1251,7 +1253,7 @@ func migrateZombieIndex(ctx context.Context, cfg *sqldb.QueryConfig,
// Batch fetch all zombie channels from the database.
rows, err := sqlDB.GetZombieChannelsSCIDs(
ctx, sqlc.GetZombieChannelsSCIDsParams{
Version: int16(ProtocolV1),
Version: int16(lnwire.GossipVersion1),
Scids: scids,
},
)
Expand Down Expand Up @@ -1327,7 +1329,7 @@ func migrateZombieIndex(ctx context.Context, cfg *sqldb.QueryConfig,

err = sqlDB.UpsertZombieChannel(
ctx, sqlc.UpsertZombieChannelParams{
Version: int16(ProtocolV1),
Version: int16(lnwire.GossipVersion1),
Scid: chanIDB,
NodeKey1: pubKey1[:],
NodeKey2: pubKey2[:],
Expand Down Expand Up @@ -1443,7 +1445,7 @@ func insertNodeSQLMig(ctx context.Context, db SQLQueries,
node *models.Node) (int64, error) {

params := sqlc.InsertNodeMigParams{
Version: int16(ProtocolV1),
Version: int16(lnwire.GossipVersion1),
PubKey: node.PubKeyBytes[:],
}

Expand Down Expand Up @@ -1564,7 +1566,7 @@ func insertChannelMig(ctx context.Context, db SQLQueries,
}

createParams := sqlc.InsertChannelMigParams{
Version: int16(ProtocolV1),
Version: int16(lnwire.GossipVersion1),
Scid: channelIDToBytes(edge.ChannelID),
NodeID1: node1DBID,
NodeID2: node2DBID,
Expand Down Expand Up @@ -1655,7 +1657,7 @@ func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries,
})

id, err := tx.InsertEdgePolicyMig(ctx, sqlc.InsertEdgePolicyMigParams{
Version: int16(ProtocolV1),
Version: int16(lnwire.GossipVersion1),
ChannelID: dbChan.channelID,
NodeID: nodeID,
Timelock: int32(edge.TimeLockDelta),
Expand Down
Loading
Loading