Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@ output/
# Salesforce test binaries
/apex
test/salesforce/apex/apex
/subscribe
subscriber
./subscribe
./subscriber
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what is the reason for ignoring these and where exactly this data is stored. The directory names are good and shouldn't be excluded.

test/salesforce/subscribe/subscribe
4 changes: 4 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,10 @@ type SubscriptionResult struct { // this corresponds to each API call.
// provider specific events ["contact.merged"] for hubspot or ["jira_issue:restored", "jira_issue:archived"] for jira.
}

func (r SubscriptionResult) ObjectNames() []ObjectName {
return datautils.FromMap(r.ObjectEvents).Keys()
}

type SubscriptionStatus string

const (
Expand Down
8 changes: 8 additions & 0 deletions common/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,11 @@ func (p SearchParams) ValidateParams(withRequiredFields bool) error {

return nil
}

func (p SubscribeParams) ValidateParams() error {
if len(p.SubscriptionEvents) == 0 {
return ErrMissingObjects
}

return nil
}
35 changes: 35 additions & 0 deletions internal/components/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package components

import "time"

// Clock provides the current time, enabling deterministic testing of time-dependent logic.
// Implementations include production (real time) and test (fixed time) variants.
type Clock interface {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am strongly against adding new components that are just wrappers of native packages.

This only introduces onboarding complexity and load.

Please remove this type and just the native package.

Now() time.Time
}

// RealClock returns the current wall-clock time using time.Now().
type RealClock struct{}

func NewRealClock() *RealClock {
return new(RealClock)
}

func (RealClock) Now() time.Time {
return time.Now()
}

// FixedClock returns a fixed timestamp for reproducible tests.
// Use time.Date(...) or time.Now().Add(...) to set specific values.
type FixedClock struct {
t time.Time
}

// NewFixedClock creates a FixedClock at the given time.
func NewFixedClock(t time.Time) *FixedClock {
return &FixedClock{t: t}
}

func (c *FixedClock) Now() time.Time {
return c.t
}
7 changes: 7 additions & 0 deletions internal/datautils/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,10 @@ type Pair[L, R any] struct {
Left L
Right R
}

func NewPair[L, R any](left L, right R) *Pair[L, R] {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary util func. Please remove.

return &Pair[L, R]{
Left: left,
Right: right,
}
}
12 changes: 9 additions & 3 deletions providers/microsoft/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import (
"github.com/amp-labs/connectors/providers"
"github.com/amp-labs/connectors/providers/microsoft/internal/batch"
"github.com/amp-labs/connectors/providers/microsoft/internal/metadata"
"github.com/amp-labs/connectors/providers/microsoft/internal/subscriber"
"github.com/amp-labs/connectors/providers/microsoft/internal/webhook"
)

const apiVersion = "v1.0"

type (
EventCollection = webhook.EventCollection
EventCollection = webhook.EventCollection
SubscribeRequest = subscriber.Input
SubscribeResponse = subscriber.Output
)

type Connector struct {
Expand All @@ -35,6 +38,7 @@ type Connector struct {
components.Writer
components.Deleter
webhook.Verifier
subscriber.Strategy

// Dependent services.
batchStrategy *batch.Strategy
Expand All @@ -59,8 +63,7 @@ func NewConnectorForProvider(provider providers.Provider, params common.Connecto
// nolint:funlen
func constructor(base *components.Connector) (*Connector, error) {
connector := &Connector{
Connector: base,
batchStrategy: batch.NewStrategy(base.JSONHTTPClient(), base.ProviderInfo()),
Connector: base,
}

connector.SchemaProvider = schema.NewOpenAPISchemaProvider(connector.ProviderContext.Module(), metadata.Schemas)
Expand Down Expand Up @@ -107,6 +110,9 @@ func constructor(base *components.Connector) (*Connector, error) {
},
)

connector.batchStrategy = batch.NewStrategy(base.JSONHTTPClient(), base.ProviderInfo())
connector.Strategy = *subscriber.NewStrategy(base.JSONHTTPClient(), base.ProviderInfo(), connector.batchStrategy)

return connector, nil
}

Expand Down
224 changes: 224 additions & 0 deletions providers/microsoft/internal/subscriber/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package subscriber

import (
"context"
"net/http"
"time"

"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/internal/components"
"github.com/amp-labs/connectors/internal/datautils"
"github.com/amp-labs/connectors/providers/microsoft/internal/batch"
"github.com/amp-labs/connectors/providers/microsoft/internal/webhook"
)

const subscriptionExpirationWindow = 5 * time.Hour

// Subscribe creates a Microsoft Graph subscription for the specified objects and events.
//
// nolint:lll
// See the [request body]( https://learn.microsoft.com/en-us/graph/api/subscription-post-subscriptions?view=graph-rest-1.0&tabs=http#request-body) for details.
// Supported resources are listed [here](https://learn.microsoft.com/en-us/graph/api/resources/change-notifications-api-overview?view=graph-rest-1.0).
func (s Strategy) Subscribe(
ctx context.Context,
params common.SubscribeParams,
) (*common.SubscriptionResult, error) {
if err := params.ValidateParams(); err != nil {
return nil, err
}

return s.createSubscription(ctx, params)
}

// createSubscription creates subscriptions using batch requests for efficiency.
// Handles rollback on partial failures to maintain consistency.
// Pre-existing subscriptions for the same resource may result in duplicates (handled only by UpdateSubscription).
func (s Strategy) createSubscription(
ctx context.Context,
params common.SubscribeParams,
) (*common.SubscriptionResult, error) {
batchParams, err := s.paramsForBatchCreateSubscriptions(params)
if err != nil {
return nil, err
}

bundledResponse := batch.Execute[SubscriptionResource](ctx, s.batchStrategy, batchParams)
state := getStateFromCreateResponse(bundledResponse)

if len(bundledResponse.Errors) != 0 {
// Some requests failed; initiate rollback.
return s.rollbackSubscriptionCreation(ctx, params, state, bundledResponse)
}

return &common.SubscriptionResult{
Result: Output{},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to have an empty struct type as a placeholder. Result type is any, so we don't even need to populate it as it will default to nil.

ObjectEvents: state,
Status: common.SubscriptionStatusSuccess,
}, nil
}

// paramsForBatchCreateSubscriptions prepares batch parameters for creating multiple subscriptions.
// Constructs payloads for each object-event combination using the webhook URL.
func (s Strategy) paramsForBatchCreateSubscriptions(params common.SubscribeParams) (*batch.Params, error) {
input, err := s.TypedSubscriptionRequest(params)
if err != nil {
return nil, err
}

webhookURL := input.WebhookURL

url, err := s.getSubscriptionURL()
if err != nil {
return nil, err
}

batchParams := &batch.Params{}

for objectName, events := range params.SubscriptionEvents {
requestID := batch.RequestID(objectName)
body := newPayloadCreateSubscription(objectName, events, webhookURL, s.clock)
batchParams.WithRequest(requestID, http.MethodPost, url, body, map[string]any{
"Content-Type": "application/json",
})
}

return batchParams, nil
}

// getStateFromCreateResponse extracts the subscription state from batch responses.
// Successful responses set the events state; errors result in empty ObjectEvents state.
func getStateFromCreateResponse(response *batch.Result[SubscriptionResource]) State {
result := make(State)

for objectName, envelope := range response.Responses {
// Map successful subscription to its events.
subscription := envelope.Data
result[ObjectName(objectName)] = common.ObjectEvents{
Events: subscription.ChangeType.EventTypes(),
WatchFields: nil,
WatchFieldsAll: false,
PassThroughEvents: nil,
}
}

for objectName := range response.Errors {
// Failed requests yield no subscription.
result[ObjectName(objectName)] = common.ObjectEvents{}
}

return result
}

// rollbackSubscriptionCreation deletes successfully created subscriptions on partial failure.
// It updates state to reflect remaining subscriptions after attempted rollback.
// Returns appropriate status based on rollback success.
func (s Strategy) rollbackSubscriptionCreation(
ctx context.Context,
params common.SubscribeParams,
state State,
partialCreation *batch.Result[SubscriptionResource],
) (*common.SubscriptionResult, error) {
requestsRegistry := make(datautils.Map[SubscriptionID, ObjectName])

for _, envelope := range partialCreation.Responses {
subscription := envelope.Data
requestsRegistry[subscription.ID] = ObjectName(subscription.Resource)
}

bundledResponse, err := s.removeSubscriptionsByIDs(ctx, requestsRegistry.Keys())
if err != nil {
return nil, err
}

if len(bundledResponse.Errors) == 0 {
// Full rollback succeeded.
objectNames := datautils.FromMap(params.SubscriptionEvents).Keys()

return &common.SubscriptionResult{
Result: Output{},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nil is sufficient. or just leave it empty.

ObjectEvents: newState(objectNames),
Status: common.SubscriptionStatusFailed,
}, nil
}

// Partial rollback; track remaining subscriptions.
existingObjects := datautils.NewSet[ObjectName]()

for requestID := range bundledResponse.Errors {
// Convert request ID back to object.
id := SubscriptionID(requestID)
objectName := requestsRegistry[id]
existingObjects.AddOne(objectName)
}

allObjects := datautils.FromMap(params.SubscriptionEvents).KeySet()
removedObjects := allObjects.Subtract(existingObjects)

// Clear state for successfully removed objects.
for _, objectName := range removedObjects {
state[objectName] = common.ObjectEvents{}
}

return &common.SubscriptionResult{
Result: Output{},
ObjectEvents: state,
Status: common.SubscriptionStatusFailedToRollback,
}, nil
}

// SubscriptionResource represents a Microsoft Graph subscription.
// See [properties](https://learn.microsoft.com/en-us/graph/api/resources/subscription?view=graph-rest-1.0#properties).
//
// Custom usage: clientState field is repurposed to store ObjectName for identification.
// Ignored fields:
//
// encryptionCertificateId, encryptionCertificate, lifecycleNotificationUrl,
// notificationQueryOptions, notificationUrlAppId.
type SubscriptionResource struct {
// ID is the unique subscription identifier returned by POST/GET/PATCH requests.
ID SubscriptionID `json:"id,omitempty"`
// ChangeType specifies the event types (created, updated, deleted) to subscribe to.
ChangeType webhook.ChangeType `json:"changeType,omitempty"`
// ObjectName uses the clientState field to store the connector's object name for identification.
ObjectName ObjectName `json:"clientState,omitempty"`
// WebhookURL is the notification URL where Microsoft Graph sends change notifications.
WebhookURL string `json:"notificationUrl,omitempty"`
// Resource identifies the Microsoft Graph resource being monitored (e.g., "me/messages").
Resource string `json:"resource,omitempty"`
// ExpirationDateTime is the UTC datetime when the subscription expires and auto-deletes.
// Must respect per-resource maximum lifetimes (ranges from 5 hours to 30 days).
// https://learn.microsoft.com/en-us/graph/api/resources/subscription?view=graph-rest-1.0#subscription-lifetime
ExpirationDateTime time.Time `json:"expirationDateTime"`
// IncludeResourceData is set to false. This is to avoid encryption requirements.
// Resource data is fetched separately via ReadByIds, therefore it is not needed.
IncludeResourceData bool `json:"includeResourceData,omitempty"`
}

type SubscriptionID string

// newPayloadCreateSubscription constructs a subscription payload for creation.
// Uses clientState to store objectName for identification.
// Expiration is set to 5 hours to safely fit common maximums (e.g., presence: 1h excluded; others 3-30 days).
//
// nolint:lll
// See [lifetime limits](https://learn.microsoft.com/en-us/graph/api/resources/subscription?view=graph-rest-1.0#subscription-lifetime)
func newPayloadCreateSubscription(
objectName ObjectName,
events common.ObjectEvents,
webhookURL string,
clock components.Clock,
) SubscriptionResource {
resource := objectName.String()

fiveHoursFromNow := clock.Now().Add(subscriptionExpirationWindow)
body := SubscriptionResource{
ChangeType: webhook.NewChangeType(events.Events),
ObjectName: objectName,
WebhookURL: webhookURL,
Resource: resource,
ExpirationDateTime: fiveHoursFromNow,
IncludeResourceData: false,
}

return body
}
Loading
Loading