diff --git a/docs/release-notes/release-notes-0.21.0.md b/docs/release-notes/release-notes-0.21.0.md index 8dc5aba7c3..81f2e8d366 100644 --- a/docs/release-notes/release-notes-0.21.0.md +++ b/docs/release-notes/release-notes-0.21.0.md @@ -86,6 +86,16 @@ potential loss of funds. The compilation of the server is hidden behind the non-default `switchrpc` build tag. +* Added a new [`CleanStore` RPC](https://github.com/lightningnetwork/lnd/pull/10484) + to the `switchrpc` sub-system. This new endpoint allows remote entities + responsible for payment lifecycle management to perform cleanup of HTLC + attempt information. The `CleanStore` method deletes all stored attempts + except for a specified `keepSet` of IDs. **IMPORTANT**: As with other + `switchrpc` endpoints, it is currently only safe to allow a *single* entity to + dispatch attempts and manage the store via the Switch at any given time. Using + `CleanStore` in an uncoordinated, multi-controller environment can lead to + lead to stuck payment attempts, failures, and potential loss of funds. + ## lncli Additions # Improvements diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index cc490cfac8..43207855c3 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -122,6 +122,12 @@ type networkResultStore struct { // its read-then-write sequence from concurrent calls, and it maintains // consistency between the database state and result subscribers. attemptIDMtx *multimutex.Mutex[uint64] + + // storeMtx is a read-write mutex that protects the entire store during + // global operations, such as a full cleanup. A read-lock should be + // held by all per-attempt operations, while a full write-lock should + // be held by CleanStore. + storeMtx sync.RWMutex } func newNetworkResultStore(db kvdb.Backend) *networkResultStore { @@ -147,6 +153,11 @@ func newNetworkResultStore(db kvdb.Backend) *networkResultStore { // NOTE: This is part of the AttemptStore interface. Subscribed clients do not // receive notice of this initialization. func (store *networkResultStore) InitAttempt(attemptID uint64) error { + // Acquire a lock to protect this fine-grained operation against a + // concurrent, store-wide CleanStore operation. + store.storeMtx.Lock() + defer store.storeMtx.Unlock() + // We get a mutex for this attempt ID to serialize init, store, and // subscribe operations. This is needed to ensure consistency between // the database state and the subscribers in case of concurrent calls. @@ -239,6 +250,11 @@ func (store *networkResultStore) InitAttempt(attemptID uint64) error { func (store *networkResultStore) StoreResult(attemptID uint64, result *networkResult) error { + // Acquire a lock to protect this fine-grained operation against a + // concurrent, store-wide CleanStore operation. + store.storeMtx.Lock() + defer store.storeMtx.Unlock() + // We get a mutex for this attempt ID. This is needed to ensure // consistency between the database state and the subscribers in case // of concurrent calls. @@ -299,6 +315,11 @@ func (store *networkResultStore) notifySubscribers(attemptID uint64, func (store *networkResultStore) SubscribeResult(attemptID uint64) ( <-chan *networkResult, error) { + // Acquire a read lock to protect this fine-grained operation against + // a concurrent, store-wide CleanStore operation. + store.storeMtx.RLock() + defer store.storeMtx.RUnlock() + // We get a mutex for this payment ID. This is needed to ensure // consistency between the database state and the subscribers in case // of concurrent calls. @@ -374,6 +395,11 @@ func (store *networkResultStore) SubscribeResult(attemptID uint64) ( func (store *networkResultStore) GetResult(pid uint64) ( *networkResult, error) { + // Acquire a read lock to protect this fine-grained operation against + // a concurrent, store-wide CleanStore operation. + store.storeMtx.RLock() + defer store.storeMtx.RUnlock() + var result *networkResult err := kvdb.View(store.backend, func(tx kvdb.RTx) error { var err error @@ -427,6 +453,14 @@ func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) { // concurrently while this process is ongoing, as its result might end up being // deleted. func (store *networkResultStore) CleanStore(keep map[uint64]struct{}) error { + // The CleanStore operation is coarse-grained ("snapshot-and-delete") + // and must acquire a full write lock to serialize it against all + // fine-grained, per-attempt operations, preventing race conditions. + // NOTE: An alternative DeleteAttempts API would allow for more + // fine-grained locking. + store.storeMtx.Lock() + defer store.storeMtx.Unlock() + return kvdb.Update(store.backend, func(tx kvdb.RwTx) error { networkResults, err := tx.CreateTopLevelBucket( networkResultStoreBucketKey, @@ -471,6 +505,11 @@ func (store *networkResultStore) CleanStore(keep map[uint64]struct{}) error { // // NOTE: This function is NOT safe for concurrent access. func (store *networkResultStore) FetchPendingAttempts() ([]uint64, error) { + // Acquire a read lock to protect this fine-grained operation against + // a concurrent, store-wide CleanStore operation. + store.storeMtx.RLock() + defer store.storeMtx.RUnlock() + var pending []uint64 err := kvdb.View(store.backend, func(tx kvdb.RTx) error { bucket := tx.ReadBucket(networkResultStoreBucketKey) @@ -533,6 +572,11 @@ func (store *networkResultStore) FetchPendingAttempts() ([]uint64, error) { func (store *networkResultStore) FailPendingAttempt(attemptID uint64, linkErr *LinkError) error { + // Acquire a lock to protect this fine-grained operation against a + // concurrent, store-wide CleanStore operation. + store.storeMtx.Lock() + defer store.storeMtx.Unlock() + // We get a mutex for this attempt ID to ensure consistency between the // database state and the subscribers in case of concurrent calls. store.attemptIDMtx.Lock(attemptID) diff --git a/itest/list_on_test.go b/itest/list_on_test.go index a26ff06287..ccee744b3e 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -764,8 +764,8 @@ var allTestCases = []*lntest.TestCase{ TestFunc: testSendOnion, }, { - Name: "send onion twice", - TestFunc: testSendOnionTwice, + Name: "send onion lifecycle", + TestFunc: testSendOnionIdempotencyLifecycle, }, { Name: "track onion", diff --git a/itest/lnd_sendonion_test.go b/itest/lnd_sendonion_test.go index bff161eeaa..130758c0b0 100644 --- a/itest/lnd_sendonion_test.go +++ b/itest/lnd_sendonion_test.go @@ -99,13 +99,18 @@ func testSendOnion(ht *lntest.HarnessTest) { ht.AssertInvoiceSettled(dave, invoices[0].PaymentAddr) } -// testSendOnionTwice tests that the switch correctly rejects a duplicate -// payment attempt for an HTLC that is already in-flight. It sends an onion, -// then immediately sends the exact same onion with the same attempt ID. The -// test asserts that the second attempt is rejected with a DUPLICATE_HTLC -// error. It also verifies that sending again after the original HTLC has -// settled is also rejected. -func testSendOnionTwice(ht *lntest.HarnessTest) { +// testSendOnionIdempotencyLifecycle verifies the full lifecycle of SendOnion's +// idempotency guarantees. It tests that the switch correctly rejects duplicate +// payment attempts across various states: +// 1. While the original HTLC is still in-flight. +// 2. After the original HTLC has definitively settled. +// 3. After the attempt record has been explicitly preserved via CleanStore. +// 4. After the attempt record has been explicitly deleted via CleanStore, +// proving the idempotency key can be reused. +// +// This test ensures the SendOnion RPC provides a stable and trustworthy +// contract for clients managing payment attempts. +func testSendOnionIdempotencyLifecycle(ht *lntest.HarnessTest) { // Create a four-node context consisting of Alice, Bob, Carol, and // Dave with the following topology: // Alice -> Bob -> Carol -> Dave @@ -203,11 +208,40 @@ func testSendOnionTwice(ht *lntest.HarnessTest) { require.Equal(ht, preimage[:], trackResp.Preimage) // Now that the original HTLC attempt has settled, we'll send the same - // onion again with the same attempt ID. - // - // NOTE: Currently, this does not error. When we make SendOnion fully - // duplicate safe, this should be updated to assert an error is - // returned. + // onion again with the same attempt ID to prove the idempotency key + // persists across the full lifecycle. + resp = alice.RPC.SendOnion(sendReq) + require.False(ht, resp.Success, "expected failure on onion send") + require.Equal(ht, resp.ErrorCode, + switchrpc.ErrorCode_DUPLICATE_HTLC, + "unexpected error code") + require.Equal(ht, resp.ErrorMessage, htlcswitch.ErrDuplicateAdd.Error()) + + // Now we'll test the CleanStore RPC. First, we'll call it with the + // attempt ID in the keep list. + cleanReq := &switchrpc.CleanStoreRequest{ + KeepAttemptIds: []uint64{sendReq.AttemptId}, + } + alice.RPC.CleanStore(cleanReq) + + // Since we specified that the ID should be kept, calling SendOnion + // again should still result in a duplicate error. + resp = alice.RPC.SendOnion(sendReq) + require.False(ht, resp.Success, "expected failure on onion send") + require.Equal(ht, resp.ErrorCode, + switchrpc.ErrorCode_DUPLICATE_HTLC, + "unexpected error code") + require.Equal(ht, resp.ErrorMessage, htlcswitch.ErrDuplicateAdd.Error()) + + // Now, we'll call CleanStore with an empty keep list, which should + // delete our record. + cleanReq.KeepAttemptIds = []uint64{} + alice.RPC.CleanStore(cleanReq) + + // Finally, we'll send the onion one last time. Since the idempotency + // record has been deleted, this should be accepted by the switch. It + // will ultimately fail at the final hop since the invoice is already + // paid, but the initial dispatch will succeed. resp = alice.RPC.SendOnion(sendReq) require.True(ht, resp.Success, "expected successful onion send") require.Empty(ht, resp.ErrorMessage, "unexpected failure to send onion") diff --git a/lnrpc/switchrpc/mock.go b/lnrpc/switchrpc/mock.go index 0fc66c849d..3ed5ee73eb 100644 --- a/lnrpc/switchrpc/mock.go +++ b/lnrpc/switchrpc/mock.go @@ -17,6 +17,12 @@ type mockPayer struct { getResultResult *htlcswitch.PaymentResult getResultErr error resultChan chan *htlcswitch.PaymentResult + + // cleanStoreErr is the error to return from CleanStore. + cleanStoreErr error + + // keptPids stores the IDs passed to CleanStore. + keptPids map[uint64]struct{} } // SendHTLC is a mock implementation of the SendHTLC method. @@ -56,7 +62,9 @@ func (m *mockPayer) HasAttemptResult(attemptID uint64) (bool, error) { // CleanStore is a mock implementation of the CleanStore method. func (m *mockPayer) CleanStore(keepPids map[uint64]struct{}) error { - return nil + m.keptPids = keepPids + + return m.cleanStoreErr } // mockRouteProcessor is a mock implementation of the routing.RouteProcessor diff --git a/lnrpc/switchrpc/switch.pb.go b/lnrpc/switchrpc/switch.pb.go index 7f65607580..80fe5b2649 100644 --- a/lnrpc/switchrpc/switch.pb.go +++ b/lnrpc/switchrpc/switch.pb.go @@ -600,6 +600,93 @@ func (x *BuildOnionResponse) GetHopPubkeys() [][]byte { return nil } +type CleanStoreRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The attempt IDs to keep. All other attempts in the namespace will be + // deleted. + KeepAttemptIds []uint64 `protobuf:"varint,1,rep,packed,name=keep_attempt_ids,json=keepAttemptIds,proto3" json:"keep_attempt_ids,omitempty"` +} + +func (x *CleanStoreRequest) Reset() { + *x = CleanStoreRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_switchrpc_switch_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CleanStoreRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CleanStoreRequest) ProtoMessage() {} + +func (x *CleanStoreRequest) ProtoReflect() protoreflect.Message { + mi := &file_switchrpc_switch_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CleanStoreRequest.ProtoReflect.Descriptor instead. +func (*CleanStoreRequest) Descriptor() ([]byte, []int) { + return file_switchrpc_switch_proto_rawDescGZIP(), []int{6} +} + +func (x *CleanStoreRequest) GetKeepAttemptIds() []uint64 { + if x != nil { + return x.KeepAttemptIds + } + return nil +} + +type CleanStoreResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CleanStoreResponse) Reset() { + *x = CleanStoreResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_switchrpc_switch_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CleanStoreResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CleanStoreResponse) ProtoMessage() {} + +func (x *CleanStoreResponse) ProtoReflect() protoreflect.Message { + mi := &file_switchrpc_switch_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CleanStoreResponse.ProtoReflect.Descriptor instead. +func (*CleanStoreResponse) Descriptor() ([]byte, []int) { + return file_switchrpc_switch_proto_rawDescGZIP(), []int{7} +} + var File_switchrpc_switch_proto protoreflect.FileDescriptor var file_switchrpc_switch_proto_rawDesc = []byte{ @@ -686,33 +773,43 @@ var file_switchrpc_switch_proto_rawDesc = []byte{ 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x68, 0x6f, 0x70, 0x5f, 0x70, 0x75, 0x62, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0a, 0x68, 0x6f, 0x70, - 0x50, 0x75, 0x62, 0x6b, 0x65, 0x79, 0x73, 0x2a, 0xc5, 0x01, 0x0a, 0x09, 0x45, 0x72, 0x72, 0x6f, - 0x72, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, - 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x18, 0x0a, 0x14, 0x50, 0x41, 0x59, 0x4d, 0x45, 0x4e, - 0x54, 0x5f, 0x49, 0x44, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, - 0x12, 0x14, 0x0a, 0x10, 0x46, 0x4f, 0x52, 0x57, 0x41, 0x52, 0x44, 0x49, 0x4e, 0x47, 0x5f, 0x45, - 0x52, 0x52, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x14, 0x0a, 0x10, 0x43, 0x4c, 0x45, 0x41, 0x52, 0x5f, - 0x54, 0x45, 0x58, 0x54, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x03, 0x12, 0x1e, 0x0a, 0x1a, - 0x55, 0x4e, 0x52, 0x45, 0x41, 0x44, 0x41, 0x42, 0x4c, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x55, - 0x52, 0x45, 0x5f, 0x4d, 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x04, 0x12, 0x12, 0x0a, 0x0e, - 0x44, 0x55, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x54, 0x45, 0x5f, 0x48, 0x54, 0x4c, 0x43, 0x10, 0x05, - 0x12, 0x0b, 0x0a, 0x07, 0x4e, 0x4f, 0x5f, 0x4c, 0x49, 0x4e, 0x4b, 0x10, 0x06, 0x12, 0x12, 0x0a, - 0x0e, 0x53, 0x57, 0x49, 0x54, 0x43, 0x48, 0x5f, 0x45, 0x58, 0x49, 0x54, 0x49, 0x4e, 0x47, 0x10, - 0x07, 0x12, 0x0c, 0x0a, 0x08, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x08, 0x32, - 0xe6, 0x01, 0x0a, 0x06, 0x53, 0x77, 0x69, 0x74, 0x63, 0x68, 0x12, 0x46, 0x0a, 0x09, 0x53, 0x65, - 0x6e, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x2e, 0x73, 0x77, 0x69, 0x74, 0x63, 0x68, - 0x72, 0x70, 0x63, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x77, 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, 0x63, - 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x49, 0x0a, 0x0a, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, - 0x12, 0x1c, 0x2e, 0x73, 0x77, 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x54, 0x72, 0x61, - 0x63, 0x6b, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, - 0x2e, 0x73, 0x77, 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, + 0x50, 0x75, 0x62, 0x6b, 0x65, 0x79, 0x73, 0x22, 0x3d, 0x0a, 0x11, 0x43, 0x6c, 0x65, 0x61, 0x6e, + 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x28, 0x0a, 0x10, + 0x6b, 0x65, 0x65, 0x70, 0x5f, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x5f, 0x69, 0x64, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x04, 0x52, 0x0e, 0x6b, 0x65, 0x65, 0x70, 0x41, 0x74, 0x74, 0x65, + 0x6d, 0x70, 0x74, 0x49, 0x64, 0x73, 0x22, 0x14, 0x0a, 0x12, 0x43, 0x6c, 0x65, 0x61, 0x6e, 0x53, + 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0xc5, 0x01, 0x0a, + 0x09, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, + 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x18, 0x0a, 0x14, 0x50, + 0x41, 0x59, 0x4d, 0x45, 0x4e, 0x54, 0x5f, 0x49, 0x44, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, + 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x46, 0x4f, 0x52, 0x57, 0x41, 0x52, 0x44, + 0x49, 0x4e, 0x47, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x14, 0x0a, 0x10, 0x43, + 0x4c, 0x45, 0x41, 0x52, 0x5f, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, + 0x03, 0x12, 0x1e, 0x0a, 0x1a, 0x55, 0x4e, 0x52, 0x45, 0x41, 0x44, 0x41, 0x42, 0x4c, 0x45, 0x5f, + 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x5f, 0x4d, 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, + 0x04, 0x12, 0x12, 0x0a, 0x0e, 0x44, 0x55, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x54, 0x45, 0x5f, 0x48, + 0x54, 0x4c, 0x43, 0x10, 0x05, 0x12, 0x0b, 0x0a, 0x07, 0x4e, 0x4f, 0x5f, 0x4c, 0x49, 0x4e, 0x4b, + 0x10, 0x06, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x57, 0x49, 0x54, 0x43, 0x48, 0x5f, 0x45, 0x58, 0x49, + 0x54, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x12, 0x0c, 0x0a, 0x08, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x4e, + 0x41, 0x4c, 0x10, 0x08, 0x32, 0xb1, 0x02, 0x0a, 0x06, 0x53, 0x77, 0x69, 0x74, 0x63, 0x68, 0x12, + 0x46, 0x0a, 0x09, 0x53, 0x65, 0x6e, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x2e, 0x73, + 0x77, 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4f, 0x6e, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x77, 0x69, 0x74, + 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x49, 0x0a, 0x0a, 0x54, 0x72, 0x61, 0x63, 0x6b, + 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x2e, 0x73, 0x77, 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, + 0x63, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x77, 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, + 0x54, 0x72, 0x61, 0x63, 0x6b, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x49, 0x0a, 0x0a, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, + 0x12, 0x1c, 0x2e, 0x73, 0x77, 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x69, + 0x6c, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, + 0x2e, 0x73, 0x77, 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x49, 0x0a, - 0x0a, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x2e, 0x73, 0x77, - 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x4f, 0x6e, 0x69, - 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x77, 0x69, 0x74, - 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x4f, 0x6e, 0x69, 0x6f, 0x6e, + 0x0a, 0x43, 0x6c, 0x65, 0x61, 0x6e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x1c, 0x2e, 0x73, 0x77, + 0x69, 0x74, 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x65, 0x61, 0x6e, 0x53, 0x74, 0x6f, + 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x77, 0x69, 0x74, + 0x63, 0x68, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x65, 0x61, 0x6e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x6e, 0x69, 0x6e, 0x67, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x6c, 0x6e, 0x64, 0x2f, 0x6c, 0x6e, 0x72, 0x70, @@ -733,7 +830,7 @@ func file_switchrpc_switch_proto_rawDescGZIP() []byte { } var file_switchrpc_switch_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_switchrpc_switch_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_switchrpc_switch_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_switchrpc_switch_proto_goTypes = []interface{}{ (ErrorCode)(0), // 0: switchrpc.ErrorCode (*SendOnionRequest)(nil), // 1: switchrpc.SendOnionRequest @@ -742,25 +839,29 @@ var file_switchrpc_switch_proto_goTypes = []interface{}{ (*TrackOnionResponse)(nil), // 4: switchrpc.TrackOnionResponse (*BuildOnionRequest)(nil), // 5: switchrpc.BuildOnionRequest (*BuildOnionResponse)(nil), // 6: switchrpc.BuildOnionResponse - nil, // 7: switchrpc.SendOnionRequest.CustomRecordsEntry - (*lnrpc.Route)(nil), // 8: lnrpc.Route + (*CleanStoreRequest)(nil), // 7: switchrpc.CleanStoreRequest + (*CleanStoreResponse)(nil), // 8: switchrpc.CleanStoreResponse + nil, // 9: switchrpc.SendOnionRequest.CustomRecordsEntry + (*lnrpc.Route)(nil), // 10: lnrpc.Route } var file_switchrpc_switch_proto_depIdxs = []int32{ - 7, // 0: switchrpc.SendOnionRequest.custom_records:type_name -> switchrpc.SendOnionRequest.CustomRecordsEntry - 0, // 1: switchrpc.SendOnionResponse.error_code:type_name -> switchrpc.ErrorCode - 0, // 2: switchrpc.TrackOnionResponse.error_code:type_name -> switchrpc.ErrorCode - 8, // 3: switchrpc.BuildOnionRequest.route:type_name -> lnrpc.Route - 1, // 4: switchrpc.Switch.SendOnion:input_type -> switchrpc.SendOnionRequest - 3, // 5: switchrpc.Switch.TrackOnion:input_type -> switchrpc.TrackOnionRequest - 5, // 6: switchrpc.Switch.BuildOnion:input_type -> switchrpc.BuildOnionRequest - 2, // 7: switchrpc.Switch.SendOnion:output_type -> switchrpc.SendOnionResponse - 4, // 8: switchrpc.Switch.TrackOnion:output_type -> switchrpc.TrackOnionResponse - 6, // 9: switchrpc.Switch.BuildOnion:output_type -> switchrpc.BuildOnionResponse - 7, // [7:10] is the sub-list for method output_type - 4, // [4:7] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 9, // 0: switchrpc.SendOnionRequest.custom_records:type_name -> switchrpc.SendOnionRequest.CustomRecordsEntry + 0, // 1: switchrpc.SendOnionResponse.error_code:type_name -> switchrpc.ErrorCode + 0, // 2: switchrpc.TrackOnionResponse.error_code:type_name -> switchrpc.ErrorCode + 10, // 3: switchrpc.BuildOnionRequest.route:type_name -> lnrpc.Route + 1, // 4: switchrpc.Switch.SendOnion:input_type -> switchrpc.SendOnionRequest + 3, // 5: switchrpc.Switch.TrackOnion:input_type -> switchrpc.TrackOnionRequest + 5, // 6: switchrpc.Switch.BuildOnion:input_type -> switchrpc.BuildOnionRequest + 7, // 7: switchrpc.Switch.CleanStore:input_type -> switchrpc.CleanStoreRequest + 2, // 8: switchrpc.Switch.SendOnion:output_type -> switchrpc.SendOnionResponse + 4, // 9: switchrpc.Switch.TrackOnion:output_type -> switchrpc.TrackOnionResponse + 6, // 10: switchrpc.Switch.BuildOnion:output_type -> switchrpc.BuildOnionResponse + 8, // 11: switchrpc.Switch.CleanStore:output_type -> switchrpc.CleanStoreResponse + 8, // [8:12] is the sub-list for method output_type + 4, // [4:8] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_switchrpc_switch_proto_init() } @@ -841,6 +942,30 @@ func file_switchrpc_switch_proto_init() { return nil } } + file_switchrpc_switch_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CleanStoreRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_switchrpc_switch_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CleanStoreResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_switchrpc_switch_proto_msgTypes[0].OneofWrappers = []interface{}{} file_switchrpc_switch_proto_msgTypes[2].OneofWrappers = []interface{}{} @@ -851,7 +976,7 @@ func file_switchrpc_switch_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_switchrpc_switch_proto_rawDesc, NumEnums: 1, - NumMessages: 7, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/lnrpc/switchrpc/switch.proto b/lnrpc/switchrpc/switch.proto index 06fa60fbb6..c1833d0128 100644 --- a/lnrpc/switchrpc/switch.proto +++ b/lnrpc/switchrpc/switch.proto @@ -36,6 +36,12 @@ service Switch { BuildOnion attempts to build an onion packet for the specified route. */ rpc BuildOnion (BuildOnionRequest) returns (BuildOnionResponse); + + /* + CleanStore deletes all attempt results except those specified in + keep_attempt_ids. + */ + rpc CleanStore (CleanStoreRequest) returns (CleanStoreResponse); } message SendOnionRequest { @@ -203,3 +209,12 @@ message BuildOnionResponse { // constructed onion. repeated bytes hop_pubkeys = 3; } + +message CleanStoreRequest { + // The attempt IDs to keep. All other attempts in the namespace will be + // deleted. + repeated uint64 keep_attempt_ids = 1; +} + +message CleanStoreResponse { +} diff --git a/lnrpc/switchrpc/switch.swagger.json b/lnrpc/switchrpc/switch.swagger.json index 21cc6a9599..b0fa266315 100644 --- a/lnrpc/switchrpc/switch.swagger.json +++ b/lnrpc/switchrpc/switch.swagger.json @@ -350,6 +350,9 @@ }, "description": "BuildOnionResponse contains the constructed onion packet." }, + "switchrpcCleanStoreResponse": { + "type": "object" + }, "switchrpcErrorCode": { "type": "string", "enum": [ diff --git a/lnrpc/switchrpc/switch_grpc.pb.go b/lnrpc/switchrpc/switch_grpc.pb.go index cd06275a97..5fca1b6d31 100644 --- a/lnrpc/switchrpc/switch_grpc.pb.go +++ b/lnrpc/switchrpc/switch_grpc.pb.go @@ -37,6 +37,9 @@ type SwitchClient interface { TrackOnion(ctx context.Context, in *TrackOnionRequest, opts ...grpc.CallOption) (*TrackOnionResponse, error) // BuildOnion attempts to build an onion packet for the specified route. BuildOnion(ctx context.Context, in *BuildOnionRequest, opts ...grpc.CallOption) (*BuildOnionResponse, error) + // CleanStore deletes all attempt results except those specified in + // keep_attempt_ids. + CleanStore(ctx context.Context, in *CleanStoreRequest, opts ...grpc.CallOption) (*CleanStoreResponse, error) } type switchClient struct { @@ -74,6 +77,15 @@ func (c *switchClient) BuildOnion(ctx context.Context, in *BuildOnionRequest, op return out, nil } +func (c *switchClient) CleanStore(ctx context.Context, in *CleanStoreRequest, opts ...grpc.CallOption) (*CleanStoreResponse, error) { + out := new(CleanStoreResponse) + err := c.cc.Invoke(ctx, "/switchrpc.Switch/CleanStore", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // SwitchServer is the server API for Switch service. // All implementations must embed UnimplementedSwitchServer // for forward compatibility @@ -97,6 +109,9 @@ type SwitchServer interface { TrackOnion(context.Context, *TrackOnionRequest) (*TrackOnionResponse, error) // BuildOnion attempts to build an onion packet for the specified route. BuildOnion(context.Context, *BuildOnionRequest) (*BuildOnionResponse, error) + // CleanStore deletes all attempt results except those specified in + // keep_attempt_ids. + CleanStore(context.Context, *CleanStoreRequest) (*CleanStoreResponse, error) mustEmbedUnimplementedSwitchServer() } @@ -113,6 +128,9 @@ func (UnimplementedSwitchServer) TrackOnion(context.Context, *TrackOnionRequest) func (UnimplementedSwitchServer) BuildOnion(context.Context, *BuildOnionRequest) (*BuildOnionResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method BuildOnion not implemented") } +func (UnimplementedSwitchServer) CleanStore(context.Context, *CleanStoreRequest) (*CleanStoreResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CleanStore not implemented") +} func (UnimplementedSwitchServer) mustEmbedUnimplementedSwitchServer() {} // UnsafeSwitchServer may be embedded to opt out of forward compatibility for this service. @@ -180,6 +198,24 @@ func _Switch_BuildOnion_Handler(srv interface{}, ctx context.Context, dec func(i return interceptor(ctx, in, info, handler) } +func _Switch_CleanStore_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CleanStoreRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SwitchServer).CleanStore(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/switchrpc.Switch/CleanStore", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SwitchServer).CleanStore(ctx, req.(*CleanStoreRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Switch_ServiceDesc is the grpc.ServiceDesc for Switch service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -199,6 +235,10 @@ var Switch_ServiceDesc = grpc.ServiceDesc{ MethodName: "BuildOnion", Handler: _Switch_BuildOnion_Handler, }, + { + MethodName: "CleanStore", + Handler: _Switch_CleanStore_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "switchrpc/switch.proto", diff --git a/lnrpc/switchrpc/switch_server.go b/lnrpc/switchrpc/switch_server.go index 2f54a11908..523f177062 100644 --- a/lnrpc/switchrpc/switch_server.go +++ b/lnrpc/switchrpc/switch_server.go @@ -757,3 +757,32 @@ func ParseForwardingError(errStr string) (*htlcswitch.ForwardingError, error) { return htlcswitch.NewForwardingError(wireMsg, idx), nil } + +// CleanStore deletes all attempt results except those specified in request as +// to be kept. This allows for remote maintenance of HTLC attempt data in the +// Switch's underlying attempt store and should be used by routers to +// periodically clean up results for completed attempts. +func (s *Server) CleanStore(_ context.Context, + req *CleanStoreRequest) (*CleanStoreResponse, error) { + + // Construct keep set from provided IDs. + keepSet := make(map[uint64]struct{}, len(req.KeepAttemptIds)) + for _, id := range req.KeepAttemptIds { + keepSet[id] = struct{}{} + } + + // Clean the attempt store. + // TODO(calvin): Support namespace-aware deletion. This will be required + // once multiple clients are concurrently using the Switch. + err := s.cfg.HtlcDispatcher.CleanStore(keepSet) + if err != nil { + log.Errorf("Unable to cleanup Switch attempt store: %v", err) + + return nil, status.Errorf(codes.Internal, "unable to cleanup "+ + "Switch attempt store: %v", err) + } + + log.Debugf("Successfully cleaned Switch attempt store.") + + return &CleanStoreResponse{}, nil +} diff --git a/lnrpc/switchrpc/switch_server_test.go b/lnrpc/switchrpc/switch_server_test.go index 87fcde057d..50a7297049 100644 --- a/lnrpc/switchrpc/switch_server_test.go +++ b/lnrpc/switchrpc/switch_server_test.go @@ -810,3 +810,90 @@ func TestBuildErrorDecryptor(t *testing.T) { }) } } + +// TestCleanStore verifies the behavior of the CleanStore RPC handler. It +// ensures that the RPC correctly processes cleanup requests by accurately +// passing the set of IDs to keep to the underlying Switch. +func TestCleanStore(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + + // setup is a function that modifies the server or request for a + // specific test case. + setup func(*testing.T, *mockPayer, *CleanStoreRequest) + + // expectedKeepSet is the set of IDs we expect to be passed to + // the mock payer. + expectedKeepSet map[uint64]struct{} + + // expectedErrCode is the gRPC error code we expect from the + // call. + expectedErrCode codes.Code + }{ + { + name: "clean with keep set", + setup: func(t *testing.T, m *mockPayer, + req *CleanStoreRequest) { + + req.KeepAttemptIds = []uint64{1, 3, 5} + }, + expectedKeepSet: map[uint64]struct{}{ + 1: {}, + 3: {}, + 5: {}, + }, + }, + { + name: "clean with empty keep set", + setup: nil, + expectedKeepSet: map[uint64]struct{}{}, + }, + { + name: "underlying store fails", + setup: func(t *testing.T, m *mockPayer, + req *CleanStoreRequest) { + + m.cleanStoreErr = errors.New("db is down") + }, + expectedErrCode: codes.Internal, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + mockPayer := &mockPayer{} + server, _, err := New(&Config{ + HtlcDispatcher: mockPayer, + }) + require.NoError(t, err) + + req := &CleanStoreRequest{} + + if tc.setup != nil { + tc.setup(t, mockPayer, req) + } + + _, err = server.CleanStore(t.Context(), req) + + // Check for gRPC level errors. + if tc.expectedErrCode != codes.OK { + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, tc.expectedErrCode, s.Code()) + + return + } + + // If no error was expected, assert success and check + // the passed keepSet. + require.NoError(t, err) + require.Equal(t, tc.expectedKeepSet, mockPayer.keptPids) + }) + } +} diff --git a/lntest/rpc/switch.go b/lntest/rpc/switch.go index e69b899a77..da1af935ac 100644 --- a/lntest/rpc/switch.go +++ b/lntest/rpc/switch.go @@ -54,3 +54,18 @@ func (h *HarnessRPC) BuildOnion( return resp } + +// CleanStore makes a RPC call to CleanStore and asserts. +// +//nolint:lll +func (h *HarnessRPC) CleanStore( + req *switchrpc.CleanStoreRequest) *switchrpc.CleanStoreResponse { + + ctxt, cancel := context.WithTimeout(h.runCtx, DefaultTimeout) + defer cancel() + + resp, err := h.Switch.CleanStore(ctxt, req) + h.NoError(err, "CleanStore") + + return resp +}