diff --git a/providers/hubspot/internal/search/search.go b/providers/hubspot/internal/search/search.go index c1f6a4e2ad..d9b9558a1c 100644 --- a/providers/hubspot/internal/search/search.go +++ b/providers/hubspot/internal/search/search.go @@ -36,14 +36,22 @@ func (s Strategy) Search(ctx context.Context, params *common.SearchParams) (*com ) } - // Search has two execution paths: + // Search has three execution paths: // - ObjectAPI: for core CRM objects supported by the canonical ObjectAPI endpoint. // - Non-ObjectAPI: for CRM objects not supported by ObjectAPI, which use separate endpoints (e.g., Lists). - if core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName) { + // - The rest: no search is supported. + switch { + case core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName): return s.searchViaNonstandardSearchAPI(ctx, params) + case core.MarketingObjects.Has(params.ObjectName): + fallthrough + case core.CommunicationObjects.Has(params.ObjectName): + fallthrough + case core.MiscellaneousObjects.Has(params.ObjectName): + return nil, common.ErrObjectNotSupported + default: + return s.searchViaObjectAPI(ctx, params) } - - return s.searchViaObjectAPI(ctx, params) } // checkSearchResultsLimit checks if the NextPage token exceeds HubSpot's search results limit. diff --git a/providers/hubspot/read.go b/providers/hubspot/read.go index 28c6b644a9..2db5939ab6 100644 --- a/providers/hubspot/read.go +++ b/providers/hubspot/read.go @@ -26,6 +26,9 @@ func (c *Connector) Read(ctx context.Context, params common.ReadParams) (*common return nil, err } + // + // Read using regular GET endpoints. + // switch { case core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName): // Object is part of CRM namespace but outside ObjectAPI. @@ -50,27 +53,32 @@ func (c *Connector) Read(ctx context.Context, params common.ReadParams) (*common } } +// CRM objects can be read using two ways. +// - If there are Since/Until parameters it will use: +// https://api.hubapi.com/crm/objects/2026-03/{objectType}/search +// - Otherwise, the Objects API endpoint is used: +// https://api.hubapi.com/crm/objects/2026-03/{objectType} func (c *Connector) readCRMObjectsAPI( - ctx context.Context, config common.ReadParams, + ctx context.Context, params common.ReadParams, ) (*common.ReadResult, error) { //nolint:funlen // If filtering is required, then we have to use the search endpoint. // The Search endpoint has a 10K record limit. In case this limit is reached, // the sorting allows the caller to continue in another call by offsetting // until the ID of the last record that was successfully fetched. filters := make(Filters, 0) - if !config.Since.IsZero() { - filters = append(filters, BuildLastModifiedFilterGroup(&config)) + if !params.Since.IsZero() { + filters = append(filters, BuildLastModifiedFilterGroup(¶ms)) } - if !config.Until.IsZero() { - filters = append(filters, BuildUntilTimestampFilterGroup(&config)) + if !params.Until.IsZero() { + filters = append(filters, BuildUntilTimestampFilterGroup(¶ms)) } - filters = append(filters, BuildBuilderFilters(config.BuilderFilter)...) + filters = append(filters, BuildBuilderFilters(params.BuilderFilter)...) if len(filters) != 0 { searchParams := SearchParams{ - ObjectName: config.ObjectName, + ObjectName: params.ObjectName, FilterGroups: []FilterGroup{{ Filters: filters, // Add more filter groups to OR them together @@ -78,15 +86,15 @@ func (c *Connector) readCRMObjectsAPI( SortBy: []SortBy{ BuildSort(ObjectFieldHsObjectId, SortDirectionAsc), }, - NextPage: config.NextPage, - Fields: config.Fields, - AssociatedObjects: config.AssociatedObjects, + NextPage: params.NextPage, + Fields: params.Fields, + AssociatedObjects: params.AssociatedObjects, } return c.ReadUsingSearchAPI(ctx, searchParams) } - url, err := c.buildReadURL(config) + url, err := c.buildCRMReadURL(params) if err != nil { return nil, err } @@ -101,12 +109,12 @@ func (c *Connector) readCRMObjectsAPI( core.GetRecords, core.GetNextRecordsURL, associations.CreateDataMarshallerWithAssociations( - ctx, c.associationsFiller, config.ObjectName, config.AssociatedObjects), - config.Fields, + ctx, c.associationsFiller, params.ObjectName, params.AssociatedObjects), + params.Fields, ) } -func (c *Connector) buildReadURL(params common.ReadParams) (string, error) { +func (c *Connector) buildCRMReadURL(params common.ReadParams) (string, error) { if len(params.NextPage) != 0 { // If NextPage is set, then we're reading the next page of results. // All that matters is the NextPage URL, the fields are ignored. diff --git a/providers/hubspot/search.go b/providers/hubspot/search.go index 5b37b80237..5e6c43d302 100644 --- a/providers/hubspot/search.go +++ b/providers/hubspot/search.go @@ -34,39 +34,62 @@ func (c *Connector) Search(ctx context.Context, params *common.SearchParams) (*c // This endpoint paginates using paging.next.after which is to be used as an offset. // Archived results do not appear in search results. // Read more @ https://developers.hubspot.com/docs/api/crm/search -func (c *Connector) ReadUsingSearchAPI(ctx context.Context, config SearchParams) (*common.ReadResult, error) { +func (c *Connector) ReadUsingSearchAPI( // nolint:cyclop + ctx context.Context, params SearchParams, +) (*common.ReadResult, error) { ctx = logging.With(ctx, "connector", "hubspot") - if err := config.ValidateParams(); err != nil { + if err := params.ValidateParams(); err != nil { return nil, err } // Check if the NextPage token exceeds the search results limit. // HubSpot's search API returns a 400 error if you try to paginate beyond 10,000 records. // By detecting this proactively, we can return a specific error that callers can handle. - if err := checkSearchResultsLimit(config.NextPage); err != nil { + if err := checkSearchResultsLimit(params.NextPage); err != nil { return nil, fmt.Errorf( "%w: requested offset %s exceeds limit %d", common.ErrResultsLimitExceeded, - config.NextPage, + params.NextPage, searchResultsLimit, ) } - if core.CRMObjectsWithoutPropertiesAPISupport.Has(config.ObjectName) { + params = params.applyTimestampFilters() + + // + // Read using POST endpoints intended for Search. + // + + switch { + case core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName): // Objects outside ObjectAPI have different endpoint while both are part of CRM module. // For instance such object is Lists. return c.searchCRM(ctx, searchCRMParams{ - SearchParams: config, + SearchParams: params, }) + case core.MarketingObjects.Has(params.ObjectName): + // Search for marketing is not implemented, using simple read. + return c.readMarketing(ctx, makeReadParamsFromSearchParams(params), core.MarketingObjects[params.ObjectName]) + case core.CommunicationObjects.Has(params.ObjectName): + // Search for communication is not implemented, using simple read. + return c.readCommunications(ctx, makeReadParamsFromSearchParams(params), core.CommunicationObjects[params.ObjectName]) + case core.MiscellaneousObjects.Has(params.ObjectName): + // Search for misc objects is not implemented, using simple read. + return c.readMiscAPI(ctx, makeReadParamsFromSearchParams(params), core.MiscellaneousObjects[params.ObjectName]) + default: + // Otherwise object belongs to Hubspot Objects API (sub-category of CRM namespace). + return c.searchCRMObjectsAPI(ctx, params) } +} - url, err := c.getCRMObjectsSearchURL(config.ObjectName) +func (c *Connector) searchCRMObjectsAPI(ctx context.Context, params SearchParams) (*common.ReadResult, error) { + url, err := c.getCRMObjectsSearchURL(params.ObjectName) if err != nil { return nil, err } - rsp, err := c.JSONHTTPClient().Post(ctx, url.String(), makeFilterBody(config)) + rsp, err := c.JSONHTTPClient().Post(ctx, url.String(), makeFilterBody(params)) if err != nil { return nil, err } @@ -76,8 +99,8 @@ func (c *Connector) ReadUsingSearchAPI(ctx context.Context, config SearchParams) core.GetRecords, core.GetNextRecordsAfter, associations.CreateDataMarshallerWithAssociations( - ctx, c.associationsFiller, config.ObjectName, config.AssociatedObjects), - config.Fields, + ctx, c.associationsFiller, params.ObjectName, params.AssociatedObjects), + params.Fields, ) } @@ -191,6 +214,62 @@ func BuildIdFilterGroup(id string) Filter { } } +func makeReadParamsFromSearchParams(search SearchParams) common.ReadParams { + return common.ReadParams{ + ObjectName: search.ObjectName, + Fields: search.Fields, + NextPage: search.NextPage, + AssociatedObjects: search.AssociatedObjects, + Since: search.Since, + Until: search.Until, + } +} + +func (p SearchParams) applyTimestampFilters() SearchParams { + params := makeReadParamsFromSearchParams(p) + + sinceFilter := BuildLastModifiedFilterGroup(¶ms) + untilFilter := BuildUntilTimestampFilterGroup(¶ms) + + if len(p.FilterGroups) == 0 && (sinceFilter != (Filter{}) || untilFilter != (Filter{})) { + // Initialize group because either since or until (or both) should be populated. + p.FilterGroups = []FilterGroup{{ + Filters: Filters{}, + }} + } + + // Every group is "OR"ed together. + // If there are multiple groups it is logically ok to insert since/until in each. + for index, group := range p.FilterGroups { + if !group.hasFilter(sinceFilter) { + group.Filters = append(group.Filters, sinceFilter) + } + + if !group.hasFilter(untilFilter) { + group.Filters = append(group.Filters, untilFilter) + } + + p.FilterGroups[index] = group + } + + return p +} + +func (g FilterGroup) hasFilter(target Filter) bool { + if target == (Filter{}) { + return true + } + + for _, filter := range g.Filters { + if filter.FieldName == target.FieldName && + filter.Operator == target.Operator { + return true + } + } + + return false +} + // BuildSort builds a sort by clause for the given field and direction. func BuildSort(field ObjectField, dir SortDirection) SortBy { return SortBy{ diff --git a/providers/hubspot/search_test.go b/providers/hubspot/search_test.go index 80090098a9..902aba3e1b 100644 --- a/providers/hubspot/search_test.go +++ b/providers/hubspot/search_test.go @@ -4,6 +4,7 @@ import ( "errors" "net/http" "testing" + "time" "github.com/amp-labs/connectors" "github.com/amp-labs/connectors/common" @@ -281,3 +282,182 @@ func TestSearchResultsLimitConstant(t *testing.T) { t.Errorf("searchResultsLimit should be 10000, got %d", searchResultsLimit) } } + +func TestReadUsingSearchAPI(t *testing.T) { + t.Parallel() + + payloadContacts := testutils.DataFromFile(t, "read-via-search/contacts-payload.json") + responseContacts := testutils.DataFromFile(t, "read-via-search/contacts-response.json") + responseCampaigns := testutils.DataFromFile(t, "read-via-search/campaigns-response.json") + + contactsOutput := &common.ReadResult{ + Rows: 1, + Data: []common.ReadResultRow{{ + Id: "220006890315", + Fields: map[string]any{ + "email": "effieklestz@yahoo.com", + }, + Raw: map[string]any{ + "id": "220006890315", + "properties": map[string]any{ + "company": nil, + "createdate": "2026-05-06T11:48:03.785Z", + "email": "effieklestz@yahoo.com", + "firstname": nil, + "hs_object_id": "220006890315", + "lastmodifieddate": "2026-05-06T19:26:18.036Z", + "lastname": nil, + "phone": nil, + "website": nil, + }, + "createdAt": "2026-05-06T11:48:03.785Z", + "updatedAt": "2026-05-06T19:26:18.036Z", + "archived": false, + "url": "https://app.hubspot.com/contacts/44623425/record/0-1/220006890315", + }, + }}, + Done: true, + } + + tests := []SearchViaRead{ + { + Name: "Read contacts using search API without filters", + Input: SearchParams{ + ObjectName: "contacts", + Fields: connectors.Fields("email"), + Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC), + }, + Server: mockserver.Conditional{ + Setup: mockserver.ContentJSON(), + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/crm/v3/objects/contacts/search"), + mockcond.BodyBytes(payloadContacts), + }, + Then: mockserver.Response(http.StatusOK, responseContacts), + }.Server(), + Expected: contactsOutput, + }, + { + Name: "Read contacts using search API with filters and since", + Input: SearchParams{ + ObjectName: "contacts", + Fields: connectors.Fields("email"), + Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC), + FilterGroups: []FilterGroup{{ + Filters: []Filter{ + BuildLastModifiedFilterGroup(&common.ReadParams{ + ObjectName: "contacts", + Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC), + }), + }, + }}, + }, + Server: mockserver.Conditional{ + Setup: mockserver.ContentJSON(), + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/crm/v3/objects/contacts/search"), + mockcond.BodyBytes(payloadContacts), + }, + Then: mockserver.Response(http.StatusOK, responseContacts), + }.Server(), + Expected: contactsOutput, + }, + { + Name: "Read contacts using search API with filters and no since", + Input: SearchParams{ + ObjectName: "contacts", + Fields: connectors.Fields("email"), + FilterGroups: []FilterGroup{{ + Filters: []Filter{ + BuildLastModifiedFilterGroup(&common.ReadParams{ + ObjectName: "contacts", + Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC), + }), + }, + }}, + }, + Server: mockserver.Conditional{ + Setup: mockserver.ContentJSON(), + If: mockcond.And{ + mockcond.MethodPOST(), + mockcond.Path("/crm/v3/objects/contacts/search"), + mockcond.BodyBytes(payloadContacts), + }, + Then: mockserver.Response(http.StatusOK, responseContacts), + }.Server(), + Expected: contactsOutput, + }, + { + Name: "Read marketing campaigns via Search", + Input: SearchParams{ + ObjectName: "campaigns", + Fields: connectors.Fields("hs_budget_items_sum_amount", "hs_name", "hs_notes"), + Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC), + }, + Server: mockserver.Conditional{ + Setup: mockserver.ContentJSON(), + If: mockcond.And{ + mockcond.MethodGET(), + mockcond.Path("/marketing/campaigns/2026-03"), + mockcond.BodyBytes(nil), + }, + Then: mockserver.Response(http.StatusOK, responseCampaigns), + }.Server(), + Expected: &common.ReadResult{ + Rows: 1, + Data: []common.ReadResultRow{{ + Id: "84f199fa-beb7-4dca-ad94-3d778cdce157", + Fields: map[string]any{ + "hs_budget_items_sum_amount": "2.0", + "hs_name": "Nurture", + "hs_notes": "Creating campaign from the Dashboard", + }, + Raw: map[string]any{ + "id": "84f199fa-beb7-4dca-ad94-3d778cdce157", + "properties": map[string]any{ + "hs_budget_items_sum_amount": "2.0", + "hs_name": "Nurture", + "hs_notes": "Creating campaign from the Dashboard", + }, + "createdAt": "2026-05-05T23:41:20.330Z", + "updatedAt": "2026-05-05T23:45:04.200Z", + "businessUnits": []any{ + map[string]any{"id": float64(0)}, + }, + }, + }}, + Done: true, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.Name, func(t *testing.T) { + t.Parallel() + + tt.Run(t, func() (*Connector, error) { + return constructTestConnector(tt.Server.URL) + }) + }) + } +} + +type ( + SearchViaReadType = testroutines.TestCase[SearchParams, *common.ReadResult] + SearchViaRead SearchViaReadType +) + +func (r SearchViaRead) Run(t *testing.T, builder testroutines.ConnectorBuilder[*Connector]) { + t.Helper() + + t.Cleanup(func() { + SearchViaReadType(r).Close() + }) + + conn := builder.Build(t, r.Name) + output, err := conn.ReadUsingSearchAPI(t.Context(), r.Input) + + SearchViaReadType(r).Validate(t, err, output) +} diff --git a/providers/hubspot/test/read-via-search/campaigns-response.json b/providers/hubspot/test/read-via-search/campaigns-response.json new file mode 100644 index 0000000000..be071731fc --- /dev/null +++ b/providers/hubspot/test/read-via-search/campaigns-response.json @@ -0,0 +1,20 @@ +{ + "total": 6, + "results": [ + { + "id": "84f199fa-beb7-4dca-ad94-3d778cdce157", + "properties": { + "hs_budget_items_sum_amount": "2.0", + "hs_name": "Nurture", + "hs_notes": "Creating campaign from the Dashboard" + }, + "createdAt": "2026-05-05T23:41:20.330Z", + "updatedAt": "2026-05-05T23:45:04.200Z", + "businessUnits": [ + { + "id": 0 + } + ] + } + ] +} diff --git a/providers/hubspot/test/read-via-search/contacts-payload.json b/providers/hubspot/test/read-via-search/contacts-payload.json new file mode 100644 index 0000000000..0f5ac88cc2 --- /dev/null +++ b/providers/hubspot/test/read-via-search/contacts-payload.json @@ -0,0 +1,17 @@ +{ + "filterGroups": [ + { + "filters": [ + { + "propertyName": "lastmodifieddate", + "operator": "GTE", + "value": "2026-05-05T23:10:00Z" + } + ] + } + ], + "limit": "200", + "properties": [ + "email" + ] +} diff --git a/providers/hubspot/test/read-via-search/contacts-response.json b/providers/hubspot/test/read-via-search/contacts-response.json new file mode 100644 index 0000000000..162123c42c --- /dev/null +++ b/providers/hubspot/test/read-via-search/contacts-response.json @@ -0,0 +1,23 @@ +{ + "total": 115, + "results": [ + { + "id": "220006890315", + "properties": { + "company": null, + "createdate": "2026-05-06T11:48:03.785Z", + "email": "effieklestz@yahoo.com", + "firstname": null, + "hs_object_id": "220006890315", + "lastmodifieddate": "2026-05-06T19:26:18.036Z", + "lastname": null, + "phone": null, + "website": null + }, + "createdAt": "2026-05-06T11:48:03.785Z", + "updatedAt": "2026-05-06T19:26:18.036Z", + "archived": false, + "url": "https://app.hubspot.com/contacts/44623425/record/0-1/220006890315" + } + ] +} diff --git a/providers/hubspot/types.go b/providers/hubspot/types.go index a86a0b925e..e1ceeefcc5 100644 --- a/providers/hubspot/types.go +++ b/providers/hubspot/types.go @@ -3,6 +3,7 @@ package hubspot import ( "fmt" "strconv" + "time" "github.com/amp-labs/connectors/common" "github.com/amp-labs/connectors/internal/datautils" @@ -10,6 +11,12 @@ import ( ) type SearchParams struct { + // Since limits results to records created or updated at or after this timestamp. + // Zero value means no lower bound. + Since time.Time + // Until limits results to records created or updated before this timestamp. + // Zero value means no upper bound. + Until time.Time // The name of the object we are reading, e.g. "Account" ObjectName string // required // NextPage is an opaque token that can be used to get the next page of results. diff --git a/providers/livestorm/jsonapi.go b/providers/livestorm/jsonapi.go index 31250358e9..6e155535ab 100644 --- a/providers/livestorm/jsonapi.go +++ b/providers/livestorm/jsonapi.go @@ -1,6 +1,8 @@ package livestorm import ( + "maps" + "github.com/amp-labs/connectors/internal/jsonquery" "github.com/spyzhov/ajson" ) @@ -70,9 +72,7 @@ func flattenJSONAPIResourceForFields(n *ajson.Node) (map[string]any, error) { return nil, err } - for k, v := range attrMap { - merged[k] = v - } + maps.Copy(merged, attrMap) return merged, nil } diff --git a/test/hubspot/read-via-search/campaigns/main.go b/test/hubspot/read-via-search/campaigns/main.go new file mode 100644 index 0000000000..345c2f53c1 --- /dev/null +++ b/test/hubspot/read-via-search/campaigns/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/amp-labs/connectors" + "github.com/amp-labs/connectors/providers/hubspot" + connTest "github.com/amp-labs/connectors/test/hubspot" + "github.com/amp-labs/connectors/test/utils" +) + +func main() { + // Handle Ctrl-C gracefully. + ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer done() + + conn := connTest.GetHubspotConnector(ctx) + + res, err := conn.ReadUsingSearchAPI(ctx, hubspot.SearchParams{ + ObjectName: "campaigns", + Fields: connectors.Fields("hs_name", "hs_notes", "hs_budget_items_sum_amount"), + Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC), + }) + if err != nil { + utils.Fail("error reading from connector", "error", err) + } + + fmt.Println("Reading...") + utils.DumpJSON(res, os.Stdout) +} diff --git a/test/hubspot/read-via-search/contacts/main.go b/test/hubspot/read-via-search/contacts/main.go index 3c1f9eafe0..aca2b810be 100644 --- a/test/hubspot/read-via-search/contacts/main.go +++ b/test/hubspot/read-via-search/contacts/main.go @@ -6,8 +6,10 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/amp-labs/connectors" + "github.com/amp-labs/connectors/common" "github.com/amp-labs/connectors/providers/hubspot" connTest "github.com/amp-labs/connectors/test/hubspot" "github.com/amp-labs/connectors/test/utils" @@ -29,6 +31,15 @@ func main() { AssociatedObjects: []string{ "companies", }, + Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC), + FilterGroups: []hubspot.FilterGroup{{ + Filters: []hubspot.Filter{ + hubspot.BuildLastModifiedFilterGroup(&common.ReadParams{ + ObjectName: "contacts", + Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC), + }), + }, + }}, }) if err != nil { utils.Fail("error reading from Hubspot", "error", err)