Skip to content

Commit d87e1ab

Browse files
committed
[ENG-3815] feat(microsoft): Impl Delete Subscription
1 parent 8adbb04 commit d87e1ab

13 files changed

Lines changed: 469 additions & 4 deletions

File tree

internal/datautils/lists.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,32 @@ func (l IndexedLists[ID, V]) Add(bucket ID, objects ...V) {
2020
l[bucket] = append(l[bucket], objects...)
2121
}
2222

23+
// Remove will remove all elements in the list under the bucket matching the filter.
24+
// A bucket may have a list with no items. If you need to remove the bucket itself, do it explicitly.
25+
func (l IndexedLists[ID, V]) Remove(bucket ID, filter func(V) bool) {
26+
items, exists := l[bucket]
27+
if !exists || len(items) == 0 {
28+
return
29+
}
30+
31+
// Reuse underlying array to avoid allocations
32+
filtered := items[:0]
33+
34+
for _, item := range items {
35+
if !filter(item) {
36+
filtered = append(filtered, item)
37+
}
38+
}
39+
40+
// Clear trailing references.
41+
var zero V
42+
for i := len(filtered); i < len(items); i++ {
43+
items[i] = zero
44+
}
45+
46+
l[bucket] = filtered
47+
}
48+
2349
func (l NamedLists[V]) Add(bucket string, objects ...V) {
2450
IndexedLists[string, V](l).Add(bucket, objects...)
2551
}

providers/microsoft/internal/subscriber/delete.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,85 @@ package subscriber
22

33
import (
44
"context"
5+
"errors"
56
"net/http"
67

8+
"github.com/amp-labs/connectors/common"
79
"github.com/amp-labs/connectors/providers/microsoft/internal/batch"
810
)
911

