Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions internal/datautils/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,32 @@ func (l IndexedLists[ID, V]) Add(bucket ID, objects ...V) {
l[bucket] = append(l[bucket], objects...)
}

// Remove will remove all elements in the list under the bucket matching the filter.
// A bucket may have a list with no items. If you need to remove the bucket itself, do it explicitly.
func (l IndexedLists[ID, V]) Remove(bucket ID, filter func(V) bool) {
items, exists := l[bucket]
if !exists || len(items) == 0 {
return
}

// Reuse underlying array to avoid allocations
filtered := items[:0]

for _, item := range items {
if !filter(item) {
filtered = append(filtered, item)
}
}

// Clear trailing references.
var zero V
for i := len(filtered); i < len(items); i++ {
items[i] = zero
}

l[bucket] = filtered
}

func (l NamedLists[V]) Add(bucket string, objects ...V) {
IndexedLists[string, V](l).Add(bucket, objects...)
}
Expand Down
75 changes: 75 additions & 0 deletions providers/microsoft/internal/subscriber/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,85 @@ package subscriber

import (
"context"
"errors"
"net/http"

"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/providers/microsoft/internal/batch"
)

// DeleteSubscription removes all remote subscriptions for objects specified in previousResult.
// Executes batch DELETE requests to Microsoft Graph, tolerating 404s (already deleted) as success.
// Discards final state as delete operations are fire-and-forget; returns first error if any.
// https://learn.microsoft.com/en-us/graph/change-notifications-delivery-webhooks?tabs=http#delete-a-subscription
func (s Strategy) DeleteSubscription(
ctx context.Context,
previousResult common.SubscriptionResult,
) error {
remoteSubscriptions, err := s.fetchSubscriptions(ctx)
if err != nil {
return err
}

subscriptionsForRemoval := remoteSubscriptions.subset(previousResult.ObjectNames())

// The state is discarded.
_, err = s.deleteSubscriptions(ctx, subscriptionsForRemoval)

return err
}

// deleteSubscriptions batch-deletes all subscriptions in the given RemoteSubscriptions set.
// Handles 404 responses as success (already deleted); aggregates other errors.
//
// Returns updated SubscriptionResult reflecting remaining subscriptions and operation status.
func (s Strategy) deleteSubscriptions(
ctx context.Context,
remoteSubscriptions RemoteSubscriptions,
) (*common.SubscriptionResult, error) {
subscriptionsToRemove := remoteSubscriptions.getIDs()

batchParams, err := s.paramsForBatchRemoveSubscriptionsByIDs(subscriptionsToRemove)
if err != nil {
return nil, err
}

bundledResponse := batch.Execute[SubscriptionResource](ctx, s.batchStrategy, batchParams)

// Aggregate non-404 errors (404 indicates record was already deleted).
var outErr error

for _, e := range bundledResponse.Errors {
if e.Status != http.StatusNotFound {
// Resource no longer exists.
} else {
outErr = errors.Join(outErr, e.Data)
}
}

// Filter out the subscription that were removed if any are left .
for requestID, resp := range bundledResponse.Responses {
name := ObjectName(resp.Data.Resource)
RemoteSubsType(remoteSubscriptions).Remove(name, func(subscription SubscriptionResource) bool {
return subscription.ID == SubscriptionID(requestID)
})
}

status := common.SubscriptionStatusSuccess
if outErr != nil {
// The rollback does not happen for delete. So it is either Success or Failure.
status = common.SubscriptionStatusFailed
}

return &common.SubscriptionResult{
Result: Output{},
ObjectEvents: remoteSubscriptions.toState(),
Status: status,
}, outErr
}

