Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions internal/datautils/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,3 @@ type Pair[L, R any] struct {
Left L
Right R
}

func NewPair[L, R any](left L, right R) *Pair[L, R] {
return &Pair[L, R]{
Left: left,
Right: right,
}
}
6 changes: 3 additions & 3 deletions providers/livestorm/jsonapi.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package livestorm

import (
"maps"

"github.com/amp-labs/connectors/internal/jsonquery"
"github.com/spyzhov/ajson"
)
Expand Down Expand Up @@ -70,9 +72,7 @@ func flattenJSONAPIResourceForFields(n *ajson.Node) (map[string]any, error) {
return nil, err
}

for k, v := range attrMap {
merged[k] = v
}
maps.Copy(merged, attrMap)

return merged, nil
}
17 changes: 17 additions & 0 deletions providers/microsoft/internal/subscriber/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,20 @@ func newPayloadCreateSubscription(

return body
}

// Update is only relevant for extending the subscription expiration.
// > Updates a subscription expiration time for renewal and/or updates the notificationUrl for delivery.
// https://learn.microsoft.com/en-us/graph/api/resources/subscription?view=graph-rest-1.0#methods
func newPayloadRefreshSubscription(
clock components.Clock,
webhookURL string,
) SubscriptionResource {
fiveHoursFromNow := clock.Now().Add(subscriptionExpirationWindow)
body := SubscriptionResource{
WebhookURL: webhookURL,
ExpirationDateTime: fiveHoursFromNow,
IncludeResourceData: false,
}

return body
}
97 changes: 97 additions & 0 deletions providers/microsoft/internal/subscriber/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package subscriber

import (
"context"
"sort"

"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/internal/datautils"
"github.com/amp-labs/connectors/providers/microsoft/internal/webhook"
)

type (
Expand All @@ -20,6 +22,22 @@ type (
// with State, and the connector returns the same type reflecting what actually
// happened (which may differ on failure or rollback).
State map[ObjectName]common.ObjectEvents

// ReconciliationPlan partitions operations needed to align RemoteSubscriptions with State.
// Groups creates/refreshes/deletes for efficient batching against Microsoft Graph API.
ReconciliationPlan struct {
// Create: objects missing subscriptions -> create new ones with desired ObjectEvents.
Create map[common.ObjectName]common.ObjectEvents
// Refresh: existing subscriptions nearing expiry -> renew before expirationTime.
Refresh map[common.ObjectName]SubscriptionID
// DeleteSubscriptions: objects removed from State -> delete all their remote subscriptions.
// Contains exact current remote state where each object should be removed.
DeleteSubscriptions RemoteSubscriptions
// Extra holds SubscriptionID surplus beyond desired state (Microsoft Graph specific).
// Occurs when single resource has multiple subscriptions; excess should be cleaned up for consistency.
// These are standalone deletes (not object-grouped like DeleteSubscriptions).
Extra []SubscriptionID
}
)

func newState(objectNames []ObjectName) State {
Expand All @@ -31,6 +49,85 @@ func newState(objectNames []ObjectName) State {
return events
}

// Equals reports if the subscription configuration matches between both states for the given object.
func (s State) Equals(other State, objectName ObjectName) bool {
return webhook.NewChangeType(s[objectName].Events) == webhook.NewChangeType(other[objectName].Events)
}

// ReconcileTo partitions objects into three disjoint sets by comparing desired
// and actual (remote) state:
//
// - objectsToRemove: present remotely but not desired → DELETE all subscriptions for the object
// - objectsToCreate: desired but not present remotely → CREATE subscriptions for the object
// - objectsToUpdate: present in both → UPDATE existing subscriptions
//
// For Microsoft Graph, intersecting objects are always updated.
// Subscription.ExpirationDateTime must be continuously renewed,
// so there is no no-op path—every existing subscription requires an update.
//
// TODO The subscription may disappear under the hood as Microsoft auto cleans. Need to create a handler for this case.
func (r RemoteSubscriptions) ReconcileTo(state State) ReconciliationPlan {
remoteObjects := datautils.NewSetFromList(RemoteSubsType(r).GetBuckets())
desiredObjects := datautils.FromMap(state).KeySet()

objectsToCreate := desiredObjects.Subtract(remoteObjects)
objectsToRemove := remoteObjects.Subtract(desiredObjects)
objectsToUpdate := remoteObjects.Intersection(desiredObjects)

plan := ReconciliationPlan{
Create: make(map[common.ObjectName]common.ObjectEvents),
Refresh: make(map[common.ObjectName]SubscriptionID),
DeleteSubscriptions: r.subset(objectsToRemove),
Extra: make([]SubscriptionID, 0),
}

for _, name := range objectsToCreate {
plan.Create[name] = state[name]
}

remoteState := r.toState()
for _, name := range objectsToUpdate {
subscriptions := r[name]

if len(subscriptions) == 0 {
// Impossible. Remote state by definition must have at least one subscription for an object.
continue
}

if len(subscriptions) > 1 {
sort.Slice(subscriptions, func(i, j int) bool {
return subscriptions[i].ExpirationDateTime.After(subscriptions[j].ExpirationDateTime)
})
}

// Keep the first
subscription := subscriptions[0]

// Remove the rest
for _, extra := range subscriptions[1:] {
// There are multiple subscriptions associated with this object.
// Keep only one of them, others must be removed.
// This could happen due to user manually altering subscriptions
// or any invalid state the connector has put the provider in.
// This is highly unlikely but such possibility is left open.
// Too many subscriptions for a single object. Remove the excess.
plan.Extra = append(plan.Extra, extra.ID)
}

// Replace subscription with a more desired version.
if remoteState.Equals(state, name) {
plan.Refresh[name] = subscription.ID
} else {
// Create a new subscription which is different from the original.
plan.Create[name] = state[name]
// Mark old subscription for a cleanup.
plan.Extra = append(plan.Extra, subscription.ID)
}
}

return plan
}

func (r RemoteSubscriptions) toState() State {
result := make(State)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "7dac694c-f614-4496-878e-fa625196149d",
"method": "DELETE",
"url": "/subscriptions/7dac694c-f614-4496-878e-fa625196149d",
"headers": {
"Content-Type": "application/json"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"responses": [
{
"id": "me/events",
"status": 200,
"headers": {
"Cache-Control": "no-cache",
"OData-Version": "4.0",
"Content-Type": "application/json;odata.metadata=minimal;odata.streaming=true;IEEE754Compatible=false;charset=utf-8"
},
"body": {
"@odata.context": "https://graph.microsoft.com/v1.0/$metadata#subscriptions/$entity",
"id": "63b01115-ba3f-4db6-a1ef-793797ec340a",
"resource": "me/events",
"applicationId": "39d7eec8-2032-4dd3-959f-d92d8a4e2ad3",
"changeType": "created,deleted",
"clientState": "me/events",
"notificationUrl": "https://651c-46-150-81-5.ngrok-free.app/",
"notificationQueryOptions": null,
"lifecycleNotificationUrl": null,
"expirationDateTime": "2026-05-01T00:09:21.6065208Z",
"creatorId": "ae87552f-48fc-4dec-9322-65040bf9fdfd",
"includeResourceData": null,
"latestSupportedTlsVersion": "v1_2",
"encryptionCertificate": null,
"encryptionCertificateId": null,
"notificationUrlAppId": null
}
}
]
}
Loading
Loading