12+
// DeleteSubscription removes all remote subscriptions for objects specified in previousResult.
13+
// Executes batch DELETE requests to Microsoft Graph, tolerating 404s (already deleted) as success.
14+
// Discards final state as delete operations are fire-and-forget; returns first error if any.
15+
// https://learn.microsoft.com/en-us/graph/change-notifications-delivery-webhooks?tabs=http#delete-a-subscription
16+
func (s Strategy) DeleteSubscription(
17+
ctx context.Context,
18+
previousResult common.SubscriptionResult,
19+
) error {
20+
remoteSubscriptions, err := s.fetchSubscriptions(ctx)
21+
if err != nil {
22+
return err
23+
}
24+
25+
subscriptionsForRemoval := remoteSubscriptions.subset(previousResult.ObjectNames())
26+
27+
// The state is discarded.
28+
_, err = s.deleteSubscriptions(ctx, subscriptionsForRemoval)
29+
30+
return err
31+
}
32+
33+
// deleteSubscriptions batch-deletes all subscriptions in the given RemoteSubscriptions set.
34+
// Handles 404 responses as success (already deleted); aggregates other errors.
35+
//
36+
// Returns updated SubscriptionResult reflecting remaining subscriptions and operation status.
37+
func (s Strategy) deleteSubscriptions(
38+
ctx context.Context,
39+
remoteSubscriptions RemoteSubscriptions,
40+
) (*common.SubscriptionResult, error) {
41+
subscriptionsToRemove := remoteSubscriptions.getIDs()
42+
43+
batchParams, err := s.paramsForBatchRemoveSubscriptionsByIDs(subscriptionsToRemove)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
bundledResponse := batch.Execute[SubscriptionResource](ctx, s.batchStrategy, batchParams)
49+
50+
// Aggregate non-404 errors (404 indicates record was already deleted).
51+
var outErr error
52+
53+
for _, e := range bundledResponse.Errors {
54+
if e.Status != http.StatusNotFound {
55+
// Resource no longer exists.
56+
} else {
57+
outErr = errors.Join(outErr, e.Data)
58+
}
59+
}
60+
61+
// Filter out the subscription that were removed if any are left .
62+
for requestID, resp := range bundledResponse.Responses {
63+
name := ObjectName(resp.Data.Resource)
64+
RemoteSubsType(remoteSubscriptions).Remove(name, func(subscription SubscriptionResource) bool {
65+
return subscription.ID == SubscriptionID(requestID)
66+
})
67+
}
68+
69+
status := common.SubscriptionStatusSuccess
70+
if outErr != nil {
71+
// The rollback does not happen for delete. So it is either Success or Failure.
72+
status = common.SubscriptionStatusFailed
73+
}
74+
75+
return &common.SubscriptionResult{
76+
Result: Output{},
77+
ObjectEvents: remoteSubscriptions.toState(),
78+
Status: status,
79+
}, outErr
80+
}
81+
82+
// removeSubscriptionsByIDs executes batch DELETE for given subscription IDs.
83+
// Generic helper; returns raw batch result for custom error handling.
1084
func (s Strategy) removeSubscriptionsByIDs(
1185
ctx context.Context, identifiers []SubscriptionID,
1286
) (*batch.Result[any], error) {
@@ -20,6 +94,7 @@ func (s Strategy) removeSubscriptionsByIDs(
2094
return bundledResponse, nil
2195
}
2296

97+
// paramsForBatchRemoveSubscriptionsByIDs creates batch parameters for DELETE `/subscriptions/{id}` requests.
2398
func (s Strategy) paramsForBatchRemoveSubscriptionsByIDs(identifiers []SubscriptionID) (*batch.Params, error) {
2499
batchParams := &batch.Params{}
25100

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package subscriber
2+
3+
import (
4+
"net/http"
5+
"testing"
6+
7+
"github.com/amp-labs/connectors/common"
8+
"github.com/amp-labs/connectors/internal/components"
9+
"github.com/amp-labs/connectors/test/utils/mockutils/mockcond"
10+
"github.com/amp-labs/connectors/test/utils/mockutils/mockserver"
11+
"github.com/amp-labs/connectors/test/utils/testroutines"
12+
"github.com/amp-labs/connectors/test/utils/testutils"
13+
)
14+
15+
func TestDelete(t *testing.T) { // nolint:funlen,cyclop
16+
t.Parallel()
17+
18+
responseReadSubscriptions := testutils.DataFromFile(t, "read/subscriptions-response.json")
19+
responseDeleteSubscriptions := testutils.DataFromFile(t, "delete/subscriptions-response.json")
20+
deleteMessage1 := testutils.DataFromFile(t, "delete/payload-message-1.json")
21+
deleteMessage2 := testutils.DataFromFile(t, "delete/payload-message-2.json")
22+
deleteMessage3 := testutils.DataFromFile(t, "delete/payload-message-3.json")
23+
24+
tests := []testroutines.DeleteSubscription{
25+
{
26+
Name: "Successfully remove subscriptions to Outlook messages.",
27+
Input: common.SubscriptionResult{
28+
ObjectEvents: map[ObjectName]common.ObjectEvents{
29+
"me/messages": {},
30+
},
31+
},
32+
Server: mockserver.Switch{
33+
Setup: mockserver.ContentJSON(),
34+
Cases: mockserver.Cases{{
35+
If: mockcond.And{
36+
mockcond.MethodGET(),
37+
mockcond.Path("/v1.0/subscriptions"),
38+
},
39+
Then: mockserver.Response(http.StatusOK, responseReadSubscriptions),
40+
}, {
41+
If: mockcond.And{
42+
mockcond.MethodPOST(),
43+
mockcond.Path("/v1.0/$batch"),
44+
payloadBatchRequests(deleteMessage1, deleteMessage2, deleteMessage3),
45+
},
46+
Then: mockserver.Response(http.StatusNoContent, responseDeleteSubscriptions),
47+
}},
48+
}.Server(),
49+
Expected: testroutines.None{},
50+
ExpectedErrs: nil,
51+
},
52+
}
53+
54+
for _, tt := range tests { // nolint:dupl
55+
// nolint:varnamelen
56+
t.Run(tt.Name, func(t *testing.T) {
57+
t.Parallel()
58+
59+
tt.Run(t, func() (components.SubscriptionRemover, error) {
60+
return constructTestStrategy(tt.Server.URL)
61+
})
62+
})
63+
}
64+
}

providers/microsoft/internal/subscriber/state.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
package subscriber
22

33
import (
4+
"context"
5+
46
"github.com/amp-labs/connectors/common"
7+
"github.com/amp-labs/connectors/internal/datautils"
58
)
69

710
type (
11+
// RemoteSubscriptions describes the actual state of subscriptions in the remote system.
12+
// Each ObjectName maps to a list of remote SubscriptionResource items.
13+
RemoteSubscriptions RemoteSubsType
14+
RemoteSubsType = datautils.IndexedLists[ObjectName, SubscriptionResource]
15+
816
// State describes the set of object events that the connector should
917
// try to achieve (desired) and that are also reported back as the observed outcome.
1018
//
@@ -22,3 +30,97 @@ func newState(objectNames []ObjectName) State {
2230

2331
return events
2432
}
33+
34+
func (r RemoteSubscriptions) toState() State {
35+
result := make(State)
36+
37+
for objectName, subscriptions := range r {
38+
eventSet := datautils.NewSet[common.SubscriptionEventType]()
39+
40+
for _, subscription := range subscriptions {
41+
eventSet.Add(subscription.ChangeType.EventTypes())
42+
}
43+
44+
// List of events may be nil.
45+
var events []common.SubscriptionEventType
46+
if list := eventSet.List(); len(list) != 0 {
47+
events = list
48+
}
49+
50+
result[objectName] = common.ObjectEvents{
51+
Events: events,
52+
WatchFields: nil,
53+
WatchFieldsAll: false,
54+
PassThroughEvents: nil,
55+
}
56+
}
57+
58+
return result
59+
}
60+
61+
func (r RemoteSubscriptions) subset(objects []ObjectName) RemoteSubscriptions {
62+
result := make(RemoteSubscriptions)
63+
64+
names := datautils.NewSetFromList(objects)
65+
for name, subscriptions := range r {
66+
if names.Has(name) {
67+
for _, subscription := range subscriptions {
68+
RemoteSubsType(result).Add(name, subscription)
69+
}
70+
}
71+
}
72+
73+
return result
74+
}
75+
76+
func (r RemoteSubscriptions) getIDs() []SubscriptionID {
77+
result := make([]SubscriptionID, 0)
78+
79+
for _, subscriptions := range r {
80+
for _, subscription := range subscriptions {
81+
result = append(result, subscription.ID)
82+
}
83+
}
84+
85+
return result
86+
}
87+
88+
// fetchSubscriptions retrieves the current Microsoft Graph change‑notification subscriptions for the given objects.
89+
//
90+
// nolint:lll
91+
// According to https://learn.microsoft.com/en-us/graph/change-notifications-delivery-webhooks?tabs=http#subscription-request,
92+
// Graph should guard against duplicate subscriptions (same changeType and resource), but in practice
93+
// multiple subscriptions for the same combination can exist. Therefore, before blindly creating,
94+
// updating, or deleting subscriptions, this method first reads the current state and allows the caller
95+
// to reconcile the desired vs. actual subscriptions.
96+
func (s Strategy) fetchSubscriptions(ctx context.Context) (RemoteSubscriptions, error) {
97+
url, err := s.getSubscriptionURL()
98+
if err != nil {
99+
return nil, err
100+
}
101+
102+
response, err := s.client.Get(ctx, url.String())
103+
if err != nil {
104+
return nil, err
105+
}
106+
107+
subscriptions, err := common.UnmarshalJSON[subscriptionResources](response)
108+
if err != nil {
109+
return nil, err
110+
}
111+
112+
result := make(RemoteSubscriptions)
113+
114+
for _, resource := range subscriptions.List {
115+
objectName := ObjectName(resource.Resource)
116+
RemoteSubsType(result).Add(objectName, resource)
117+
}
118+
119+
return result, nil
120+
}
121+
122+
// subscriptionResources is the output of "GET /subscriptions".
123+
// https://learn.microsoft.com/en-us/graph/api/subscription-list?view=graph-rest-1.0&tabs=http#response-1
124+
type subscriptionResources struct {
125+
List []SubscriptionResource `json:"value"`
126+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "c27d2493-0518-48db-b994-6d43aa584355",
3+
"method": "DELETE",
4+
"url": "/subscriptions/c27d2493-0518-48db-b994-6d43aa584355",
5+
"headers": {
6+
"Content-Type": "application/json"
7+
}
8+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "29772d64-ee45-4e64-ab82-481602e07bc2",
3+
"method": "DELETE",
4+
"url": "/subscriptions/29772d64-ee45-4e64-ab82-481602e07bc2",
5+
"headers": {
6+
"Content-Type": "application/json"
7+
}
8+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "90b46999-27c7-4145-a409-a5ef18250522",
3+
"method": "DELETE",
4+
"url": "/subscriptions/90b46999-27c7-4145-a409-a5ef18250522",
5+
"headers": {
6+
"Content-Type": "application/json"
7+
}
8+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"responses": [
3+
{
4+
"id": "09ac29ca-cb8b-4761-94ab-ce372fd6fce3",
5+
"status": 204,
6+
"headers": {
7+
"Cache-Control": "no-cache"
8+
},
9+
"body": null
10+
}
11+
]
12+
}

0 commit comments

Comments
 (0)