diff --git a/channeldb/db.go b/channeldb/db.go index 00b29f65f9f..91f188628b6 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -1363,11 +1363,7 @@ func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) ( // the pending funds in a channel that has been forcibly closed have been // swept. func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error { - var ( - openChannels []*OpenChannel - pruneLinkNode *btcec.PublicKey - ) - err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error { + return kvdb.Update(c.backend, func(tx kvdb.RwTx) error { var b bytes.Buffer if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil { return err @@ -1413,44 +1409,72 @@ func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error { // other open channels with this peer. If we don't we'll // garbage collect it to ensure we don't establish persistent // connections to peers without open channels. - pruneLinkNode = chanSummary.RemotePub - openChannels, err = c.fetchOpenChannels( - tx, pruneLinkNode, - ) + remotePub := chanSummary.RemotePub + openChannels, err := c.fetchOpenChannels(tx, remotePub) if err != nil { return fmt.Errorf("unable to fetch open channels for "+ "peer %x: %v", - pruneLinkNode.SerializeCompressed(), err) + remotePub.SerializeCompressed(), err) } - return nil - }, func() { - openChannels = nil - pruneLinkNode = nil - }) - if err != nil { - return err - } + if len(openChannels) > 0 { + return nil + } + + // If there are no open channels with this peer, prune the + // link node. We do this within the same transaction to avoid + // a race condition where a new channel could be opened + // between this check and the deletion. + log.Infof("Pruning link node %x with zero open "+ + "channels from database", + remotePub.SerializeCompressed()) + + err = deleteLinkNode(tx, remotePub) + if err != nil { + return fmt.Errorf("unable to delete link "+ + "node: %w", err) + } - // Decide whether we want to remove the link node, based upon the number - // of still open channels. - return c.pruneLinkNode(openChannels, pruneLinkNode) + return nil + }, func() {}) } // pruneLinkNode determines whether we should garbage collect a link node from -// the database due to no longer having any open channels with it. If there are -// any left, then this acts as a no-op. -func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel, - remotePub *btcec.PublicKey) error { +// the database due to no longer having any open channels with it. +// +// NOTE: This function should be called after an initial check shows no open +// channels exist. It will double-check within a write transaction to avoid a +// race condition where a channel could be opened between the initial check +// and the deletion. +func (c *ChannelStateDB) pruneLinkNode(remotePub *btcec.PublicKey) error { + return kvdb.Update(c.backend, func(tx kvdb.RwTx) error { + // Double-check for open channels to avoid deleting a link node + // if a channel was opened since the caller's initial check. + // + // NOTE: This avoids a race condition where a channel could be + // opened between the initial check and the deletion. + openChannels, err := c.fetchOpenChannels(tx, remotePub) + if err != nil { + return err + } - if len(openChannels) > 0 { - return nil - } + // If channels exist now, don't prune. + if len(openChannels) > 0 { + return nil + } - log.Infof("Pruning link node %x with zero open channels from database", - remotePub.SerializeCompressed()) + // No open channels, safe to prune the link node. + log.Infof("Pruning link node %x with zero open channels "+ + "from database", + remotePub.SerializeCompressed()) - return c.linkNodeDB.DeleteLinkNode(remotePub) + err = deleteLinkNode(tx, remotePub) + if err != nil { + return fmt.Errorf("unable to prune link node: %w", err) + } + + return nil + }, func() {}) } // PruneLinkNodes attempts to prune all link nodes found within the database @@ -1479,12 +1503,103 @@ func (c *ChannelStateDB) PruneLinkNodes() error { return err } - err = c.pruneLinkNode(openChannels, linkNode.IdentityPub) + if len(openChannels) > 0 { + continue + } + + err = c.pruneLinkNode(linkNode.IdentityPub) + if err != nil { + return err + } + } + + return nil +} + +// RepairLinkNodes scans all channels in the database and ensures that a +// link node exists for each remote peer. This should be called on startup to +// ensure that our database is consistent. +// +// NOTE: This function is designed to repair database inconsistencies that may +// have occurred due to the race condition in link node pruning (where link +// nodes could be incorrectly deleted while channels still existed). This can +// be removed once we move to native sql. +func (c *ChannelStateDB) RepairLinkNodes(network wire.BitcoinNet) error { + // In a single read transaction, build a list of all peers with open + // channels and check which ones are missing link nodes. + var missingPeers []*btcec.PublicKey + + err := kvdb.View(c.backend, func(tx kvdb.RTx) error { + openChanBucket := tx.ReadBucket(openChannelBucket) + if openChanBucket == nil { + return ErrNoActiveChannels + } + + var peersWithChannels []*btcec.PublicKey + + err := openChanBucket.ForEach(func(nodePubBytes, + _ []byte) error { + + nodePub, err := btcec.ParsePubKey(nodePubBytes) + if err != nil { + return err + } + + channels, err := c.fetchOpenChannels(tx, nodePub) + if err != nil { + return err + } + + if len(channels) > 0 { + peersWithChannels = append( + peersWithChannels, nodePub, + ) + } + + return nil + }) if err != nil { return err } + + // Now check which peers are missing link nodes within the + // same transaction. + missingPeers, err = c.linkNodeDB.FindMissingLinkNodes( + tx, peersWithChannels, + ) + + return err + }, func() { + missingPeers = nil + }) + if err != nil && !errors.Is(err, ErrNoActiveChannels) { + return fmt.Errorf("unable to fetch channels: %w", err) } + // Early exit if no repairs needed. + if len(missingPeers) == 0 { + return nil + } + + // Create all missing link nodes in a single write transaction + // using the LinkNodeDB abstraction. + linkNodesToCreate := make([]*LinkNode, 0, len(missingPeers)) + for _, remotePub := range missingPeers { + linkNode := NewLinkNode(c.linkNodeDB, network, remotePub) + linkNodesToCreate = append(linkNodesToCreate, linkNode) + + log.Infof("Repairing missing link node for peer %x", + remotePub.SerializeCompressed()) + } + + err = c.linkNodeDB.CreateLinkNodes(nil, linkNodesToCreate) + if err != nil { + return err + } + + log.Infof("Repaired %d missing link nodes on startup", + len(missingPeers)) + return nil } diff --git a/channeldb/nodes.go b/channeldb/nodes.go index b17d5c360d5..70f6fad8beb 100644 --- a/channeldb/nodes.go +++ b/channeldb/nodes.go @@ -2,6 +2,8 @@ package channeldb import ( "bytes" + "errors" + "fmt" "io" "net" "time" @@ -134,6 +136,95 @@ type LinkNodeDB struct { backend kvdb.Backend } +// FindMissingLinkNodes checks which of the provided public keys do not have +// corresponding link nodes in the database. If tx is nil, a new read +// transaction will be created. Otherwise, the provided transaction is used, +// allowing this to be part of a larger batch operation. +func (l *LinkNodeDB) FindMissingLinkNodes(tx kvdb.RTx, + pubKeys []*btcec.PublicKey) ([]*btcec.PublicKey, error) { + + var missing []*btcec.PublicKey + + findMissing := func(readTx kvdb.RTx) error { + nodeMetaBucket := readTx.ReadBucket(nodeInfoBucket) + if nodeMetaBucket == nil { + // If the bucket doesn't exist, all peers are missing. + missing = pubKeys + return nil + } + + for _, pubKey := range pubKeys { + _, err := fetchLinkNode(readTx, pubKey) + if err == nil { + // Link node exists. + continue + } + + if !errors.Is(err, ErrNodeNotFound) { + return fmt.Errorf("unable to check link node "+ + "for peer %x: %w", + pubKey.SerializeCompressed(), err) + } + + // Link node doesn't exist. + missing = append(missing, pubKey) + } + + return nil + } + + // If no transaction provided, create our own. + if tx == nil { + err := kvdb.View(l.backend, findMissing, func() { + missing = nil + }) + + return missing, err + } + + // Use the provided transaction. + err := findMissing(tx) + + return missing, err +} + +// CreateLinkNodes creates multiple link nodes. If tx is nil, a new write +// transaction will be created. Otherwise, the provided transaction is used, +// allowing this to be part of a larger batch operation. +func (l *LinkNodeDB) CreateLinkNodes(tx kvdb.RwTx, + linkNodes []*LinkNode) error { + + createNodes := func(writeTx kvdb.RwTx) error { + nodeMetaBucket, err := writeTx.CreateTopLevelBucket( + nodeInfoBucket, + ) + if err != nil { + return err + } + + for _, linkNode := range linkNodes { + err := putLinkNode(nodeMetaBucket, linkNode) + if err != nil { + pubKey := linkNode.IdentityPub. + SerializeCompressed() + + return fmt.Errorf("unable to create link "+ + "node for peer %x: %w", pubKey, err) + } + } + + return nil + } + + // If no transaction provided, create our own. + if tx == nil { + return kvdb.Update(l.backend, createNodes, func() {}) + } + + // Use the provided transaction. + return createNodes(tx) +} + // DeleteLinkNode removes the link node with the given identity from the // database. func (l *LinkNodeDB) DeleteLinkNode(identity *btcec.PublicKey) error { diff --git a/channeldb/nodes_test.go b/channeldb/nodes_test.go index b54cf0045bd..a88e452282d 100644 --- a/channeldb/nodes_test.go +++ b/channeldb/nodes_test.go @@ -8,6 +8,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/kvdb" "github.com/stretchr/testify/require" ) @@ -129,3 +130,245 @@ func TestDeleteLinkNode(t *testing.T) { t.Fatal("should not have found link node in db, but did") } } + +// TestRepairLinkNodes tests that the RepairLinkNodes function correctly +// identifies and repairs missing link nodes for channels that exist in the +// database. +func TestRepairLinkNodes(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err, "unable to make test database") + + cdb := fullDB.ChannelStateDB() + + // Create a test channel and save it to the database. + channel1 := createTestChannel(t, cdb) + + // Manually create a link node for the channel. + linkNode1 := NewLinkNode( + cdb.linkNodeDB, wire.MainNet, channel1.IdentityPub, + ) + err = linkNode1.Sync() + require.NoError(t, err, "unable to sync link node") + + // Verify that link node was created. + fetchedLinkNode, err := cdb.linkNodeDB.FetchLinkNode( + channel1.IdentityPub, + ) + require.NoError(t, err, "link node should exist") + require.NotNil(t, fetchedLinkNode, "link node should not be nil") + + // Now, manually delete one of the link nodes to simulate the race + // condition scenario where a link node was incorrectly pruned. + err = cdb.linkNodeDB.DeleteLinkNode(channel1.IdentityPub) + require.NoError(t, err, "unable to delete link node") + + // Verify the link node is gone. + _, err = cdb.linkNodeDB.FetchLinkNode(channel1.IdentityPub) + require.ErrorIs( + t, err, ErrNodeNotFound, + "link node should be deleted", + ) + + // Now run the repair function with the correct network. + err = cdb.RepairLinkNodes(wire.MainNet) + require.NoError(t, err, "repair should succeed") + + // Verify that the link node has been restored. + repairedLinkNode, err := cdb.linkNodeDB.FetchLinkNode( + channel1.IdentityPub, + ) + require.NoError(t, err, "repaired link node should exist") + require.NotNil( + t, repairedLinkNode, "repaired link node should not be nil", + ) + require.Equal( + t, wire.MainNet, repairedLinkNode.Network, + "repaired link node should have correct network", + ) + + // Run repair again - it should be idempotent and not fail. + err = cdb.RepairLinkNodes(wire.MainNet) + require.NoError(t, err, "second repair should succeed") + + // Test with different network to ensure network parameter is used. + err = cdb.linkNodeDB.DeleteLinkNode(channel1.IdentityPub) + require.NoError(t, err, "unable to delete link node") + + err = cdb.RepairLinkNodes(wire.TestNet3) + require.NoError(t, err, "repair with testnet should succeed") + + repairedLinkNode, err = cdb.linkNodeDB.FetchLinkNode( + channel1.IdentityPub, + ) + require.NoError(t, err, "repaired link node should exist") + require.Equal( + t, wire.TestNet3, repairedLinkNode.Network, + "repaired link node should use provided network", + ) +} + +// TestFindMissingLinkNodes tests the FindMissingLinkNodes method with various +// scenarios. +func TestFindMissingLinkNodes(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err, "unable to make test database") + + cdb := fullDB.ChannelStateDB() + + // Create three test public keys. + _, pub1 := btcec.PrivKeyFromBytes(key[:]) + _, pub2 := btcec.PrivKeyFromBytes(rev[:]) + testKey := [32]byte{0x03} + _, pub3 := btcec.PrivKeyFromBytes(testKey[:]) + + // Test 1: All nodes missing (empty database). + allPubs := []*btcec.PublicKey{pub1, pub2, pub3} + missing, err := cdb.linkNodeDB.FindMissingLinkNodes(nil, allPubs) + require.NoError(t, err, "FindMissingLinkNodes should succeed") + require.Len(t, missing, 3, "all nodes should be missing") + + // Test 2: Create one link node, verify only 2 are missing. + node1 := NewLinkNode(cdb.linkNodeDB, wire.MainNet, pub1) + err = node1.Sync() + require.NoError(t, err, "unable to sync link node") + + missing, err = cdb.linkNodeDB.FindMissingLinkNodes(nil, allPubs) + require.NoError(t, err, "FindMissingLinkNodes should succeed") + require.Len(t, missing, 2, "two nodes should be missing") + require.Contains(t, missing, pub2, "pub2 should be missing") + require.Contains(t, missing, pub3, "pub3 should be missing") + require.NotContains(t, missing, pub1, "pub1 should exist") + + // Test 3: Create remaining nodes, verify none are missing. + node2 := NewLinkNode(cdb.linkNodeDB, wire.MainNet, pub2) + err = node2.Sync() + require.NoError(t, err, "unable to sync link node") + + node3 := NewLinkNode(cdb.linkNodeDB, wire.MainNet, pub3) + err = node3.Sync() + require.NoError(t, err, "unable to sync link node") + + missing, err = cdb.linkNodeDB.FindMissingLinkNodes(nil, allPubs) + require.NoError(t, err, "FindMissingLinkNodes should succeed") + require.Len(t, missing, 0, "no nodes should be missing") + + // Test 4: Use with a provided transaction. + err = cdb.linkNodeDB.DeleteLinkNode(pub2) + require.NoError(t, err, "unable to delete link node") + + backend := fullDB.ChannelStateDB().backend + err = kvdb.View(backend, func(tx kvdb.RTx) error { + missing, err := cdb.linkNodeDB.FindMissingLinkNodes( + tx, allPubs, + ) + require.NoError(t, err, "FindMissingLinkNodes should succeed") + require.Len(t, missing, 1, "one node should be missing") + require.Contains(t, missing, pub2, "pub2 should be missing") + + return nil + }, func() {}) + require.NoError(t, err, "transaction should succeed") + + // Test 5: Empty input list. + missing, err = cdb.linkNodeDB.FindMissingLinkNodes(nil, nil) + require.NoError(t, err, "FindMissingLinkNodes should succeed") + require.Len(t, missing, 0, "no nodes should be missing for empty input") +} + +// TestCreateLinkNodes tests the CreateLinkNodes method with various scenarios. +func TestCreateLinkNodes(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err, "unable to make test database") + + cdb := fullDB.ChannelStateDB() + + // Create three test public keys and link nodes. + _, pub1 := btcec.PrivKeyFromBytes(key[:]) + _, pub2 := btcec.PrivKeyFromBytes(rev[:]) + testKey := [32]byte{0x03} + _, pub3 := btcec.PrivKeyFromBytes(testKey[:]) + + node1 := NewLinkNode(cdb.linkNodeDB, wire.MainNet, pub1) + node2 := NewLinkNode(cdb.linkNodeDB, wire.TestNet3, pub2) + node3 := NewLinkNode(cdb.linkNodeDB, wire.SimNet, pub3) + + // Test 1: Create multiple link nodes at once with nil transaction. + nodesToCreate := []*LinkNode{node1, node2, node3} + err = cdb.linkNodeDB.CreateLinkNodes(nil, nodesToCreate) + require.NoError(t, err, "CreateLinkNodes should succeed") + + // Verify all nodes were created correctly. + fetchedNode1, err := cdb.linkNodeDB.FetchLinkNode(pub1) + require.NoError(t, err, "node1 should exist") + require.Equal(t, wire.MainNet, fetchedNode1.Network, + "node1 should have correct network") + + fetchedNode2, err := cdb.linkNodeDB.FetchLinkNode(pub2) + require.NoError(t, err, "node2 should exist") + require.Equal(t, wire.TestNet3, fetchedNode2.Network, + "node2 should have correct network") + + fetchedNode3, err := cdb.linkNodeDB.FetchLinkNode(pub3) + require.NoError(t, err, "node3 should exist") + require.Equal(t, wire.SimNet, fetchedNode3.Network, + "node3 should have correct network") + + // Test 2: Create nodes within a provided transaction. + err = cdb.linkNodeDB.DeleteLinkNode(pub2) + require.NoError(t, err, "unable to delete link node") + + // Verify node2 is deleted. + _, err = cdb.linkNodeDB.FetchLinkNode(pub2) + require.ErrorIs(t, err, ErrNodeNotFound, "node2 should be deleted") + + // Recreate node2 using a provided transaction. + backend := fullDB.ChannelStateDB().backend + err = kvdb.Update(backend, func(tx kvdb.RwTx) error { + return cdb.linkNodeDB.CreateLinkNodes(tx, []*LinkNode{node2}) + }, func() {}) + require.NoError(t, err, "transaction should succeed") + + // Verify node2 was recreated. + fetchedNode2, err = cdb.linkNodeDB.FetchLinkNode(pub2) + require.NoError(t, err, "node2 should exist after recreation") + require.Equal(t, wire.TestNet3, fetchedNode2.Network, + "node2 should have correct network") + + // Test 3: Creating nodes that already exist should succeed + // (idempotent behavior). + err = cdb.linkNodeDB.CreateLinkNodes(nil, nodesToCreate) + require.NoError(t, err, "recreating existing nodes should succeed") + + // Verify nodes still exist with correct data. + fetchedNode1, err = cdb.linkNodeDB.FetchLinkNode(pub1) + require.NoError(t, err, "node1 should still exist") + require.Equal(t, wire.MainNet, fetchedNode1.Network, + "node1 should still have correct network") + + // Test 4: Empty input list. + err = cdb.linkNodeDB.CreateLinkNodes(nil, nil) + require.NoError( + t, err, "CreateLinkNodes with empty list should succeed", + ) + + // Test 5: Create single node. + testKey4 := [32]byte{0x04} + _, pub4 := btcec.PrivKeyFromBytes(testKey4[:]) + node4 := NewLinkNode(cdb.linkNodeDB, wire.MainNet, pub4) + + err = cdb.linkNodeDB.CreateLinkNodes(nil, []*LinkNode{node4}) + require.NoError( + t, err, "CreateLinkNodes with single node should succeed", + ) + + fetchedNode4, err := cdb.linkNodeDB.FetchLinkNode(pub4) + require.NoError(t, err, "node4 should exist") + require.Equal(t, wire.MainNet, fetchedNode4.Network, + "node4 should have correct network") +} diff --git a/server.go b/server.go index c3b724ed39d..f7b43516850 100644 --- a/server.go +++ b/server.go @@ -2144,6 +2144,21 @@ func (s *server) Start(ctx context.Context) error { cleanup := cleaner{} s.start.Do(func() { + // Before starting any subsystems, repair any link nodes that + // may have been incorrectly pruned due to the race condition + // that was fixed in the link node pruning logic. This must + // happen before the chain arbitrator and other subsystems load + // channels, to ensure the invariant "link node exists iff + // channels exist" is maintained. + err := s.chanStateDB.RepairLinkNodes(s.cfg.ActiveNetParams.Net) + if err != nil { + srvrLog.Errorf("Failed to repair link nodes: %v", err) + + startErr = err + + return + } + cleanup = cleanup.add(s.customMessageServer.Stop) if err := s.customMessageServer.Start(); err != nil { startErr = err @@ -2473,9 +2488,8 @@ func (s *server) Start(ctx context.Context) error { // With all the relevant sub-systems started, we'll now attempt // to establish persistent connections to our direct channel // collaborators within the network. Before doing so however, - // we'll prune our set of link nodes found within the database - // to ensure we don't reconnect to any nodes we no longer have - // open channels with. + // we'll prune our set of link nodes to ensure we don't + // reconnect to any nodes we no longer have open channels with. if err := s.chanStateDB.PruneLinkNodes(); err != nil { srvrLog.Errorf("Failed to prune link nodes: %v", err)