// removeSubscriptionsByIDs executes batch DELETE for given subscription IDs.
// Generic helper; returns raw batch result for custom error handling.
func (s Strategy) removeSubscriptionsByIDs(
ctx context.Context, identifiers []SubscriptionID,
) (*batch.Result[any], error) {
Expand All @@ -20,6 +94,7 @@ func (s Strategy) removeSubscriptionsByIDs(
return bundledResponse, nil
}

// paramsForBatchRemoveSubscriptionsByIDs creates batch parameters for DELETE `/subscriptions/{id}` requests.
func (s Strategy) paramsForBatchRemoveSubscriptionsByIDs(identifiers []SubscriptionID) (*batch.Params, error) {
batchParams := &batch.Params{}

Expand Down
64 changes: 64 additions & 0 deletions providers/microsoft/internal/subscriber/delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package subscriber

import (
"net/http"
"testing"

"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/internal/components"
"github.com/amp-labs/connectors/test/utils/mockutils/mockcond"
"github.com/amp-labs/connectors/test/utils/mockutils/mockserver"
"github.com/amp-labs/connectors/test/utils/testroutines"
"github.com/amp-labs/connectors/test/utils/testutils"
)

func TestDelete(t *testing.T) { // nolint:funlen,cyclop
t.Parallel()

responseReadSubscriptions := testutils.DataFromFile(t, "read/subscriptions-response.json")
responseDeleteSubscriptions := testutils.DataFromFile(t, "delete/subscriptions-response.json")
deleteMessage1 := testutils.DataFromFile(t, "delete/payload-message-1.json")
deleteMessage2 := testutils.DataFromFile(t, "delete/payload-message-2.json")
deleteMessage3 := testutils.DataFromFile(t, "delete/payload-message-3.json")

tests := []testroutines.DeleteSubscription{
{
Name: "Successfully remove subscriptions to Outlook messages.",
Input: common.SubscriptionResult{
ObjectEvents: map[ObjectName]common.ObjectEvents{
"me/messages": {},
},
},
Server: mockserver.Switch{
Setup: mockserver.ContentJSON(),
Cases: mockserver.Cases{{
If: mockcond.And{
mockcond.MethodGET(),
mockcond.Path("/v1.0/subscriptions"),
},
Then: mockserver.Response(http.StatusOK, responseReadSubscriptions),
}, {
If: mockcond.And{
mockcond.MethodPOST(),
mockcond.Path("/v1.0/$batch"),
payloadBatchRequests(deleteMessage1, deleteMessage2, deleteMessage3),
},
Then: mockserver.Response(http.StatusNoContent, responseDeleteSubscriptions),
}},
}.Server(),
Expected: testroutines.None{},
ExpectedErrs: nil,
},
}

for _, tt := range tests { // nolint:dupl
// nolint:varnamelen
t.Run(tt.Name, func(t *testing.T) {
t.Parallel()

tt.Run(t, func() (components.SubscriptionRemover, error) {
return constructTestStrategy(tt.Server.URL)
})
})
}
}
102 changes: 102 additions & 0 deletions providers/microsoft/internal/subscriber/state.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package subscriber

import (
"context"

"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/internal/datautils"
)

type (
// RemoteSubscriptions describes the actual state of subscriptions in the remote system.
// Each ObjectName maps to a list of remote SubscriptionResource items.
RemoteSubscriptions RemoteSubsType
RemoteSubsType = datautils.IndexedLists[ObjectName, SubscriptionResource]

// State describes the set of object events that the connector should
// try to achieve (desired) and that are also reported back as the observed outcome.
//
Expand All @@ -22,3 +30,97 @@ func newState(objectNames []ObjectName) State {

return events
}

func (r RemoteSubscriptions) toState() State {
result := make(State)

for objectName, subscriptions := range r {
eventSet := datautils.NewSet[common.SubscriptionEventType]()

for _, subscription := range subscriptions {
eventSet.Add(subscription.ChangeType.EventTypes())
}

// List of events may be nil.
var events []common.SubscriptionEventType
if list := eventSet.List(); len(list) != 0 {
events = list
}

result[objectName] = common.ObjectEvents{
Events: events,
WatchFields: nil,
WatchFieldsAll: false,
PassThroughEvents: nil,
}
}

return result
}

func (r RemoteSubscriptions) subset(objects []ObjectName) RemoteSubscriptions {
result := make(RemoteSubscriptions)

names := datautils.NewSetFromList(objects)
for name, subscriptions := range r {
if names.Has(name) {
for _, subscription := range subscriptions {
RemoteSubsType(result).Add(name, subscription)
}
}
}

return result
}

func (r RemoteSubscriptions) getIDs() []SubscriptionID {
result := make([]SubscriptionID, 0)

for _, subscriptions := range r {
for _, subscription := range subscriptions {
result = append(result, subscription.ID)
}
}

return result
}

// fetchSubscriptions retrieves the current Microsoft Graph change‑notification subscriptions for the given objects.
//
// nolint:lll
// According to https://learn.microsoft.com/en-us/graph/change-notifications-delivery-webhooks?tabs=http#subscription-request,
// Graph should guard against duplicate subscriptions (same changeType and resource), but in practice
// multiple subscriptions for the same combination can exist. Therefore, before blindly creating,
// updating, or deleting subscriptions, this method first reads the current state and allows the caller
// to reconcile the desired vs. actual subscriptions.
func (s Strategy) fetchSubscriptions(ctx context.Context) (RemoteSubscriptions, error) {
url, err := s.getSubscriptionURL()
if err != nil {
return nil, err
}

response, err := s.client.Get(ctx, url.String())
if err != nil {
return nil, err
}

subscriptions, err := common.UnmarshalJSON[subscriptionResources](response)
if err != nil {
return nil, err
}

result := make(RemoteSubscriptions)

for _, resource := range subscriptions.List {
objectName := ObjectName(resource.Resource)
RemoteSubsType(result).Add(objectName, resource)
}

return result, nil
}

// subscriptionResources is the output of "GET /subscriptions".
// https://learn.microsoft.com/en-us/graph/api/subscription-list?view=graph-rest-1.0&tabs=http#response-1
type subscriptionResources struct {
List []SubscriptionResource `json:"value"`
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "c27d2493-0518-48db-b994-6d43aa584355",
"method": "DELETE",
"url": "/subscriptions/c27d2493-0518-48db-b994-6d43aa584355",
"headers": {
"Content-Type": "application/json"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "29772d64-ee45-4e64-ab82-481602e07bc2",
"method": "DELETE",
"url": "/subscriptions/29772d64-ee45-4e64-ab82-481602e07bc2",
"headers": {
"Content-Type": "application/json"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "90b46999-27c7-4145-a409-a5ef18250522",
"method": "DELETE",
"url": "/subscriptions/90b46999-27c7-4145-a409-a5ef18250522",
"headers": {
"Content-Type": "application/json"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"responses": [
{
"id": "09ac29ca-cb8b-4761-94ab-ce372fd6fce3",
"status": 204,
"headers": {
"Cache-Control": "no-cache"
},
"body": null
}
]
}
Loading
Loading