diff --git a/.gitignore b/.gitignore index 19f529a2e..1b7fb407a 100644 --- a/.gitignore +++ b/.gitignore @@ -147,6 +147,6 @@ output/ # Salesforce test binaries /apex test/salesforce/apex/apex -/subscribe -subscriber +./subscribe +./subscriber test/salesforce/subscribe/subscribe diff --git a/common/types.go b/common/types.go index 2e182b5f4..07bcff4d9 100644 --- a/common/types.go +++ b/common/types.go @@ -914,6 +914,10 @@ type SubscriptionResult struct { // this corresponds to each API call. // provider specific events ["contact.merged"] for hubspot or ["jira_issue:restored", "jira_issue:archived"] for jira. } +func (r SubscriptionResult) ObjectNames() []ObjectName { + return datautils.FromMap(r.ObjectEvents).Keys() +} + type SubscriptionStatus string const ( diff --git a/common/validation.go b/common/validation.go index ce2ede1d5..c7d50c039 100644 --- a/common/validation.go +++ b/common/validation.go @@ -142,3 +142,11 @@ func (p SearchParams) ValidateParams(withRequiredFields bool) error { return nil } + +func (p SubscribeParams) ValidateParams() error { + if len(p.SubscriptionEvents) == 0 { + return ErrMissingObjects + } + + return nil +} diff --git a/internal/components/clock.go b/internal/components/clock.go new file mode 100644 index 000000000..eaeccda8b --- /dev/null +++ b/internal/components/clock.go @@ -0,0 +1,35 @@ +package components + +import "time" + +// Clock provides the current time, enabling deterministic testing of time-dependent logic. +// Implementations include production (real time) and test (fixed time) variants. +type Clock interface { + Now() time.Time +} + +// RealClock returns the current wall-clock time using time.Now(). +type RealClock struct{} + +func NewRealClock() *RealClock { + return new(RealClock) +} + +func (RealClock) Now() time.Time { + return time.Now() +} + +// FixedClock returns a fixed timestamp for reproducible tests. +// Use time.Date(...) or time.Now().Add(...) to set specific values. +type FixedClock struct { + t time.Time +} + +// NewFixedClock creates a FixedClock at the given time. +func NewFixedClock(t time.Time) *FixedClock { + return &FixedClock{t: t} +} + +func (c *FixedClock) Now() time.Time { + return c.t +} diff --git a/internal/datautils/pair.go b/internal/datautils/pair.go index c5ed6821b..a512ca35e 100644 --- a/internal/datautils/pair.go +++ b/internal/datautils/pair.go @@ -4,3 +4,10 @@ type Pair[L, R any] struct { Left L Right R } + +func NewPair[L, R any](left L, right R) *Pair[L, R] { + return &Pair[L, R]{ + Left: left, + Right: right, + } +} diff --git a/providers/microsoft/connector.go b/providers/microsoft/connector.go index 6ef46b068..3fe19950e 100644 --- a/providers/microsoft/connector.go +++ b/providers/microsoft/connector.go @@ -13,13 +13,16 @@ import ( "github.com/amp-labs/connectors/providers" "github.com/amp-labs/connectors/providers/microsoft/internal/batch" "github.com/amp-labs/connectors/providers/microsoft/internal/metadata" + "github.com/amp-labs/connectors/providers/microsoft/internal/subscriber" "github.com/amp-labs/connectors/providers/microsoft/internal/webhook" ) const apiVersion = "v1.0" type ( - EventCollection = webhook.EventCollection + EventCollection = webhook.EventCollection + SubscribeRequest = subscriber.Input + SubscribeResponse = subscriber.Output ) type Connector struct { @@ -35,6 +38,7 @@ type Connector struct { components.Writer components.Deleter webhook.Verifier + subscriber.Strategy // Dependent services. batchStrategy *batch.Strategy @@ -59,8 +63,7 @@ func NewConnectorForProvider(provider providers.Provider, params common.Connecto // nolint:funlen func constructor(base *components.Connector) (*Connector, error) { connector := &Connector{ - Connector: base, - batchStrategy: batch.NewStrategy(base.JSONHTTPClient(), base.ProviderInfo()), + Connector: base, } connector.SchemaProvider = schema.NewOpenAPISchemaProvider(connector.ProviderContext.Module(), metadata.Schemas) @@ -107,6 +110,9 @@ func constructor(base *components.Connector) (*Connector, error) { }, ) + connector.batchStrategy = batch.NewStrategy(base.JSONHTTPClient(), base.ProviderInfo()) + connector.Strategy = *subscriber.NewStrategy(base.JSONHTTPClient(), base.ProviderInfo(), connector.batchStrategy) + return connector, nil } diff --git a/providers/microsoft/internal/subscriber/create.go b/providers/microsoft/internal/subscriber/create.go new file mode 100644 index 000000000..16fbcf47a --- /dev/null +++ b/providers/microsoft/internal/subscriber/create.go @@ -0,0 +1,224 @@ +package subscriber + +import ( + "context" + "net/http" + "time" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/components" + "github.com/amp-labs/connectors/internal/datautils" + "github.com/amp-labs/connectors/providers/microsoft/internal/batch" + "github.com/amp-labs/connectors/providers/microsoft/internal/webhook" +) + +const subscriptionExpirationWindow = 5 * time.Hour + +// Subscribe creates a Microsoft Graph subscription for the specified objects and events. +// +// nolint:lll +// See the [request body]( https://learn.microsoft.com/en-us/graph/api/subscription-post-subscriptions?view=graph-rest-1.0&tabs=http#request-body) for details. +// Supported resources are listed [here](https://learn.microsoft.com/en-us/graph/api/resources/change-notifications-api-overview?view=graph-rest-1.0). +func (s Strategy) Subscribe( + ctx context.Context, + params common.SubscribeParams, +) (*common.SubscriptionResult, error) { + if err := params.ValidateParams(); err != nil { + return nil, err + } + + return s.createSubscription(ctx, params) +} + +// createSubscription creates subscriptions using batch requests for efficiency. +// Handles rollback on partial failures to maintain consistency. +// Pre-existing subscriptions for the same resource may result in duplicates (handled only by UpdateSubscription). +func (s Strategy) createSubscription( + ctx context.Context, + params common.SubscribeParams, +) (*common.SubscriptionResult, error) { + batchParams, err := s.paramsForBatchCreateSubscriptions(params) + if err != nil { + return nil, err + } + + bundledResponse := batch.Execute[SubscriptionResource](ctx, s.batchStrategy, batchParams) + state := getStateFromCreateResponse(bundledResponse) + + if len(bundledResponse.Errors) != 0 { + // Some requests failed; initiate rollback. + return s.rollbackSubscriptionCreation(ctx, params, state, bundledResponse) + } + + return &common.SubscriptionResult{ + Result: Output{}, + ObjectEvents: state, + Status: common.SubscriptionStatusSuccess, + }, nil +} + +// paramsForBatchCreateSubscriptions prepares batch parameters for creating multiple subscriptions. +// Constructs payloads for each object-event combination using the webhook URL. +func (s Strategy) paramsForBatchCreateSubscriptions(params common.SubscribeParams) (*batch.Params, error) { + input, err := s.TypedSubscriptionRequest(params) + if err != nil { + return nil, err + } + + webhookURL := input.WebhookURL + + url, err := s.getSubscriptionURL() + if err != nil { + return nil, err + } + + batchParams := &batch.Params{} + + for objectName, events := range params.SubscriptionEvents { + requestID := batch.RequestID(objectName) + body := newPayloadCreateSubscription(objectName, events, webhookURL, s.clock) + batchParams.WithRequest(requestID, http.MethodPost, url, body, map[string]any{ + "Content-Type": "application/json", + }) + } + + return batchParams, nil +} + +// getStateFromCreateResponse extracts the subscription state from batch responses. +// Successful responses set the events state; errors result in empty ObjectEvents state. +func getStateFromCreateResponse(response *batch.Result[SubscriptionResource]) State { + result := make(State) + + for objectName, envelope := range response.Responses { + // Map successful subscription to its events. + subscription := envelope.Data + result[ObjectName(objectName)] = common.ObjectEvents{ + Events: subscription.ChangeType.EventTypes(), + WatchFields: nil, + WatchFieldsAll: false, + PassThroughEvents: nil, + } + } + + for objectName := range response.Errors { + // Failed requests yield no subscription. + result[ObjectName(objectName)] = common.ObjectEvents{} + } + + return result +} + +// rollbackSubscriptionCreation deletes successfully created subscriptions on partial failure. +// It updates state to reflect remaining subscriptions after attempted rollback. +// Returns appropriate status based on rollback success. +func (s Strategy) rollbackSubscriptionCreation( + ctx context.Context, + params common.SubscribeParams, + state State, + partialCreation *batch.Result[SubscriptionResource], +) (*common.SubscriptionResult, error) { + requestsRegistry := make(datautils.Map[SubscriptionID, ObjectName]) + + for _, envelope := range partialCreation.Responses { + subscription := envelope.Data + requestsRegistry[subscription.ID] = ObjectName(subscription.Resource) + } + + bundledResponse, err := s.removeSubscriptionsByIDs(ctx, requestsRegistry.Keys()) + if err != nil { + return nil, err + } + + if len(bundledResponse.Errors) == 0 { + // Full rollback succeeded. + objectNames := datautils.FromMap(params.SubscriptionEvents).Keys() + + return &common.SubscriptionResult{ + Result: Output{}, + ObjectEvents: newState(objectNames), + Status: common.SubscriptionStatusFailed, + }, nil + } + + // Partial rollback; track remaining subscriptions. + existingObjects := datautils.NewSet[ObjectName]() + + for requestID := range bundledResponse.Errors { + // Convert request ID back to object. + id := SubscriptionID(requestID) + objectName := requestsRegistry[id] + existingObjects.AddOne(objectName) + } + + allObjects := datautils.FromMap(params.SubscriptionEvents).KeySet() + removedObjects := allObjects.Subtract(existingObjects) + + // Clear state for successfully removed objects. + for _, objectName := range removedObjects { + state[objectName] = common.ObjectEvents{} + } + + return &common.SubscriptionResult{ + Result: Output{}, + ObjectEvents: state, + Status: common.SubscriptionStatusFailedToRollback, + }, nil +} + +// SubscriptionResource represents a Microsoft Graph subscription. +// See [properties](https://learn.microsoft.com/en-us/graph/api/resources/subscription?view=graph-rest-1.0#properties). +// +// Custom usage: clientState field is repurposed to store ObjectName for identification. +// Ignored fields: +// +// encryptionCertificateId, encryptionCertificate, lifecycleNotificationUrl, +// notificationQueryOptions, notificationUrlAppId. +type SubscriptionResource struct { + // ID is the unique subscription identifier returned by POST/GET/PATCH requests. + ID SubscriptionID `json:"id,omitempty"` + // ChangeType specifies the event types (created, updated, deleted) to subscribe to. + ChangeType webhook.ChangeType `json:"changeType,omitempty"` + // ObjectName uses the clientState field to store the connector's object name for identification. + ObjectName ObjectName `json:"clientState,omitempty"` + // WebhookURL is the notification URL where Microsoft Graph sends change notifications. + WebhookURL string `json:"notificationUrl,omitempty"` + // Resource identifies the Microsoft Graph resource being monitored (e.g., "me/messages"). + Resource string `json:"resource,omitempty"` + // ExpirationDateTime is the UTC datetime when the subscription expires and auto-deletes. + // Must respect per-resource maximum lifetimes (ranges from 5 hours to 30 days). + // https://learn.microsoft.com/en-us/graph/api/resources/subscription?view=graph-rest-1.0#subscription-lifetime + ExpirationDateTime time.Time `json:"expirationDateTime"` + // IncludeResourceData is set to false. This is to avoid encryption requirements. + // Resource data is fetched separately via ReadByIds, therefore it is not needed. + IncludeResourceData bool `json:"includeResourceData,omitempty"` +} + +type SubscriptionID string + +// newPayloadCreateSubscription constructs a subscription payload for creation. +// Uses clientState to store objectName for identification. +// Expiration is set to 5 hours to safely fit common maximums (e.g., presence: 1h excluded; others 3-30 days). +// +// nolint:lll +// See [lifetime limits](https://learn.microsoft.com/en-us/graph/api/resources/subscription?view=graph-rest-1.0#subscription-lifetime) +func newPayloadCreateSubscription( + objectName ObjectName, + events common.ObjectEvents, + webhookURL string, + clock components.Clock, +) SubscriptionResource { + resource := objectName.String() + + fiveHoursFromNow := clock.Now().Add(subscriptionExpirationWindow) + body := SubscriptionResource{ + ChangeType: webhook.NewChangeType(events.Events), + ObjectName: objectName, + WebhookURL: webhookURL, + Resource: resource, + ExpirationDateTime: fiveHoursFromNow, + IncludeResourceData: false, + } + + return body +} diff --git a/providers/microsoft/internal/subscriber/create_test.go b/providers/microsoft/internal/subscriber/create_test.go new file mode 100644 index 000000000..bd12f18ae --- /dev/null +++ b/providers/microsoft/internal/subscriber/create_test.go @@ -0,0 +1,226 @@ +package subscriber + +import ( + "net/http" + "testing" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/components" + "github.com/amp-labs/connectors/test/utils/mockutils/mockcond" + "github.com/amp-labs/connectors/test/utils/mockutils/mockserver" + "github.com/amp-labs/connectors/test/utils/testroutines" + "github.com/amp-labs/connectors/test/utils/testutils" +) + +func TestCreate(t *testing.T) { // nolint:funlen,cyclop + t.Parallel() + + responseSubscribeToMessages := testutils.DataFromFile(t, "create/messages.json") + responseSubscribeToMessagesAndUnknown := testutils.DataFromFile(t, "create/partial-success.json") + responseRollbackDelete := testutils.DataFromFile(t, "create/rollback-success-delete.json") + errorRollbackDelete := testutils.DataFromFile(t, "create/rollback-fail-delete.json") + + eventTypesCUD := []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, + common.SubscriptionEventTypeUpdate, + common.SubscriptionEventTypeDelete, + } + + payloadSubscribeToMessages := []byte(`{ + "id": "me/messages", + "method": "POST", + "url": "/subscriptions", + "body": { + "changeType": "created,updated,deleted", + "notificationUrl": "https://test.com/webhook", + "resource": "me/messages", + "clientState": "me/messages", + "expirationDateTime": "2026-03-04T10:00:00Z" + }, + "headers": {"Content-Type": "application/json"} + }`) + + payloadSubscribeToMovies := []byte(`{ + "id": "movies", + "method": "POST", + "url": "/subscriptions", + "body": { + "changeType": "created", + "notificationUrl": "https://test.com/webhook", + "resource": "movies", + "clientState": "movies", + "expirationDateTime": "2026-03-04T10:00:00Z" + }, + "headers": {"Content-Type": "application/json"} + }`) + payloadRemoveSubscription := []byte(`{ + "id": "38ca43fa-4602-43ef-a865-aca8a3ddc1ce", + "method": "DELETE", + "url": "/subscriptions/38ca43fa-4602-43ef-a865-aca8a3ddc1ce", + "headers": {"Content-Type": "application/json"} + }`) + + tests := []testroutines.CreateSubscription{ + { + Name: "Missing object for subscription", + Input: common.SubscribeParams{ + Request: Input{WebhookURL: "https://test.com/webhook"}, + }, + Server: mockserver.Dummy(), + ExpectedErrs: []error{common.ErrMissingObjects}, + }, + { + Name: "Invalid subscription request type", + Input: common.SubscribeParams{ + Request: "invalid", + SubscriptionEvents: State{ + "butterflies": {}, + }, + }, + Server: mockserver.Dummy(), + ExpectedErrs: []error{components.ErrInvalidSubscriptionRequestType}, + }, + { + Name: "Subscription to Outlook messages and unknown object with rollback", + Input: common.SubscribeParams{ + Request: Input{ + WebhookURL: "https://test.com/webhook", + }, + SubscriptionEvents: State{ + "me/messages": {Events: eventTypesCUD}, + "movies": {Events: []common.SubscriptionEventType{common.SubscriptionEventTypeCreate}}, + }, + }, + Server: mockserver.Switch{ + Setup: mockserver.ContentJSON(), + Cases: mockserver.Cases{{ + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/v1.0/$batch"), + // Connector may send request payloads in different order. + payloadBatchRequests(payloadSubscribeToMessages, payloadSubscribeToMovies), + }, + Then: mockserver.Response(http.StatusOK, responseSubscribeToMessagesAndUnknown), + }, { + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/v1.0/$batch"), + payloadBatchRequests(payloadRemoveSubscription), + }, + Then: mockserver.Response(http.StatusNoContent, responseRollbackDelete), + }}, + }.Server(), + Comparator: testroutines.ComparatorSubscriptionWithResult(resultComparator), + Expected: &common.SubscriptionResult{ + ObjectEvents: State{ + "me/messages": {}, + "movies": {}, + }, + Status: "failed", + }, + ExpectedErrs: nil, + }, + { + Name: "Subscription to Outlook messages and unknown object with failing rollback", + Input: common.SubscribeParams{ + Request: Input{ + WebhookURL: "https://test.com/webhook", + }, + SubscriptionEvents: State{ + "me/messages": {Events: eventTypesCUD}, + "movies": {Events: []common.SubscriptionEventType{common.SubscriptionEventTypeCreate}}, + }, + }, + Server: mockserver.Switch{ + Setup: mockserver.ContentJSON(), + Cases: mockserver.Cases{{ + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/v1.0/$batch"), + // Connector may send request payloads in different order. + payloadBatchRequests(payloadSubscribeToMessages, payloadSubscribeToMovies), + }, + Then: mockserver.Response(http.StatusOK, responseSubscribeToMessagesAndUnknown), + }, { + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/v1.0/$batch"), + payloadBatchRequests(payloadRemoveSubscription), + }, + Then: mockserver.Response(http.StatusOK, errorRollbackDelete), + }}, + }.Server(), + Comparator: testroutines.ComparatorSubscriptionWithResult(resultComparator), + Expected: &common.SubscriptionResult{ + ObjectEvents: State{ + "me/messages": {Events: eventTypesCUD}, // was created and couldn't clean up + "movies": {}, // was never created + }, + Status: "failed_to_rollback", + }, + ExpectedErrs: nil, + }, + { + Name: "Subscription to Outlook messages with success", + Input: common.SubscribeParams{ + Request: Input{ + WebhookURL: "https://test.com/webhook", + }, + SubscriptionEvents: State{ + "me/messages": {Events: eventTypesCUD}, + }, + }, + Server: mockserver.Conditional{ + Setup: mockserver.ContentJSON(), + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/v1.0/$batch"), + payloadBatchRequests(payloadSubscribeToMessages), + }, + Then: mockserver.Response(http.StatusCreated, responseSubscribeToMessages), + }.Server(), + Comparator: testroutines.ComparatorSubscriptionWithResult(resultComparator), + Expected: &common.SubscriptionResult{ + ObjectEvents: State{ + "me/messages": {Events: eventTypesCUD}, + }, + Status: "success", + }, + ExpectedErrs: nil, + }, + } + + for _, tt := range tests { // nolint:dupl + // nolint:varnamelen + t.Run(tt.Name, func(t *testing.T) { + t.Parallel() + + tt.Run(t, func() (components.SubscriptionCreator, error) { + return constructTestStrategy(tt.Server.URL) + }) + }) + } +} + +func resultComparator(expectedResult, actualResult any) *testutils.CompareResult { + result := testutils.NewCompareResult() + + // No-op. + + return result +} + +func payloadBatchRequests(requests ...[]byte) mockcond.Condition { + values := make([]string, len(requests)) + for index, req := range requests { + values[index] = string(req) + } + + return mockcond.PermuteJSONBody(`{ "requests": [%requests]}`, + mockcond.PermuteSlot{ + Name: "requests", + NoQuotes: true, + Values: values, + }, + ) +} diff --git a/providers/microsoft/internal/subscriber/delete.go b/providers/microsoft/internal/subscriber/delete.go new file mode 100644 index 000000000..ad11f8b68 --- /dev/null +++ b/providers/microsoft/internal/subscriber/delete.go @@ -0,0 +1,41 @@ +package subscriber + +import ( + "context" + "net/http" + + "github.com/amp-labs/connectors/providers/microsoft/internal/batch" +) + +func (s Strategy) removeSubscriptionsByIDs( + ctx context.Context, identifiers []SubscriptionID, +) (*batch.Result[any], error) { + batchParams, err := s.paramsForBatchRemoveSubscriptionsByIDs(identifiers) + if err != nil { + return nil, err + } + + bundledResponse := batch.Execute[any](ctx, s.batchStrategy, batchParams) + + return bundledResponse, nil +} + +func (s Strategy) paramsForBatchRemoveSubscriptionsByIDs(identifiers []SubscriptionID) (*batch.Params, error) { + batchParams := &batch.Params{} + + for _, identifier := range identifiers { + url, err := s.getSubscriptionURL() + if err != nil { + return nil, err + } + + url.AddPath(string(identifier)) + + // RequestID is Subscription identifier. + batchParams.WithRequest(batch.RequestID(identifier), http.MethodDelete, url, nil, map[string]any{ + "Content-Type": "application/json", + }) + } + + return batchParams, nil +} diff --git a/providers/microsoft/internal/subscriber/state.go b/providers/microsoft/internal/subscriber/state.go new file mode 100644 index 000000000..f36bd723a --- /dev/null +++ b/providers/microsoft/internal/subscriber/state.go @@ -0,0 +1,24 @@ +package subscriber + +import ( + "github.com/amp-labs/connectors/common" +) + +type ( + // State describes the set of object events that the connector should + // try to achieve (desired) and that are also reported back as the observed outcome. + // + // It is part of the connector's input/output contract: the caller expresses intent + // with State, and the connector returns the same type reflecting what actually + // happened (which may differ on failure or rollback). + State map[ObjectName]common.ObjectEvents +) + +func newState(objectNames []ObjectName) State { + events := make(State) + for _, objectName := range objectNames { + events[objectName] = common.ObjectEvents{} + } + + return events +} diff --git a/providers/microsoft/internal/subscriber/strategy.go b/providers/microsoft/internal/subscriber/strategy.go new file mode 100644 index 000000000..6d4409391 --- /dev/null +++ b/providers/microsoft/internal/subscriber/strategy.go @@ -0,0 +1,91 @@ +package subscriber + +import ( + "time" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/common/urlbuilder" + "github.com/amp-labs/connectors/internal/components" + "github.com/amp-labs/connectors/providers" + "github.com/amp-labs/connectors/providers/microsoft/internal/batch" + "github.com/amp-labs/connectors/test/utils/mockutils" +) + +const apiVersion = "v1.0" + +// Strategy implements Microsoft subscription lifecycle operations. +// +// It embeds SubscriptionInputOutput to provide type-safe handling of +// subscription input/output while conforming to the non-generic connectors.SubscribeConnector interface. +// +// The strategy relies on: +// - JSONHTTPClient for API communication +// - batch.Strategy for batched operations +// - Clock for time.Now. (useful in tests for deterministic outcomes) +type Strategy struct { + components.SubscriptionInputOutput[Input, Output] + + client *common.JSONHTTPClient + providerInfo *providers.ProviderInfo + + // Dependent services. + batchStrategy *batch.Strategy + clock components.Clock +} + +// NewStrategy constructs a Strategy with required dependencies. +func NewStrategy( + client *common.JSONHTTPClient, providerInfo *providers.ProviderInfo, batchStrategy *batch.Strategy, +) *Strategy { + return &Strategy{ + client: client, + providerInfo: providerInfo, + batchStrategy: batchStrategy, + clock: components.NewRealClock(), + } +} + +// Input defines the subscription request payload for Microsoft Graph. +type Input struct { + // WebhookURL is the target URL where messages will be sent. + WebhookURL string `json:"notificationUrl"` +} + +// Output represents the subscription result payload. +// +// Currently empty, as Microsoft does not return structured data +// that needs to be captured for this connector. +type Output struct{} + +// ObjectName represents a Microsoft resource type used for subscriptions. +type ObjectName = common.ObjectName + +// getSubscriptionURL builds the Microsoft Graph endpoint for subscription operations. +// +// Docs: +// https://learn.microsoft.com/en-us/graph/change-notifications-delivery-webhooks?tabs=http#subscription-request +func (s Strategy) getSubscriptionURL() (*urlbuilder.URL, error) { + return urlbuilder.New(s.providerInfo.BaseURL, apiVersion, "subscriptions") +} + +// constructTestStrategy creates a Strategy configured for unit testing. +// +// It uses a mock HTTP client and overrides the base URL to point to a test server. +// A fixed clock is injected to ensure deterministic behavior in tests. +func constructTestStrategy(serverURL string) (*Strategy, error) { + transport, err := components.NewTransport(providers.Microsoft, common.ConnectorParams{ + AuthenticatedClient: mockutils.NewClient(), + }) + if err != nil { + return nil, err + } + + transport.SetUnitTestMockServerBaseURL(serverURL) + + client := transport.JSONHTTPClient() + info := transport.ProviderInfo() + strategy := NewStrategy(client, info, batch.NewStrategy(client, info)) + strategy.clock = components.NewFixedClock(time.Date(2026, 3, 4, 5, 0, 0, 0, time.UTC)) + + return strategy, nil +} diff --git a/providers/microsoft/internal/subscriber/test/create/messages.json b/providers/microsoft/internal/subscriber/test/create/messages.json new file mode 100644 index 000000000..db6bd2075 --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/create/messages.json @@ -0,0 +1,32 @@ +{ + "responses": [ + { + "id": "me/messages", + "status": 201, + "headers": { + "Location": "https://subscriptionstore.windows.net/1.0/NA/subscriptions('5917db81-22d5-426d-af91-e285927592b7')", + "Cache-Control": "no-cache", + "OData-Version": "4.0", + "Content-Type": "application/json;odata.metadata=minimal;odata.streaming=true;IEEE754Compatible=false;charset=utf-8" + }, + "body": { + "@odata.context": "https://graph.microsoft.com/v1.0/$metadata#subscriptions/$entity", + "id": "5917db81-22d5-426d-af91-e285927592b7", + "resource": "me/messages", + "applicationId": "39d7eec8-2032-4dd3-959f-d92d8a4e2ad3", + "changeType": "created,updated,deleted", + "clientState": "me/messages", + "notificationUrl": "https://test.com/webhook", + "notificationQueryOptions": null, + "lifecycleNotificationUrl": null, + "expirationDateTime": "2026-04-17T17:00:57Z", + "creatorId": "ae87552f-48fc-4dec-9322-65040bf9fdfd", + "includeResourceData": null, + "latestSupportedTlsVersion": "v1_2", + "encryptionCertificate": null, + "encryptionCertificateId": null, + "notificationUrlAppId": null + } + } + ] +} diff --git a/providers/microsoft/internal/subscriber/test/create/partial-success.json b/providers/microsoft/internal/subscriber/test/create/partial-success.json new file mode 100644 index 000000000..2edd3bca5 --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/create/partial-success.json @@ -0,0 +1,50 @@ +{ + "responses": [ + { + "id": "movies", + "status": 400, + "headers": { + "Content-Type": "application/json" + }, + "body": { + "error": { + "code": "BadRequest", + "message": "Resource not found for the segment 'movies'.", + "innerError": { + "date": "2026-04-23T18:08:47", + "request-id": "2863d3ab-64cd-42be-89c6-a7e58ae5545a", + "client-request-id": "2863d3ab-64cd-42be-89c6-a7e58ae5545a" + } + } + } + }, + { + "id": "me/messages", + "status": 201, + "headers": { + "Location": "https://subscriptionstore.windows.net/1.0/NA/subscriptions('38ca43fa-4602-43ef-a865-aca8a3ddc1ce')", + "Cache-Control": "no-cache", + "OData-Version": "4.0", + "Content-Type": "application/json;odata.metadata=minimal;odata.streaming=true;IEEE754Compatible=false;charset=utf-8" + }, + "body": { + "@odata.context": "https://graph.microsoft.com/v1.0/$metadata#subscriptions/$entity", + "id": "38ca43fa-4602-43ef-a865-aca8a3ddc1ce", + "resource": "me/messages", + "applicationId": "39d7eec8-2032-4dd3-959f-d92d8a4e2ad3", + "changeType": "created,updated,deleted", + "clientState": "me/messages", + "notificationUrl": "https://4a4e-46-150-81-5.ngrok-free.app", + "notificationQueryOptions": null, + "lifecycleNotificationUrl": null, + "expirationDateTime": "2026-04-23T19:08:46Z", + "creatorId": "ae87552f-48fc-4dec-9322-65040bf9fdfd", + "includeResourceData": null, + "latestSupportedTlsVersion": "v1_2", + "encryptionCertificate": null, + "encryptionCertificateId": null, + "notificationUrlAppId": null + } + } + ] +} diff --git a/providers/microsoft/internal/subscriber/test/create/rollback-fail-delete.json b/providers/microsoft/internal/subscriber/test/create/rollback-fail-delete.json new file mode 100644 index 000000000..401bc4bb8 --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/create/rollback-fail-delete.json @@ -0,0 +1,22 @@ +{ + "responses": [ + { + "id": "38ca43fa-4602-43ef-a865-aca8a3ddc1ce", + "status": 400, + "headers": { + "Content-Type": "application/json" + }, + "body": { + "error": { + "code": "BadRequest", + "message": "Resource not found for the segment 'path'.", + "innerError": { + "date": "2026-04-24T00:48:24", + "request-id": "a7c222d4-ae41-4d62-8887-58cb858ebad9", + "client-request-id": "a7c222d4-ae41-4d62-8887-58cb858ebad9" + } + } + } + } + ] +} diff --git a/providers/microsoft/internal/subscriber/test/create/rollback-success-delete.json b/providers/microsoft/internal/subscriber/test/create/rollback-success-delete.json new file mode 100644 index 000000000..5b815fedb --- /dev/null +++ b/providers/microsoft/internal/subscriber/test/create/rollback-success-delete.json @@ -0,0 +1,12 @@ +{ + "responses": [ + { + "id": "38ca43fa-4602-43ef-a865-aca8a3ddc1ce", + "status": 204, + "headers": { + "Cache-Control": "no-cache" + }, + "body": null + } + ] +} diff --git a/test/microsoft/subscription/consumer/main.go b/test/microsoft/subscription/consumer/main.go new file mode 100644 index 000000000..79a9715b4 --- /dev/null +++ b/test/microsoft/subscription/consumer/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "context" + "os/signal" + "syscall" + + connTest "github.com/amp-labs/connectors/test/microsoft" + "github.com/amp-labs/connectors/test/microsoft/subscription" + "github.com/amp-labs/connectors/test/utils/testscenario" +) + +func main() { + // Handle Ctrl-C gracefully. + ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer done() + + conn := connTest.GetMicrosoftGraphConnector(ctx) + + testscenario.RunWebhookConsumer(ctx, conn, subscription.NewWebhookRouter(), nil) +} diff --git a/test/microsoft/subscription/outlook-events/main.go b/test/microsoft/subscription/outlook-events/main.go new file mode 100644 index 000000000..f7176832a --- /dev/null +++ b/test/microsoft/subscription/outlook-events/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "context" + "os/signal" + "syscall" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/datautils" + "github.com/amp-labs/connectors/providers/microsoft" + connTest "github.com/amp-labs/connectors/test/microsoft" + "github.com/amp-labs/connectors/test/microsoft/subscription" + "github.com/amp-labs/connectors/test/utils/testscenario" + "github.com/brianvoe/gofakeit/v6" +) + +func main() { + // Handle Ctrl-C gracefully. + ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer done() + + conn := connTest.GetMicrosoftGraphConnector(ctx) + + subject := gofakeit.Name() + updatedSubject := gofakeit.Name() + + testscenario.ValidateSubscribeReceiveEvents(ctx, conn, + testscenario.SubscribeReceiveEventsSuite{ + SubscribeParamBuilder: func(webhookURL string) *common.SubscribeParams { + return &common.SubscribeParams{ + Request: microsoft.SubscribeRequest{ + WebhookURL: webhookURL, + }, + SubscriptionEvents: map[common.ObjectName]common.ObjectEvents{ + "me/events": { + Events: []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, + common.SubscriptionEventTypeDelete, + }, + }, + }, + } + }, + ExpectedWebhookCalls: 2, + Operations: []testscenario.ConnectorOperation{ + /* Create Outlook calendar event */ { + ObjectName: "me/events", + Method: testscenario.ConnectorMethodCreate, + Payload: payload{Subject: subject}, + }, + /* Update Outlook calendar event */ { + ObjectName: "me/events", + Method: testscenario.ConnectorMethodUpdate, // We are not listening to this event. + Payload: payload{Subject: updatedSubject}, + SearchProcedure: testscenario.SearchProcedure{ + ReadFields: datautils.NewSet("id", "subject"), + RecordIdentifierKey: "id", + SearchBy: testscenario.Property{Key: "subject", Value: subject}, + }, + }, + /* Remove Outlook calendar event */ { + ObjectName: "me/events", + Method: testscenario.ConnectorMethodDelete, + SearchProcedure: testscenario.SearchProcedure{ + ReadFields: datautils.NewSet("id", "subject"), + RecordIdentifierKey: "id", + SearchBy: testscenario.Property{Key: "subject", Value: updatedSubject}, + }, + }, + }, + WebhookRouter: subscription.NewWebhookRouter(), + VerificationParams: nil, + }, + ) +} + +type payload struct { + Subject string `json:"subject"` +} diff --git a/test/microsoft/subscription/outlook-messages/main.go b/test/microsoft/subscription/outlook-messages/main.go new file mode 100644 index 000000000..2cf476b91 --- /dev/null +++ b/test/microsoft/subscription/outlook-messages/main.go @@ -0,0 +1,116 @@ +package main + +import ( + "context" + "os/signal" + "syscall" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/datautils" + "github.com/amp-labs/connectors/providers/microsoft" + connTest "github.com/amp-labs/connectors/test/microsoft" + "github.com/amp-labs/connectors/test/microsoft/subscription" + "github.com/amp-labs/connectors/test/utils/testscenario" + "github.com/brianvoe/gofakeit/v6" +) + +func main() { + // Handle Ctrl-C gracefully. + ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer done() + + conn := connTest.GetMicrosoftGraphConnector(ctx) + + subject := gofakeit.Name() + bodyData := gofakeit.Name() + from := gofakeit.Username() + to := gofakeit.Username() + updatedSubject := gofakeit.Name() + updatedBodyData := gofakeit.Name() + + testscenario.ValidateSubscribeReceiveEvents(ctx, conn, + testscenario.SubscribeReceiveEventsSuite{ + SubscribeParamBuilder: func(webhookURL string) *common.SubscribeParams { + return &common.SubscribeParams{ + Request: microsoft.SubscribeRequest{ + WebhookURL: webhookURL, + }, + SubscriptionEvents: map[common.ObjectName]common.ObjectEvents{ + "me/messages": { + Events: []common.SubscriptionEventType{ + common.SubscriptionEventTypeCreate, + common.SubscriptionEventTypeUpdate, + common.SubscriptionEventTypeDelete, + }, + }, + //"butterflies": { + // Events: []common.SubscriptionEventType{ + // common.SubscriptionEventTypeCreate, + // }, + //}, + }, + } + }, + ExpectedWebhookCalls: 3, + Operations: []testscenario.ConnectorOperation{ + /* Create outlook message */ { + ObjectName: "me/messages", + Method: testscenario.ConnectorMethodCreate, + Payload: payload{ + Subject: subject, + Body: body{Content: bodyData, ContentType: TextContentType}, + From: &recipient{EmailAddress: address{Address: from + "@test.com", Name: from}}, + ToRecipients: []recipient{{EmailAddress: address{Address: to + "@test.com", Name: to}}}, + }, + }, + /* Update outlook message */ { + ObjectName: "me/messages", + Method: testscenario.ConnectorMethodUpdate, + Payload: payload{ + Subject: updatedSubject, + Body: body{Content: updatedBodyData, ContentType: TextContentType}, + }, + SearchProcedure: testscenario.SearchProcedure{ + ReadFields: datautils.NewSet("id", "subject"), + RecordIdentifierKey: "id", + SearchBy: testscenario.Property{Key: "subject", Value: subject}, + }, + }, + /* Remove outlook message */ { + ObjectName: "me/messages", + Method: testscenario.ConnectorMethodDelete, + SearchProcedure: testscenario.SearchProcedure{ + ReadFields: datautils.NewSet("id", "subject"), + RecordIdentifierKey: "id", + SearchBy: testscenario.Property{Key: "subject", Value: updatedSubject}, + }, + }, + }, + WebhookRouter: subscription.NewWebhookRouter(), + VerificationParams: nil, + }, + ) +} + +type payload struct { + Subject string `json:"subject,omitempty"` + Body body `json:"body,omitempty"` + From *recipient `json:"from,omitempty"` + ToRecipients []recipient `json:"toRecipients,omitempty"` +} + +type body struct { + Content string `json:"content"` + ContentType string `json:"contentType"` +} + +type recipient struct { + EmailAddress address `json:"emailAddress"` +} + +type address struct { + Address string `json:"address"` + Name string `json:"name"` +} + +const TextContentType = "text" diff --git a/test/microsoft/subscription/webhook.go b/test/microsoft/subscription/webhook.go new file mode 100644 index 000000000..b24823f02 --- /dev/null +++ b/test/microsoft/subscription/webhook.go @@ -0,0 +1,46 @@ +package subscription + +import ( + "net/http" + + "github.com/amp-labs/connectors/common/urlbuilder" + "github.com/amp-labs/connectors/test/utils/testscenario" +) + +func NewWebhookRouter() testscenario.WebhookRouter { + return testscenario.WebhookRouter{ + Routes: []testscenario.Route{subscriptionConfirmation}, + } +} + +// Default handling. +// https://learn.microsoft.com/en-us/graph/change-notifications-with-resource-data?tabs=csharp#decrypting-resource-data-from-change-notifications +var subscriptionConfirmation = testscenario.Route{ + // This route is executed when Microsoft is verifying that webhook is rechable. + Left: func(request *http.Request) bool { + url, err := urlbuilder.FromRawURL(request.URL) + if err != nil { + return false + } + + return url.HasQueryParam("validationToken") + }, + // https://learn.microsoft.com/en-us/graph/change-notifications-delivery-webhooks?tabs=http#notificationurl-validation + Right: func(writer http.ResponseWriter, request *http.Request) { + url, err := urlbuilder.FromRawURL(request.URL) + if err != nil { + writer.WriteHeader(http.StatusInternalServerError) + return + } + + validationToken, ok := url.GetFirstQueryParam("validationToken") + if !ok { + writer.WriteHeader(http.StatusInternalServerError) + return + } + + writer.WriteHeader(http.StatusOK) + writer.Header().Set("Content-Type", "text/plain") + _, _ = writer.Write([]byte(validationToken)) + }, +}