From 2769c7d58b040a64fb6718898938067ccac4acd0 Mon Sep 17 00:00:00 2001 From: Cobalt0s Date: Wed, 29 Apr 2026 20:34:48 +0300 Subject: [PATCH] [ENG-3815] feat(microsoft): Webhook Message format --- common/types.go | 3 + providers/hubspot/subscriptionEvent_test.go | 212 +++++++++++------- providers/microsoft/connector.go | 5 + .../microsoft/internal/webhook/changeType.go | 54 +++++ .../microsoft/internal/webhook/message.go | 124 ++++++++++ .../internal/webhook/message_test.go | 57 +++++ .../webhook/test/event-message-created.json | 18 ++ .../webhook/test/event-message-deleted.json | 18 ++ .../webhook/test/event-message-updated.json | 18 ++ 9 files changed, 425 insertions(+), 84 deletions(-) create mode 100644 providers/microsoft/internal/webhook/changeType.go create mode 100644 providers/microsoft/internal/webhook/message.go create mode 100644 providers/microsoft/internal/webhook/message_test.go create mode 100644 providers/microsoft/internal/webhook/test/event-message-created.json create mode 100644 providers/microsoft/internal/webhook/test/event-message-deleted.json create mode 100644 providers/microsoft/internal/webhook/test/event-message-updated.json diff --git a/common/types.go b/common/types.go index 1a030385d4..2e182b5f47 100644 --- a/common/types.go +++ b/common/types.go @@ -145,6 +145,9 @@ var ( // ErrProxyNotApplicable indicates that a proxy cannot be used in the given context. ErrProxyNotApplicable = errors.New("proxy is not applicable in this context") + + // ErrSubscriptionEventList is returned by CollapsedSubscriptionEvent.SubscriptionEventList. + ErrSubscriptionEventList = errors.New("failed creating []common.SubscriptionEvent") ) // ReadParams defines how we are reading data from a SaaS API. diff --git a/providers/hubspot/subscriptionEvent_test.go b/providers/hubspot/subscriptionEvent_test.go index 400e1dd1b7..c8e03dfa57 100644 --- a/providers/hubspot/subscriptionEvent_test.go +++ b/providers/hubspot/subscriptionEvent_test.go @@ -3,93 +3,137 @@ package hubspot import ( "testing" - "github.com/amp-labs/connectors/common" - "gotest.tools/v3/assert" + "github.com/amp-labs/connectors/test/utils/testroutines" + "github.com/amp-labs/connectors/test/utils/testutils" ) -func TestExtractObjectNameFromSubscriptionEvent(t *testing.T) { +func TestSubscriptionEvent(t *testing.T) { t.Parallel() - validEvent := SubscriptionEvent{ - "subscriptionType": "contact.creation", + for _, tt := range []testroutines.SubscriptionEventTestCase{ + { + Name: "Unsupported event", + Input: SubscriptionEvent{ + "subscriptionType": "someObject.creation", + }, + Expected: []testroutines.SubscriptionEventExpected{{ + Data: testroutines.SubscriptionEventExpectedData{ + RawEventName: "someObject.creation", + EventType: "create", + }, + Err: testroutines.SubscriptionEventExpectedErr{ + EventType: nil, + RawEventName: nil, + ObjectName: testutils.StringError("subscription is not supported for the object 'someObject'"), + Workspace: testutils.StringError("key not found"), + RecordId: testutils.StringError("key not found"), + EventTimeStampNano: testutils.StringError("key not found"), + }, + }}, + }, + { + Name: "Empty object name of the event", + Input: SubscriptionEvent{ + "subscriptionType": "", + }, + Expected: []testroutines.SubscriptionEventExpected{{ + Data: testroutines.SubscriptionEventExpectedData{ + EventType: "other", + }, + Err: testroutines.SubscriptionEventExpectedErr{ + EventType: testutils.StringError("unexpected subscription event type: ''"), + RawEventName: nil, + ObjectName: testutils.StringError("subscription is not supported for the object ''"), + Workspace: testutils.StringError("key not found"), + RecordId: testutils.StringError("key not found"), + EventTimeStampNano: testutils.StringError("key not found"), + }, + }}, + }, + { + Name: "Hubspot object type id is mapped to human readable object name", + Input: SubscriptionEvent{ + "objectTypeId": "0-1", + "subscriptionType": "importantContacts.creation", + }, + Expected: []testroutines.SubscriptionEventExpected{{ + Data: testroutines.SubscriptionEventExpectedData{ + EventType: "create", + RawEventName: "importantContacts.creation", + ObjectName: "contact", + }, + Err: testroutines.SubscriptionEventExpectedErr{ + EventType: nil, + RawEventName: nil, + ObjectName: nil, + Workspace: testutils.StringError("key not found"), + RecordId: testutils.StringError("key not found"), + EventTimeStampNano: testutils.StringError("key not found"), + }, + }}, + }, + { + Name: "Contact creation event", + Input: SubscriptionEvent{ + "subscriptionType": "contact.creation", + "objectId": 123, + "occurredAt": 1625097600000, + "portalId": 101, + }, + Expected: []testroutines.SubscriptionEventExpected{{ + Data: testroutines.SubscriptionEventExpectedData{ + EventType: "create", + RawEventName: "contact.creation", + ObjectName: "contact", + RecordId: "123", + Workspace: "101", + EventTimeStampNano: 1625097600000000000, + }, + }}, + }, + { + Name: "Contact property change event", + Input: SubscriptionEvent{ + "subscriptionType": "contact.propertyChange", + "objectId": 456, + "propertyName": "email", + "portalId": 101, + "occurredAt": 1625097600000, + }, + Expected: []testroutines.SubscriptionEventExpected{{ + Data: testroutines.SubscriptionEventExpectedData{ + EventType: "update", + RawEventName: "contact.propertyChange", + ObjectName: "contact", + RecordId: "456", + Workspace: "101", + UpdatedFields: []string{"email"}, + EventTimeStampNano: 1625097600000000000, + }, + }}, + }, + { + Name: "Contact deletion event", + Input: SubscriptionEvent{ + "subscriptionType": "contact.deletion", + "objectId": 789, + "portalId": 101, + "occurredAt": 1625097600000, + }, + Expected: []testroutines.SubscriptionEventExpected{{ + Data: testroutines.SubscriptionEventExpectedData{ + EventType: "delete", + RawEventName: "contact.deletion", + ObjectName: "contact", + Workspace: "101", + RecordId: "789", + EventTimeStampNano: 1625097600000000000, + }, + }}, + }, + } { + t.Run(tt.Name, func(t *testing.T) { + tt.Run(t) + }) } - - objectName, err := validEvent.ObjectName() - if err != nil { - t.Fatalf("error extracting object name from subscription event: %s", err) - } - - assert.Equal(t, objectName, "contact", "object name should be parsedCorrectly") - - unsupportedEvent := SubscriptionEvent{ - "subscriptionType": "someObject.creation", - } - - _, err = unsupportedEvent.ObjectName() - assert.ErrorContains(t, err, "subscription is not supported for the object 'someObject'") - - emptyObjectEvent := &SubscriptionEvent{ - "subscriptionType": "", - } - - _, err = emptyObjectEvent.ObjectName() - assert.ErrorContains(t, err, "subscription is not supported for the object ''") - - withObjectTypeId := SubscriptionEvent{ - "objectTypeId": "0-1", - } - - objectName, err = withObjectTypeId.ObjectName() - assert.NilError(t, err, "error should be nil") - assert.Equal(t, objectName, "contact", "object name should be parsed correctly") -} - -//nolint:funlen -func TestExtractEventTypeFromSubscriptionEvent(t *testing.T) { - t.Parallel() - - createEvent := SubscriptionEvent{ - "subscriptionType": "contact.creation", - } - - evtTypeCreate, err := createEvent.EventType() - if err != nil { - t.Fatalf("error extracting object name from subscription event: %s", err) - } - - assert.Equal(t, evtTypeCreate, common.SubscriptionEventTypeCreate, "event type should be parsed Correctly") - - deleteMessage := SubscriptionEvent{ - "subscriptionType": "contact.deletion", - } - - evtTypeDelete, err := deleteMessage.EventType() - if err != nil { - t.Fatalf("error extracting eventType from subscription event: %s", err) - } - - assert.Equal(t, evtTypeDelete, common.SubscriptionEventTypeDelete, "event type should be parsed correctly") - - updateMessage := SubscriptionEvent{ - "subscriptionType": "contact.propertyChange", - } - - evtTypeUpdate, err := updateMessage.EventType() - if err != nil { - t.Fatalf("error extracting eventType from subscription event: %s", err) - } - - assert.Equal(t, evtTypeUpdate, common.SubscriptionEventTypeUpdate, "event type should be parsed correctly") - - emptyObjectEvent := SubscriptionEvent{ - "subscriptionType": "", - } - - _, err = emptyObjectEvent.EventType() - assert.ErrorIs( - t, - err, - errUnexpectedSubscriptionEventType, - "error should be of type errUnexpectedSubscriptionEventType", - ) } diff --git a/providers/microsoft/connector.go b/providers/microsoft/connector.go index 2ecd9a669a..6d51367198 100644 --- a/providers/microsoft/connector.go +++ b/providers/microsoft/connector.go @@ -13,10 +13,15 @@ import ( "github.com/amp-labs/connectors/providers" "github.com/amp-labs/connectors/providers/microsoft/internal/batch" "github.com/amp-labs/connectors/providers/microsoft/internal/metadata" + "github.com/amp-labs/connectors/providers/microsoft/internal/webhook" ) const apiVersion = "v1.0" +type ( + EventCollection = webhook.EventCollection +) + type Connector struct { // Basic connector *components.Connector diff --git a/providers/microsoft/internal/webhook/changeType.go b/providers/microsoft/internal/webhook/changeType.go new file mode 100644 index 0000000000..6216ff2649 --- /dev/null +++ b/providers/microsoft/internal/webhook/changeType.go @@ -0,0 +1,54 @@ +package webhook + +import ( + "strings" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/datautils" +) + +// ChangeType can be created/updated/deleted or a combination of them. +type ChangeType string + +const ( + ChangeTypeCreated = "created" + ChangeTypeUpdated = "updated" + ChangeTypeDeleted = "deleted" +) + +func NewChangeType(eventTypes []common.SubscriptionEventType) ChangeType { + result := make([]string, 0, 3) // nolint:mnd + requestedEvents := datautils.NewSetFromList(eventTypes) + + for _, item := range []datautils.Pair[common.SubscriptionEventType, string]{ + {Left: common.SubscriptionEventTypeCreate, Right: ChangeTypeCreated}, + {Left: common.SubscriptionEventTypeUpdate, Right: ChangeTypeUpdated}, + {Left: common.SubscriptionEventTypeDelete, Right: ChangeTypeDeleted}, + } { + if requestedEvents.Has(item.Left) { + result = append(result, item.Right) + } + } + + return ChangeType(strings.Join(result, ",")) +} + +func (c ChangeType) EventTypes() []common.SubscriptionEventType { + parts := strings.Split(string(c), ",") + result := make([]common.SubscriptionEventType, len(parts)) + + for index, part := range parts { + switch part { + case ChangeTypeCreated: + result[index] = common.SubscriptionEventTypeCreate + case ChangeTypeUpdated: + result[index] = common.SubscriptionEventTypeUpdate + case ChangeTypeDeleted: + result[index] = common.SubscriptionEventTypeDelete + default: + result[index] = common.SubscriptionEventTypeOther + } + } + + return result +} diff --git a/providers/microsoft/internal/webhook/message.go b/providers/microsoft/internal/webhook/message.go new file mode 100644 index 0000000000..fe690aaa95 --- /dev/null +++ b/providers/microsoft/internal/webhook/message.go @@ -0,0 +1,124 @@ +package webhook + +import ( + "errors" + "fmt" + "maps" + + "github.com/amp-labs/connectors/common" +) + +// EventCollection is a change notification sent by Microsoft Graph to the webhook. +// https://learn.microsoft.com/en-us/graph/change-notifications-delivery-webhooks?tabs=http#change-notification-example +type EventCollection map[string]any + +// Event is a singular notification message within EventCollection. +type Event map[string]any + +var ( + _ common.CollapsedSubscriptionEvent = EventCollection{} + _ common.SubscriptionEvent = Event{} + _ common.SubscriptionUpdateEvent = Event{} + + ErrMissingField = errors.New("missing field") +) + +func (c EventCollection) SubscriptionEventList() ([]common.SubscriptionEvent, error) { + value, ok := c["value"] + if !ok { + return nil, fmt.Errorf("%w: missing key 'value'", common.ErrSubscriptionEventList) + } + + list, ok := value.([]any) + if !ok { + return nil, fmt.Errorf("%w: 'value' is not []any type", common.ErrSubscriptionEventList) + } + + events := make([]common.SubscriptionEvent, len(list)) + for index, item := range list { + if json, ok := item.(map[string]any); !ok { + return nil, fmt.Errorf( + "%w: 'value[%v]' is not map[string]any", common.ErrSubscriptionEventList, index, + ) + } else { + events[index] = Event(json) + } + } + + return events, nil +} + +func (c EventCollection) RawMap() (map[string]any, error) { + return maps.Clone(c), nil +} + +func (e Event) EventType() (common.SubscriptionEventType, error) { + changeTypeStr, err := e.RawEventName() + if err != nil { + return "", err + } + + changeType := ChangeType(changeTypeStr) + + list := changeType.EventTypes() + if len(list) == 0 { + // There should be just one type in the event response. + // However, when creating a subscription multiple types can be supplied, + // hence the list nature of changeType property. + return common.SubscriptionEventTypeOther, nil + } + + return list[0], nil +} + +func (e Event) RawEventName() (string, error) { + changeTypeStr, ok := e["changeType"].(string) + if !ok { + return "", fmt.Errorf("%w: 'changeType'", ErrMissingField) + } + + return changeTypeStr, nil +} + +func (e Event) ObjectName() (string, error) { + objectName, ok := e["clientState"].(string) // TODO before converting must first get the data first. + if !ok { + return "", fmt.Errorf("%w: 'clientState'", ErrMissingField) + } + + return objectName, nil +} + +func (e Event) Workspace() (string, error) { + return "", nil +} + +func (e Event) RecordId() (string, error) { + resourceData, ok := e["resourceData"].(map[string]any) + if !ok { + return "", fmt.Errorf("%w: 'resourceData'", ErrMissingField) + } + + identifier, ok := resourceData["id"].(string) + if !ok { + return "", fmt.Errorf("%w: 'id'", ErrMissingField) + } + + return identifier, nil +} + +func (e Event) EventTimeStampNano() (int64, error) { + return 0, nil +} + +func (e Event) RawMap() (map[string]any, error) { + return maps.Clone(e), nil +} + +func (e Event) PreLoadData(data *common.SubscriptionEventPreLoadData) error { + return nil +} + +func (e Event) UpdatedFields() ([]string, error) { + return nil, nil +} diff --git a/providers/microsoft/internal/webhook/message_test.go b/providers/microsoft/internal/webhook/message_test.go new file mode 100644 index 0000000000..b484a8fe3a --- /dev/null +++ b/providers/microsoft/internal/webhook/message_test.go @@ -0,0 +1,57 @@ +package webhook + +import ( + "testing" + + "github.com/amp-labs/connectors/test/utils/testroutines" + "github.com/amp-labs/connectors/test/utils/testutils" +) + +func TestEvent(t *testing.T) { + responseMessageCreated := testutils.DataFromFileAs[EventCollection](t, "event-message-created.json") + responseMessageUpdated := testutils.DataFromFileAs[EventCollection](t, "event-message-updated.json") + responseMessageDeleted := testutils.DataFromFileAs[EventCollection](t, "event-message-deleted.json") + + for _, tt := range []testroutines.SubscriptionEventTestCase{ + { + Name: "Created event", + Input: responseMessageCreated, + Expected: []testroutines.SubscriptionEventExpected{{ + Data: testroutines.SubscriptionEventExpectedData{ + EventType: "create", + RawEventName: "created", + ObjectName: "me/messages", + RecordId: "message_123", + }, + }}, + }, + { + Name: "Updated event", + Input: responseMessageUpdated, + Expected: []testroutines.SubscriptionEventExpected{{ + Data: testroutines.SubscriptionEventExpectedData{ + EventType: "update", + RawEventName: "updated", + ObjectName: "me/messages", + RecordId: "message_654", + }, + }}, + }, + { + Name: "Deleted event", + Input: responseMessageDeleted, + Expected: []testroutines.SubscriptionEventExpected{{ + Data: testroutines.SubscriptionEventExpectedData{ + EventType: "delete", + RawEventName: "deleted", + ObjectName: "me/messages", + RecordId: "message_798", + }, + }}, + }, + } { + t.Run(tt.Name, func(t *testing.T) { + tt.Run(t) + }) + } +} diff --git a/providers/microsoft/internal/webhook/test/event-message-created.json b/providers/microsoft/internal/webhook/test/event-message-created.json new file mode 100644 index 0000000000..a8a40bc4c8 --- /dev/null +++ b/providers/microsoft/internal/webhook/test/event-message-created.json @@ -0,0 +1,18 @@ +{ + "value": [ + { + "changeType": "created", + "clientState": "me/messages", + "resource": "Users/ae87552f-48fc-4dec-9322-65040bf9fdfd/Messages/message_123", + "resourceData": { + "@odata.etag": "W/\"CQAAABYAAAB8hj1Rtd60SKTngNs3if9RAAEsuETJ\"", + "@odata.id": "Users/ae87552f-48fc-4dec-9322-65040bf9fdfd/Messages/message_123", + "@odata.type": "#Microsoft.Graph.Message", + "id": "message_123" + }, + "subscriptionExpirationDateTime": "2026-04-29T18:23:14+00:00", + "subscriptionId": "3f3a6969-5d52-4c0f-bf4d-a6bb16952289", + "tenantId": "5c6241d0-74cc-48a2-b667-3eb0d738af72" + } + ] +} diff --git a/providers/microsoft/internal/webhook/test/event-message-deleted.json b/providers/microsoft/internal/webhook/test/event-message-deleted.json new file mode 100644 index 0000000000..15dd783f6f --- /dev/null +++ b/providers/microsoft/internal/webhook/test/event-message-deleted.json @@ -0,0 +1,18 @@ +{ + "value": [ + { + "changeType": "deleted", + "clientState": "me/messages", + "resource": "Users/ae87552f-48fc-4dec-9322-65040bf9fdfd/Messages/message_798", + "resourceData": { + "@odata.etag": "W/\"CQAAAA==\"", + "@odata.id": "Users/ae87552f-48fc-4dec-9322-65040bf9fdfd/Messages/message_798", + "@odata.type": "#Microsoft.Graph.Message", + "id": "message_798" + }, + "subscriptionExpirationDateTime": "2026-04-29T18:23:14+00:00", + "subscriptionId": "3f3a6969-5d52-4c0f-bf4d-a6bb16952289", + "tenantId": "5c6241d0-74cc-48a2-b667-3eb0d738af72" + } + ] +} diff --git a/providers/microsoft/internal/webhook/test/event-message-updated.json b/providers/microsoft/internal/webhook/test/event-message-updated.json new file mode 100644 index 0000000000..7e9cfe00ce --- /dev/null +++ b/providers/microsoft/internal/webhook/test/event-message-updated.json @@ -0,0 +1,18 @@ +{ + "value": [ + { + "changeType": "updated", + "clientState": "me/messages", + "resource": "Users/ae87552f-48fc-4dec-9322-65040bf9fdfd/Messages/message_654", + "resourceData": { + "@odata.etag": "W/\"CQAAABYAAAB8hj1Rtd60SKTngNs3if9RAAEsuETL\"", + "@odata.id": "Users/ae87552f-48fc-4dec-9322-65040bf9fdfd/Messages/message_654", + "@odata.type": "#Microsoft.Graph.Message", + "id": "message_654" + }, + "subscriptionExpirationDateTime": "2026-04-29T18:23:14+00:00", + "subscriptionId": "3f3a6969-5d52-4c0f-bf4d-a6bb16952289", + "tenantId": "5c6241d0-74cc-48a2-b667-3eb0d738af72" + } + ] +}