diff --git a/internal/datautils/pair.go b/internal/datautils/pair.go index a512ca35e..c5ed6821b 100644 --- a/internal/datautils/pair.go +++ b/internal/datautils/pair.go @@ -4,10 +4,3 @@ type Pair[L, R any] struct { Left L Right R } - -func NewPair[L, R any](left L, right R) *Pair[L, R] { - return &Pair[L, R]{ - Left: left, - Right: right, - } -} diff --git a/providers/livestorm/jsonapi.go b/providers/livestorm/jsonapi.go index 31250358e..6e155535a 100644 --- a/providers/livestorm/jsonapi.go +++ b/providers/livestorm/jsonapi.go @@ -1,6 +1,8 @@ package livestorm import ( + "maps" + "github.com/amp-labs/connectors/internal/jsonquery" "github.com/spyzhov/ajson" ) @@ -70,9 +72,7 @@ func flattenJSONAPIResourceForFields(n *ajson.Node) (map[string]any, error) { return nil, err } - for k, v := range attrMap { - merged[k] = v - } + maps.Copy(merged, attrMap) return merged, nil } diff --git a/providers/microsoft/internal/subscriber/create.go b/providers/microsoft/internal/subscriber/create.go index 16fbcf47a..c5ef29373 100644 --- a/providers/microsoft/internal/subscriber/create.go +++ b/providers/microsoft/internal/subscriber/create.go @@ -222,3 +222,20 @@ func newPayloadCreateSubscription( return body } + +// Update is only relevant for extending the subscription expiration. +// > Updates a subscription expiration time for renewal and/or updates the notificationUrl for delivery. +// https://learn.microsoft.com/en-us/graph/api/resources/subscription?view=graph-rest-1.0#methods +func newPayloadRefreshSubscription( + clock components.Clock, + webhookURL string, +) SubscriptionResource { + fiveHoursFromNow := clock.Now().Add(subscriptionExpirationWindow) + body := SubscriptionResource{ + WebhookURL: webhookURL, + ExpirationDateTime: fiveHoursFromNow, + IncludeResourceData: false, + } + + return body +} diff --git a/providers/microsoft/internal/subscriber/state.go b/providers/microsoft/internal/subscriber/state.go index a36223c29..d16e5563f 100644 --- a/providers/microsoft/internal/subscriber/state.go +++ b/providers/microsoft/internal/subscriber/state.go @@ -2,9 +2,11 @@ package subscriber import ( "context" + "sort" "github.com/amp-labs/connectors/common" "github.com/amp-labs/connectors/internal/datautils" + "github.com/amp-labs/connectors/providers/microsoft/internal/webhook" ) type ( @@ -20,6 +22,22 @@ type ( // with State, and the connector returns the same type reflecting what actually // happened (which may differ on failure or rollback). State map[ObjectName]common.ObjectEvents + + // ReconciliationPlan partitions operations needed to align RemoteSubscriptions with State. + // Groups creates/refreshes/deletes for efficient batching against Microsoft Graph API. + ReconciliationPlan struct { + // Create: objects missing subscriptions -> create new ones with desired ObjectEvents. + Create map[common.ObjectName]common.ObjectEvents + // Refresh: existing subscriptions nearing expiry -> renew before expirationTime. + Refresh map[common.ObjectName]SubscriptionID + // DeleteSubscriptions: objects removed from State -> delete all their remote subscriptions. + // Contains exact current remote state where each object should be removed. + DeleteSubscriptions RemoteSubscriptions + // Extra holds SubscriptionID surplus beyond desired state (Microsoft Graph specific). + // Occurs when single resource has multiple subscriptions; excess should be cleaned up for consistency. + // These are standalone deletes (not object-grouped like DeleteSubscriptions). + Extra []SubscriptionID + } ) func newState(objectNames []ObjectName) State { @@ -31,6 +49,85 @@ func newState(objectNames []ObjectName) State { return events } +// Equals reports if the subscription configuration matches between both states for the given object. +func (s State) Equals(other State, objectName ObjectName) bool { + return webhook.NewChangeType(s[objectName].Events) == webhook.NewChangeType(other[objectName].Events) +} + +// ReconcileTo partitions objects into three disjoint sets by comparing desired +// and actual (remote) state: +// +// - objectsToRemove: present remotely but not desired → DELETE all subscriptions for the object +// - objectsToCreate: desired but not present remotely → CREATE subscriptions for the object +// - objectsToUpdate: present in both → UPDATE existing subscriptions +// +// For Microsoft Graph, intersecting objects are always updated. +// Subscription.ExpirationDateTime must be continuously renewed, +// so there is no no-op path—every existing subscription requires an update. +// +// TODO The subscription may disappear under the hood as Microsoft auto cleans. Need to create a handler for this case. +func (r RemoteSubscriptions) ReconcileTo(state State) ReconciliationPlan { + remoteObjects := datautils.NewSetFromList(RemoteSubsType(r).GetBuckets()) + desiredObjects := datautils.FromMap(state).KeySet() + + objectsToCreate := desiredObjects.Subtract(remoteObjects) + objectsToRemove := remoteObjects.Subtract(desiredObjects) + objectsToUpdate := remoteObjects.Intersection(desiredObjects) + + plan := ReconciliationPlan{ + Create: make(map[common.ObjectName]common.ObjectEvents), + Refresh: make(map[common.ObjectName]SubscriptionID), + DeleteSubscriptions: r.subset(objectsToRemove), + Extra: make([]SubscriptionID, 0), + } + + for _, name := range objectsToCreate { + plan.Create[name] = state[name] + } + + remoteState := r.toState() + for _, name := range objectsToUpdate { + subscriptions := r[name] + + if len(subscriptions) == 0 { + // Impossible. Remote state by definition must have at least one subscription for an object. + continue + } + + if len(subscriptions) > 1 { + sort.Slice(subscriptions, func(i, j int) bool { + return subscriptions[i].ExpirationDateTime.After(subscriptions[j].ExpirationDateTime) + }) + } + + // Keep the first + subscription := subscriptions[0] + + // Remove the rest + for _, extra := range subscriptions[1:] { + // There are multiple subscriptions associated with this object. + // Keep only one of them, others must be removed. + // This could happen due to user manually altering subscriptions + // or any invalid state the connector has put the provider in. + // This is highly unlikely but such possibility is left open. + // Too many subscriptions for a single object. Remove the excess. + plan.Extra = append(plan.Extra, extra.ID) + } + + // Replace subscription with a more desired version. + if remoteState.Equals(state, name) { + plan.Refresh[name] = subscription.ID + } else { + // Create a new subscription which is different from the original. + plan.Create[name] = state[name] + // Mark old subscription for a cleanup. + plan.Extra = append(plan.Extra, subscription.ID) + } + } + + return plan +} + func (r RemoteSubscriptions) toState() State { result := make(State) diff --git a/providers/microsoft/internal/subscriber/test/delete/payload-calendar-event.json b/providers/microsoft/internal/subscriber/test/delete/payload-calendar-event.json new file mode 100644 index 000000000..a02ac5527 --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/delete/payload-calendar-event.json @@ -0,0 +1,8 @@ +{ + "id": "7dac694c-f614-4496-878e-fa625196149d", + "method": "DELETE", + "url": "/subscriptions/7dac694c-f614-4496-878e-fa625196149d", + "headers": { + "Content-Type": "application/json" + } +} diff --git a/providers/microsoft/internal/subscriber/test/patch/calendar-events-response.json b/providers/microsoft/internal/subscriber/test/patch/calendar-events-response.json new file mode 100644 index 000000000..8f7130c9b --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/patch/calendar-events-response.json @@ -0,0 +1,31 @@ +{ + "responses": [ + { + "id": "me/events", + "status": 200, + "headers": { + "Cache-Control": "no-cache", + "OData-Version": "4.0", + "Content-Type": "application/json;odata.metadata=minimal;odata.streaming=true;IEEE754Compatible=false;charset=utf-8" + }, + "body": { + "@odata.context": "https://graph.microsoft.com/v1.0/$metadata#subscriptions/$entity", + "id": "63b01115-ba3f-4db6-a1ef-793797ec340a", + "resource": "me/events", + "applicationId": "39d7eec8-2032-4dd3-959f-d92d8a4e2ad3", + "changeType": "created,deleted", + "clientState": "me/events", + "notificationUrl": "https://651c-46-150-81-5.ngrok-free.app/", + "notificationQueryOptions": null, + "lifecycleNotificationUrl": null, + "expirationDateTime": "2026-05-01T00:09:21.6065208Z", + "creatorId": "ae87552f-48fc-4dec-9322-65040bf9fdfd", + "includeResourceData": null, + "latestSupportedTlsVersion": "v1_2", + "encryptionCertificate": null, + "encryptionCertificateId": null, + "notificationUrlAppId": null + } + } + ] +} diff --git a/providers/microsoft/internal/subscriber/update.go b/providers/microsoft/internal/subscriber/update.go new file mode 100644 index 000000000..f36852cbf --- /dev/null +++ b/providers/microsoft/internal/subscriber/update.go @@ -0,0 +1,193 @@ +package subscriber + +import ( + "context" + "errors" + "maps" + "net/http" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/datautils" + "github.com/amp-labs/connectors/providers/microsoft/internal/batch" +) + +// UpdateSubscription reconciles current remote subscriptions with new desired state from params. +// Since Microsoft Graph lacks full subscription updates, it performs targeted create/refresh/delete operations: +// - CREATE missing subscriptions +// - REFRESH (PATCH expirationDateTime/notificationUrl) expiring ones +// - DELETE undesried objects +// - DELETE extra subscriptions ensuring only 1 subscription exists for 1 object. +// +// Returns merged SubscriptionResult reflecting achieved state across all operations. +// +// Microsoft Graph limitation: Only expirationDateTime and notificationUrl are PATCHable. +// https://learn.microsoft.com/en-us/graph/api/resources/subscription?view=graph-rest-1.0#methods +func (s Strategy) UpdateSubscription( + ctx context.Context, + params common.SubscribeParams, + previousResult *common.SubscriptionResult, +) (*common.SubscriptionResult, error) { + // Previous result is not used. + // We must fetch the remote subscriptions anyway to know what Subscription IDs to remove. + _ = previousResult + + input, err := s.TypedSubscriptionRequest(params) + if err != nil { + return nil, err + } + + remoteSubscriptions, err := s.fetchSubscriptions(ctx) + if err != nil { + return nil, err + } + + plan := remoteSubscriptions.ReconcileTo(params.SubscriptionEvents) + + var ( + createResult, refreshResult, deleteResult *common.SubscriptionResult + createErr, refreshErr, deleteErr error + ) + + if len(plan.Create) != 0 { + createResult, createErr = s.createSubscription(ctx, common.SubscribeParams{ + Request: params.Request, + RegistrationResult: params.RegistrationResult, + SubscriptionEvents: plan.Create, + }) + } + + if len(plan.Refresh) != 0 { + refreshResult, refreshErr = s.refreshSubscription(ctx, plan.Refresh, input.WebhookURL) + } + + if len(plan.DeleteSubscriptions) != 0 { + deleteResult, deleteErr = s.deleteSubscriptions(ctx, plan.DeleteSubscriptions) + } + + // Cleanup extra subscriptions to ensure 1 subscription exists for 1 object. + // This step is done at the very end of this function and if it fails it is not a big deal. + // Duplicate events will be sent. This is better than to miss data. + if len(plan.Extra) != 0 { + _, _ = s.removeSubscriptionsByIDs(ctx, plan.Extra) + } + + result := combineResults(createResult, refreshResult, deleteResult) + err = errors.Join(createErr, refreshErr, deleteErr) + + return result, err +} + +// combineResults merges create, refresh, and remove subscription results into a single SubscriptionResult. +// +// Semantics: +// - refresh is fully disjoint from both create and remove. +// - create and remove are not necessarily disjoint. +// - an object appearing in both create and remove represents an update +// (remove old object + create new object). +// +// Merge order is therefore significant: +// - create and refresh are applied first and are considered authoritative. +// - remove is applied only for objects not already present in the merged state. +// +// This ensures that updated objects are preserved and not overwritten by stale +// remove entries. Objects present only in remove represent truly deleted subscriptions +// and will have an empty event list. +// +// Nil inputs are ignored. +func combineResults(create, refresh, remove *common.SubscriptionResult) *common.SubscriptionResult { + var ( + state = make(State) + checklist = make(datautils.Set[common.SubscriptionStatus]) + ) + + if create != nil { + maps.Copy(state, create.ObjectEvents) + checklist.AddOne(create.Status) + } + + if refresh != nil { + maps.Copy(state, refresh.ObjectEvents) + checklist.AddOne(refresh.Status) + } + + if remove != nil { + // Remove entries are only applied for objects that were truly deleted. + // Objects already present in state were recreated as part of an update + // flow (create + remove) and therefore must be preserved. + for objectName, events := range remove.ObjectEvents { + if _, exists := state[objectName]; !exists { + state[objectName] = events + } + } + checklist.AddOne(remove.Status) + } + + // Derives a final Status using pessimistic precedence, where the worst outcome dominates. + var status common.SubscriptionStatus + if _, ok := checklist[common.SubscriptionStatusFailedToRollback]; ok { + status = common.SubscriptionStatusFailedToRollback + } else if _, ok = checklist[common.SubscriptionStatusFailed]; ok { + status = common.SubscriptionStatusFailed + } else if _, ok = checklist[common.SubscriptionStatusSuccess]; ok { + status = common.SubscriptionStatusSuccess + } + + return &common.SubscriptionResult{ + Result: Output{}, + ObjectEvents: state, + Status: status, + } +} + +// refreshSubscription batch-PATCHes expirationDateTime/notificationUrl for subscriptions nearing expiry. +func (s Strategy) refreshSubscription( + ctx context.Context, + refreshPlan map[common.ObjectName]SubscriptionID, + webhookURL string, +) (*common.SubscriptionResult, error) { + batchParams, err := s.paramsForBatchRefreshSubscriptions(refreshPlan, webhookURL) + if err != nil { + return nil, err + } + + bundledResponse := batch.Execute[SubscriptionResource](ctx, s.batchStrategy, batchParams) + state := getStateFromCreateResponse(bundledResponse) + + status := common.SubscriptionStatusSuccess + if len(bundledResponse.Errors) != 0 { + // Some requests have failed. No rollback for the refresh. + // The state must still be the same. + status = common.SubscriptionStatusFailed + } + + return &common.SubscriptionResult{ + Result: Output{}, + ObjectEvents: state, + Status: status, + }, nil +} + +// paramsForBatchRefreshSubscriptions creates PATCH /subscriptions/{id} requests for renewal. +func (s Strategy) paramsForBatchRefreshSubscriptions( + refreshPlan map[common.ObjectName]SubscriptionID, + webhookURL string, +) (*batch.Params, error) { + batchParams := &batch.Params{} + + for objectName, subscriptionID := range refreshPlan { + url, err := s.getSubscriptionURL() + if err != nil { + return nil, err + } + + url.AddPath(string(subscriptionID)) + + requestID := batch.RequestID(objectName) + body := newPayloadRefreshSubscription(s.clock, webhookURL) + batchParams.WithRequest(requestID, http.MethodPatch, url, body, map[string]any{ + "Content-Type": "application/json", + }) + } + + return batchParams, nil +} diff --git a/providers/microsoft/internal/subscriber/update_test.go b/providers/microsoft/internal/subscriber/update_test.go new file mode 100644 index 000000000..e11bb1013 --- /dev/null +++ b/providers/microsoft/internal/subscriber/update_test.go @@ -0,0 +1,152 @@ +package subscriber + +import ( + "net/http" + "testing" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/components" + "github.com/amp-labs/connectors/test/utils/mockutils/mockcond" + "github.com/amp-labs/connectors/test/utils/mockutils/mockserver" + "github.com/amp-labs/connectors/test/utils/testroutines" + "github.com/amp-labs/connectors/test/utils/testutils" +) + +func TestUpdate(t *testing.T) { // nolint:funlen,cyclop + t.Parallel() + + responseReadSubscriptions := testutils.DataFromFile(t, "read/subscriptions-response.json") + responseSubscribeToMessages := testutils.DataFromFile(t, "create/messages.json") + responseDeleteSubscriptions := testutils.DataFromFile(t, "delete/subscriptions-response.json") + responseCalendarEventRefresh := testutils.DataFromFile(t, "patch/calendar-events-response.json") + deleteMessage1 := testutils.DataFromFile(t, "delete/payload-message-1.json") + deleteMessage2 := testutils.DataFromFile(t, "delete/payload-message-2.json") + deleteMessage3 := testutils.DataFromFile(t, "delete/payload-message-3.json") + deleteCalendarEvent := testutils.DataFromFile(t, "delete/payload-calendar-event.json") + + payloadSubscribeToMessages := `{ + "id": "me/messages", + "method": "POST", + "url": "/subscriptions", + "body": { + "changeType": "created,updated,deleted", + "notificationUrl": "https://test.com/webhook", + "resource": "me/messages", + "clientState": "me/messages", + "expirationDateTime": "2026-03-04T10:00:00Z" + }, + "headers": {"Content-Type": "application/json"} + }` + payloadRefreshCalendarEvents := `{ + "id": "me/events", + "method": "PATCH", + "url": "/subscriptions/63b01115-ba3f-4db6-a1ef-793797ec340a", + "body": { + "expirationDateTime": "2026-03-04T10:00:00Z", + "notificationUrl": "https://test.com/webhook" + }, + "headers": { + "Content-Type": "application/json" + } + }` + + tests := []testroutines.UpdateSubscription{ + { + Name: "Subscription to Outlook messages is replaced, Outlook events are refreshed and extra are removed", + Input: testroutines.UpdateSubscriptionParams{ + Params: common.SubscribeParams{ + Request: Input{ + WebhookURL: "https://test.com/webhook", + }, + SubscriptionEvents: map[common.ObjectName]common.ObjectEvents{ + "me/messages": { + Events: []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, // exists in GET /subscriptions + common.SubscriptionEventTypeUpdate, // new compared to current GET /subscriptions + common.SubscriptionEventTypeDelete, // new compared to current GET /subscriptions + }, + }, + "me/events": { + Events: []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, // exists in GET /subscriptions + common.SubscriptionEventTypeDelete, // exists in GET /subscriptions + }, + }, + }, + }, + PreviousResult: nil, + }, + Server: mockserver.Switch{ + Setup: mockserver.ContentJSON(), + Cases: mockserver.Cases{{ + // Fetch current state for subscriptions. + If: mockcond.And{ + mockcond.MethodGET(), + mockcond.Path("/v1.0/subscriptions"), + }, + Then: mockserver.Response(http.StatusOK, responseReadSubscriptions), + }, { + // Create brand-new subscription to messages. + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/v1.0/$batch"), + mockcond.Body(`{"requests": [` + payloadSubscribeToMessages + `]}`), + }, + Then: mockserver.Response(http.StatusCreated, responseSubscribeToMessages), + }, { + // Subscription to Calendar events should be only refreshed, the expiration time prolonged. + // This is because the change type for this object doesn't need updating. + // One subscription is going to expire sooner than the other, we pick the most fresh and refresh it. + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/v1.0/$batch"), + mockcond.Body(`{"requests": [` + payloadRefreshCalendarEvents + `]}`), + }, + Then: mockserver.Response(http.StatusCreated, responseCalendarEventRefresh), + }, { + // Every other subscription to messages does not reflect the desired setup, they will be removed. + // We have already created the desired subscription in the step above. + // As for the Calendar events, one subscription is going to expire sooner than the other. + // The most fresh was already refreshed, the other will be removed. + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/v1.0/$batch"), + payloadBatchRequests(deleteMessage1, deleteMessage2, deleteMessage3, deleteCalendarEvent), + }, + Then: mockserver.Response(http.StatusNoContent, responseDeleteSubscriptions), + }}, + }.Server(), + Expected: &common.SubscriptionResult{ + Result: Output{}, + ObjectEvents: map[common.ObjectName]common.ObjectEvents{ + "me/messages": { + Events: []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, + common.SubscriptionEventTypeUpdate, + common.SubscriptionEventTypeDelete, + }, + }, + "me/events": { + Events: []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, + common.SubscriptionEventTypeDelete, + }, + }, + }, + Status: "success", + }, + ExpectedErrs: nil, + }, + } + + for _, tt := range tests { // nolint:dupl + // nolint:varnamelen + t.Run(tt.Name, func(t *testing.T) { + t.Parallel() + + tt.Run(t, func() (components.SubscriptionUpdater, error) { + return constructTestStrategy(tt.Server.URL) + }) + }) + } +} diff --git a/test/microsoft/subscription/update/main.go b/test/microsoft/subscription/update/main.go new file mode 100644 index 000000000..e4869ab00 --- /dev/null +++ b/test/microsoft/subscription/update/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "os/signal" + "syscall" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/providers/microsoft" + connTest "github.com/amp-labs/connectors/test/microsoft" + "github.com/amp-labs/connectors/test/utils/testscenario" +) + +func main() { + // Handle Ctrl-C gracefully. + ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer done() + + conn := connTest.GetMicrosoftGraphConnector(ctx) + + testscenario.SubscriptionCreateUpdateDelete(ctx, conn, + func(webhookURL string) *common.SubscribeParams { + return &common.SubscribeParams{ + Request: microsoft.SubscribeRequest{ + WebhookURL: webhookURL, + }, + SubscriptionEvents: map[common.ObjectName]common.ObjectEvents{ + "me/messages": { + Events: []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, + }, + }, + "me/events": { + Events: []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, + }, + }, + }, + } + }, + func(webhookURL string) *common.SubscribeParams { + return &common.SubscribeParams{ + Request: microsoft.SubscribeRequest{ + WebhookURL: webhookURL, + }, + SubscriptionEvents: map[common.ObjectName]common.ObjectEvents{ + "me/messages": { + Events: []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, + common.SubscriptionEventTypeUpdate, + common.SubscriptionEventTypeDelete, + }, + }, + "me/events": { + Events: []common.SubscriptionEventType{ // no change + common.SubscriptionEventTypeCreate, + }, + }, + }, + } + }, + ) +} diff --git a/test/utils/testroutines/subscription-update.go b/test/utils/testroutines/subscription-update.go new file mode 100644 index 000000000..51356c2d7 --- /dev/null +++ b/test/utils/testroutines/subscription-update.go @@ -0,0 +1,31 @@ +package testroutines + +import ( + "testing" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/components" +) + +type ( + UpdateSubscriptionType = TestCase[UpdateSubscriptionParams, *common.SubscriptionResult] + // UpdateSubscription is a test suite useful for testing part of connectors.SubscribeConnector interface. + UpdateSubscription UpdateSubscriptionType +) + +type UpdateSubscriptionParams struct { + Params common.SubscribeParams + PreviousResult *common.SubscriptionResult +} + +// Run provides a procedure to test connectors.SubscribeConnector +func (s UpdateSubscription) Run(t *testing.T, builder ConnectorBuilder[components.SubscriptionUpdater]) { + t.Helper() + t.Cleanup(func() { + UpdateSubscriptionType(s).Close() + }) + + conn := builder.Build(t, s.Name) + output, err := conn.UpdateSubscription(t.Context(), s.Input.Params, s.Input.PreviousResult) + UpdateSubscriptionType(s).Validate(t, err, output) +} diff --git a/test/utils/testscenario/subscription-cud.go b/test/utils/testscenario/subscription-cud.go new file mode 100644 index 000000000..7db48fe23 --- /dev/null +++ b/test/utils/testscenario/subscription-cud.go @@ -0,0 +1,68 @@ +package testscenario + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/components" + "github.com/amp-labs/connectors/test/utils" +) + +type ConnectorSubscriptionManager interface { + components.SubscriptionCreator + components.SubscriptionUpdater + components.SubscriptionRemover +} + +// SubscriptionCreateUpdateDelete is a test scenario utilizing +// Subscribe/UpdateSubscription/DeleteSubscription connector operations. +// Each step will be displayed on the screen and to be analyzed by developer. +func SubscriptionCreateUpdateDelete( + ctx context.Context, conn ConnectorSubscriptionManager, + createParams, updateParams SubscribeParamBuilder, +) { + fmt.Println("> TEST Subscription Create/Update/Delete") + publicURL, ok := getPublicWebhookURL(ctx) + if !ok { + failOnError(errors.New("webhook URL is needed")) + } + + fmt.Println("============= Create =============") + result, err := conn.Subscribe(ctx, *createParams(publicURL)) + if err != nil { + fmt.Println("conn.Subscribe() -> failed") + failOnError(err) + } + validateSubscriptionResult(result) + + fmt.Println("============= Update =============") + result, err = conn.UpdateSubscription(ctx, *updateParams(publicURL), result) + if err != nil { + fmt.Println("conn.UpdateSubscription() -> failed") + failOnError(err) + } + validateSubscriptionResult(result) + + fmt.Println("============= Delete =============") + err = conn.DeleteSubscription(ctx, *result) + if err != nil { + fmt.Println("conn.DeleteSubscription() -> failed") + failOnError(err) + } + + fmt.Println("> Successful test completion") +} + +func validateSubscriptionResult(result *common.SubscriptionResult) { + fmt.Println("(1) Result:") + utils.DumpJSON(result.Result, os.Stdout) + fmt.Println("(2) ObjectEvents:") + utils.DumpJSON(result.ObjectEvents, os.Stdout) + fmt.Printf("(3) Status: \"%v\"\n", result.Status) + if result.Status != common.SubscriptionStatusSuccess { + failOnError(errors.New("subscription has not succeeded")) + } +}