From d87e1ab02edbccf3df0a02e180fbafb5a5356037 Mon Sep 17 00:00:00 2001 From: Cobalt0s Date: Mon, 4 May 2026 20:44:10 +0300 Subject: [PATCH] [ENG-3815] feat(microsoft): Impl Delete Subscription --- internal/datautils/lists.go | 26 +++++ .../microsoft/internal/subscriber/delete.go | 75 +++++++++++++ .../internal/subscriber/delete_test.go | 64 +++++++++++ .../microsoft/internal/subscriber/state.go | 102 ++++++++++++++++++ .../test/delete/payload-message-1.json | 8 ++ .../test/delete/payload-message-2.json | 8 ++ .../test/delete/payload-message-3.json | 8 ++ .../test/delete/subscriptions-response.json | 12 +++ .../test/read/subscriptions-response.json | 90 ++++++++++++++++ .../subscription/outlook-events/main.go | 5 +- .../subscription/outlook-messages/main.go | 5 +- .../utils/testroutines/subscription-delete.go | 33 ++++++ .../testscenario/subscription-messaging.go | 37 +++++++ 13 files changed, 469 insertions(+), 4 deletions(-) create mode 100644 providers/microsoft/internal/subscriber/delete_test.go create mode 100644 providers/microsoft/internal/subscriber/test/delete/payload-message-1.json create mode 100644 providers/microsoft/internal/subscriber/test/delete/payload-message-2.json create mode 100644 providers/microsoft/internal/subscriber/test/delete/payload-message-3.json create mode 100644 providers/microsoft/internal/subscriber/test/delete/subscriptions-response.json create mode 100644 providers/microsoft/internal/subscriber/test/read/subscriptions-response.json create mode 100644 test/utils/testroutines/subscription-delete.go diff --git a/internal/datautils/lists.go b/internal/datautils/lists.go index a71cbac42d..66cd73521a 100644 --- a/internal/datautils/lists.go +++ b/internal/datautils/lists.go @@ -20,6 +20,32 @@ func (l IndexedLists[ID, V]) Add(bucket ID, objects ...V) { l[bucket] = append(l[bucket], objects...) } +// Remove will remove all elements in the list under the bucket matching the filter. +// A bucket may have a list with no items. If you need to remove the bucket itself, do it explicitly. +func (l IndexedLists[ID, V]) Remove(bucket ID, filter func(V) bool) { + items, exists := l[bucket] + if !exists || len(items) == 0 { + return + } + + // Reuse underlying array to avoid allocations + filtered := items[:0] + + for _, item := range items { + if !filter(item) { + filtered = append(filtered, item) + } + } + + // Clear trailing references. + var zero V + for i := len(filtered); i < len(items); i++ { + items[i] = zero + } + + l[bucket] = filtered +} + func (l NamedLists[V]) Add(bucket string, objects ...V) { IndexedLists[string, V](l).Add(bucket, objects...) } diff --git a/providers/microsoft/internal/subscriber/delete.go b/providers/microsoft/internal/subscriber/delete.go index ad11f8b689..558fe8104b 100644 --- a/providers/microsoft/internal/subscriber/delete.go +++ b/providers/microsoft/internal/subscriber/delete.go @@ -2,11 +2,85 @@ package subscriber import ( "context" + "errors" "net/http" + "github.com/amp-labs/connectors/common" "github.com/amp-labs/connectors/providers/microsoft/internal/batch" ) +// DeleteSubscription removes all remote subscriptions for objects specified in previousResult. +// Executes batch DELETE requests to Microsoft Graph, tolerating 404s (already deleted) as success. +// Discards final state as delete operations are fire-and-forget; returns first error if any. +// https://learn.microsoft.com/en-us/graph/change-notifications-delivery-webhooks?tabs=http#delete-a-subscription +func (s Strategy) DeleteSubscription( + ctx context.Context, + previousResult common.SubscriptionResult, +) error { + remoteSubscriptions, err := s.fetchSubscriptions(ctx) + if err != nil { + return err + } + + subscriptionsForRemoval := remoteSubscriptions.subset(previousResult.ObjectNames()) + + // The state is discarded. + _, err = s.deleteSubscriptions(ctx, subscriptionsForRemoval) + + return err +} + +// deleteSubscriptions batch-deletes all subscriptions in the given RemoteSubscriptions set. +// Handles 404 responses as success (already deleted); aggregates other errors. +// +// Returns updated SubscriptionResult reflecting remaining subscriptions and operation status. +func (s Strategy) deleteSubscriptions( + ctx context.Context, + remoteSubscriptions RemoteSubscriptions, +) (*common.SubscriptionResult, error) { + subscriptionsToRemove := remoteSubscriptions.getIDs() + + batchParams, err := s.paramsForBatchRemoveSubscriptionsByIDs(subscriptionsToRemove) + if err != nil { + return nil, err + } + + bundledResponse := batch.Execute[SubscriptionResource](ctx, s.batchStrategy, batchParams) + + // Aggregate non-404 errors (404 indicates record was already deleted). + var outErr error + + for _, e := range bundledResponse.Errors { + if e.Status != http.StatusNotFound { + // Resource no longer exists. + } else { + outErr = errors.Join(outErr, e.Data) + } + } + + // Filter out the subscription that were removed if any are left . + for requestID, resp := range bundledResponse.Responses { + name := ObjectName(resp.Data.Resource) + RemoteSubsType(remoteSubscriptions).Remove(name, func(subscription SubscriptionResource) bool { + return subscription.ID == SubscriptionID(requestID) + }) + } + + status := common.SubscriptionStatusSuccess + if outErr != nil { + // The rollback does not happen for delete. So it is either Success or Failure. + status = common.SubscriptionStatusFailed + } + + return &common.SubscriptionResult{ + Result: Output{}, + ObjectEvents: remoteSubscriptions.toState(), + Status: status, + }, outErr +} + +// removeSubscriptionsByIDs executes batch DELETE for given subscription IDs. +// Generic helper; returns raw batch result for custom error handling. func (s Strategy) removeSubscriptionsByIDs( ctx context.Context, identifiers []SubscriptionID, ) (*batch.Result[any], error) { @@ -20,6 +94,7 @@ func (s Strategy) removeSubscriptionsByIDs( return bundledResponse, nil } +// paramsForBatchRemoveSubscriptionsByIDs creates batch parameters for DELETE `/subscriptions/{id}` requests. func (s Strategy) paramsForBatchRemoveSubscriptionsByIDs(identifiers []SubscriptionID) (*batch.Params, error) { batchParams := &batch.Params{} diff --git a/providers/microsoft/internal/subscriber/delete_test.go b/providers/microsoft/internal/subscriber/delete_test.go new file mode 100644 index 0000000000..7b0747027f --- /dev/null +++ b/providers/microsoft/internal/subscriber/delete_test.go @@ -0,0 +1,64 @@ +package subscriber + +import ( + "net/http" + "testing" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/components" + "github.com/amp-labs/connectors/test/utils/mockutils/mockcond" + "github.com/amp-labs/connectors/test/utils/mockutils/mockserver" + "github.com/amp-labs/connectors/test/utils/testroutines" + "github.com/amp-labs/connectors/test/utils/testutils" +) + +func TestDelete(t *testing.T) { // nolint:funlen,cyclop + t.Parallel() + + responseReadSubscriptions := testutils.DataFromFile(t, "read/subscriptions-response.json") + responseDeleteSubscriptions := testutils.DataFromFile(t, "delete/subscriptions-response.json") + deleteMessage1 := testutils.DataFromFile(t, "delete/payload-message-1.json") + deleteMessage2 := testutils.DataFromFile(t, "delete/payload-message-2.json") + deleteMessage3 := testutils.DataFromFile(t, "delete/payload-message-3.json") + + tests := []testroutines.DeleteSubscription{ + { + Name: "Successfully remove subscriptions to Outlook messages.", + Input: common.SubscriptionResult{ + ObjectEvents: map[ObjectName]common.ObjectEvents{ + "me/messages": {}, + }, + }, + Server: mockserver.Switch{ + Setup: mockserver.ContentJSON(), + Cases: mockserver.Cases{{ + If: mockcond.And{ + mockcond.MethodGET(), + mockcond.Path("/v1.0/subscriptions"), + }, + Then: mockserver.Response(http.StatusOK, responseReadSubscriptions), + }, { + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/v1.0/$batch"), + payloadBatchRequests(deleteMessage1, deleteMessage2, deleteMessage3), + }, + Then: mockserver.Response(http.StatusNoContent, responseDeleteSubscriptions), + }}, + }.Server(), + Expected: testroutines.None{}, + ExpectedErrs: nil, + }, + } + + for _, tt := range tests { // nolint:dupl + // nolint:varnamelen + t.Run(tt.Name, func(t *testing.T) { + t.Parallel() + + tt.Run(t, func() (components.SubscriptionRemover, error) { + return constructTestStrategy(tt.Server.URL) + }) + }) + } +} diff --git a/providers/microsoft/internal/subscriber/state.go b/providers/microsoft/internal/subscriber/state.go index f36bd723ad..a36223c292 100644 --- a/providers/microsoft/internal/subscriber/state.go +++ b/providers/microsoft/internal/subscriber/state.go @@ -1,10 +1,18 @@ package subscriber import ( + "context" + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/datautils" ) type ( + // RemoteSubscriptions describes the actual state of subscriptions in the remote system. + // Each ObjectName maps to a list of remote SubscriptionResource items. + RemoteSubscriptions RemoteSubsType + RemoteSubsType = datautils.IndexedLists[ObjectName, SubscriptionResource] + // State describes the set of object events that the connector should // try to achieve (desired) and that are also reported back as the observed outcome. // @@ -22,3 +30,97 @@ func newState(objectNames []ObjectName) State { return events } + +func (r RemoteSubscriptions) toState() State { + result := make(State) + + for objectName, subscriptions := range r { + eventSet := datautils.NewSet[common.SubscriptionEventType]() + + for _, subscription := range subscriptions { + eventSet.Add(subscription.ChangeType.EventTypes()) + } + + // List of events may be nil. + var events []common.SubscriptionEventType + if list := eventSet.List(); len(list) != 0 { + events = list + } + + result[objectName] = common.ObjectEvents{ + Events: events, + WatchFields: nil, + WatchFieldsAll: false, + PassThroughEvents: nil, + } + } + + return result +} + +func (r RemoteSubscriptions) subset(objects []ObjectName) RemoteSubscriptions { + result := make(RemoteSubscriptions) + + names := datautils.NewSetFromList(objects) + for name, subscriptions := range r { + if names.Has(name) { + for _, subscription := range subscriptions { + RemoteSubsType(result).Add(name, subscription) + } + } + } + + return result +} + +func (r RemoteSubscriptions) getIDs() []SubscriptionID { + result := make([]SubscriptionID, 0) + + for _, subscriptions := range r { + for _, subscription := range subscriptions { + result = append(result, subscription.ID) + } + } + + return result +} + +// fetchSubscriptions retrieves the current Microsoft Graph change‑notification subscriptions for the given objects. +// +// nolint:lll +// According to https://learn.microsoft.com/en-us/graph/change-notifications-delivery-webhooks?tabs=http#subscription-request, +// Graph should guard against duplicate subscriptions (same changeType and resource), but in practice +// multiple subscriptions for the same combination can exist. Therefore, before blindly creating, +// updating, or deleting subscriptions, this method first reads the current state and allows the caller +// to reconcile the desired vs. actual subscriptions. +func (s Strategy) fetchSubscriptions(ctx context.Context) (RemoteSubscriptions, error) { + url, err := s.getSubscriptionURL() + if err != nil { + return nil, err + } + + response, err := s.client.Get(ctx, url.String()) + if err != nil { + return nil, err + } + + subscriptions, err := common.UnmarshalJSON[subscriptionResources](response) + if err != nil { + return nil, err + } + + result := make(RemoteSubscriptions) + + for _, resource := range subscriptions.List { + objectName := ObjectName(resource.Resource) + RemoteSubsType(result).Add(objectName, resource) + } + + return result, nil +} + +// subscriptionResources is the output of "GET /subscriptions". +// https://learn.microsoft.com/en-us/graph/api/subscription-list?view=graph-rest-1.0&tabs=http#response-1 +type subscriptionResources struct { + List []SubscriptionResource `json:"value"` +} diff --git a/providers/microsoft/internal/subscriber/test/delete/payload-message-1.json b/providers/microsoft/internal/subscriber/test/delete/payload-message-1.json new file mode 100644 index 0000000000..a870c88261 --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/delete/payload-message-1.json @@ -0,0 +1,8 @@ +{ + "id": "c27d2493-0518-48db-b994-6d43aa584355", + "method": "DELETE", + "url": "/subscriptions/c27d2493-0518-48db-b994-6d43aa584355", + "headers": { + "Content-Type": "application/json" + } +} diff --git a/providers/microsoft/internal/subscriber/test/delete/payload-message-2.json b/providers/microsoft/internal/subscriber/test/delete/payload-message-2.json new file mode 100644 index 0000000000..751297b29a --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/delete/payload-message-2.json @@ -0,0 +1,8 @@ +{ + "id": "29772d64-ee45-4e64-ab82-481602e07bc2", + "method": "DELETE", + "url": "/subscriptions/29772d64-ee45-4e64-ab82-481602e07bc2", + "headers": { + "Content-Type": "application/json" + } +} diff --git a/providers/microsoft/internal/subscriber/test/delete/payload-message-3.json b/providers/microsoft/internal/subscriber/test/delete/payload-message-3.json new file mode 100644 index 0000000000..87bda1c2d8 --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/delete/payload-message-3.json @@ -0,0 +1,8 @@ +{ + "id": "90b46999-27c7-4145-a409-a5ef18250522", + "method": "DELETE", + "url": "/subscriptions/90b46999-27c7-4145-a409-a5ef18250522", + "headers": { + "Content-Type": "application/json" + } +} diff --git a/providers/microsoft/internal/subscriber/test/delete/subscriptions-response.json b/providers/microsoft/internal/subscriber/test/delete/subscriptions-response.json new file mode 100644 index 0000000000..29762be8a4 --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/delete/subscriptions-response.json @@ -0,0 +1,12 @@ +{ + "responses": [ + { + "id": "09ac29ca-cb8b-4761-94ab-ce372fd6fce3", + "status": 204, + "headers": { + "Cache-Control": "no-cache" + }, + "body": null + } + ] +} diff --git a/providers/microsoft/internal/subscriber/test/read/subscriptions-response.json b/providers/microsoft/internal/subscriber/test/read/subscriptions-response.json new file mode 100644 index 0000000000..bfd30ec512 --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/read/subscriptions-response.json @@ -0,0 +1,90 @@ +{ + "@odata.context": "https://graph.microsoft.com/v1.0/$metadata#subscriptions", + "value": [ + { + "id": "90b46999-27c7-4145-a409-a5ef18250522", + "resource": "me/messages", + "applicationId": "39d7eec8-2032-4dd3-959f-d92d8a4e2ad3", + "changeType": "created", + "clientState": null, + "notificationUrl": "https://b146-46-150-81-5.ngrok-free.app", + "notificationQueryOptions": null, + "lifecycleNotificationUrl": null, + "expirationDateTime": "2026-04-27T18:06:10Z", + "creatorId": "ae87552f-48fc-4dec-9322-65040bf9fdfd", + "includeResourceData": null, + "latestSupportedTlsVersion": "v1_2", + "encryptionCertificate": null, + "encryptionCertificateId": null, + "notificationUrlAppId": null + }, + { + "id": "29772d64-ee45-4e64-ab82-481602e07bc2", + "resource": "me/messages", + "applicationId": "39d7eec8-2032-4dd3-959f-d92d8a4e2ad3", + "changeType": "created", + "clientState": null, + "notificationUrl": "https://b146-46-150-81-5.ngrok-free.app", + "notificationQueryOptions": null, + "lifecycleNotificationUrl": null, + "expirationDateTime": "2026-04-27T18:57:57Z", + "creatorId": "ae87552f-48fc-4dec-9322-65040bf9fdfd", + "includeResourceData": null, + "latestSupportedTlsVersion": "v1_2", + "encryptionCertificate": null, + "encryptionCertificateId": null, + "notificationUrlAppId": null + }, + { + "id": "7dac694c-f614-4496-878e-fa625196149d", + "resource": "me/events", + "applicationId": "39d7eec8-2032-4dd3-959f-d92d8a4e2ad3", + "changeType": "created,deleted", + "clientState": null, + "notificationUrl": "https://651c-46-150-81-5.ngrok-free.app/", + "notificationQueryOptions": null, + "lifecycleNotificationUrl": null, + "expirationDateTime": "2026-04-30T10:00:16.1003133Z", + "creatorId": "ae87552f-48fc-4dec-9322-65040bf9fdfd", + "includeResourceData": null, + "latestSupportedTlsVersion": "v1_2", + "encryptionCertificate": null, + "encryptionCertificateId": null, + "notificationUrlAppId": null + }, + { + "id": "c27d2493-0518-48db-b994-6d43aa584355", + "resource": "me/messages", + "applicationId": "39d7eec8-2032-4dd3-959f-d92d8a4e2ad3", + "changeType": "created", + "clientState": null, + "notificationUrl": "https://b146-46-150-81-5.ngrok-free.app", + "notificationQueryOptions": null, + "lifecycleNotificationUrl": null, + "expirationDateTime": "2026-04-27T18:58:40Z", + "creatorId": "ae87552f-48fc-4dec-9322-65040bf9fdfd", + "includeResourceData": null, + "latestSupportedTlsVersion": "v1_2", + "encryptionCertificate": null, + "encryptionCertificateId": null, + "notificationUrlAppId": null + }, + { + "id": "63b01115-ba3f-4db6-a1ef-793797ec340a", + "resource": "me/events", + "applicationId": "39d7eec8-2032-4dd3-959f-d92d8a4e2ad3", + "changeType": "created,deleted", + "clientState": null, + "notificationUrl": "https://651c-46-150-81-5.ngrok-free.app/", + "notificationQueryOptions": null, + "lifecycleNotificationUrl": null, + "expirationDateTime": "2026-04-30T23:00:16.8634764Z", + "creatorId": "ae87552f-48fc-4dec-9322-65040bf9fdfd", + "includeResourceData": null, + "latestSupportedTlsVersion": "v1_2", + "encryptionCertificate": null, + "encryptionCertificateId": null, + "notificationUrlAppId": null + } + ] +} diff --git a/test/microsoft/subscription/outlook-events/main.go b/test/microsoft/subscription/outlook-events/main.go index f7176832a7..0d3f332dd1 100644 --- a/test/microsoft/subscription/outlook-events/main.go +++ b/test/microsoft/subscription/outlook-events/main.go @@ -68,8 +68,9 @@ func main() { }, }, }, - WebhookRouter: subscription.NewWebhookRouter(), - VerificationParams: nil, + WebhookRouter: subscription.NewWebhookRouter(), + VerificationParams: nil, + AutoRemoveSubscription: true, }, ) } diff --git a/test/microsoft/subscription/outlook-messages/main.go b/test/microsoft/subscription/outlook-messages/main.go index 2cf476b918..0dcd6b9826 100644 --- a/test/microsoft/subscription/outlook-messages/main.go +++ b/test/microsoft/subscription/outlook-messages/main.go @@ -86,8 +86,9 @@ func main() { }, }, }, - WebhookRouter: subscription.NewWebhookRouter(), - VerificationParams: nil, + WebhookRouter: subscription.NewWebhookRouter(), + VerificationParams: nil, + AutoRemoveSubscription: true, }, ) } diff --git a/test/utils/testroutines/subscription-delete.go b/test/utils/testroutines/subscription-delete.go new file mode 100644 index 0000000000..1fabe6bd84 --- /dev/null +++ b/test/utils/testroutines/subscription-delete.go @@ -0,0 +1,33 @@ +package testroutines + +import ( + "testing" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/components" +) + +type ( + DeleteSubscriptionType = TestCase[common.SubscriptionResult, None] + // DeleteSubscription is a test suite useful for testing part of connectors.SubscribeConnector interface. + DeleteSubscription DeleteSubscriptionType +) + +type DeleteSubscriptionParams struct { + Params common.SubscribeParams + PreviousResult *common.SubscriptionResult +} + +// Run provides a procedure to test connectors.SubscribeConnector +func (s DeleteSubscription) Run(t *testing.T, builder ConnectorBuilder[components.SubscriptionRemover]) { + t.Helper() + t.Cleanup(func() { + DeleteSubscriptionType(s).Close() + }) + + s.Expected = None{} + + conn := builder.Build(t, s.Name) + err := conn.DeleteSubscription(t.Context(), s.Input) + DeleteSubscriptionType(s).Validate(t, err, None{}) +} diff --git a/test/utils/testscenario/subscription-messaging.go b/test/utils/testscenario/subscription-messaging.go index b056cd7697..54b4bb4fd7 100644 --- a/test/utils/testscenario/subscription-messaging.go +++ b/test/utils/testscenario/subscription-messaging.go @@ -42,6 +42,10 @@ type SubscribeReceiveEventsSuite struct { // script cancellation. WebhookRouter WebhookRouter VerificationParams *common.VerificationParams + // AutoRemoveSubscription, if true, removes at the end of script execution any subscriptions + // that were created as part of this test run. If false, subscriptions are left in place + // and must be cleaned up manually. + AutoRemoveSubscription bool } type ConnectorMethod string @@ -129,6 +133,9 @@ func ValidateSubscribeReceiveEvents( return } + // Register a defer function to clean up the successful subscription at the end of the script. + defer cleanupSubscription(ctx, conn, suite, subscriptionResult)() + fmt.Println("============== Invoking connector.Write/Delete() ==================") for _, trigger := range suite.Operations { switch trigger.Method { @@ -215,3 +222,33 @@ func searchForRecord( return objectID, true } + +func cleanupSubscription(ctx context.Context, + conn ConnectorWebhookSubscriber, suite SubscribeReceiveEventsSuite, + subscriptionResult *common.SubscriptionResult, +) func() { + return func() { + if !suite.AutoRemoveSubscription { + fmt.Println( + "REMINDER: subscription is still active and must be removed manually.\n" + + "To automate cleanup in the future, enable the `AutoRemoveSubscription` option.", + ) + return + } + + remover, ok := conn.(components.SubscriptionRemover) + if !ok { + fmt.Println( + "REMINDER: subscription is still active and must be removed manually.\n" + + "The connector does not yet implement `components.SubscriptionRemover`.", + ) + return + } + + fmt.Println("[CLEANUP] Removing subscription.") + err := remover.DeleteSubscription(ctx, *subscriptionResult) + if !printError(err) { + fmt.Println("[CLEANUP] Subscription removed.") + } + } +}