Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions providers/hubspot/internal/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,22 @@ func (s Strategy) Search(ctx context.Context, params *common.SearchParams) (*com
)
}

// Search has two execution paths:
// Search has three execution paths:
// - ObjectAPI: for core CRM objects supported by the canonical ObjectAPI endpoint.
// - Non-ObjectAPI: for CRM objects not supported by ObjectAPI, which use separate endpoints (e.g., Lists).
if core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName) {
// - The rest: no search is supported.
switch {
case core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName):
return s.searchViaNonstandardSearchAPI(ctx, params)
case core.MarketingObjects.Has(params.ObjectName):
fallthrough
case core.CommunicationObjects.Has(params.ObjectName):
fallthrough
case core.MiscellaneousObjects.Has(params.ObjectName):
return nil, common.ErrObjectNotSupported
default:
return s.searchViaObjectAPI(ctx, params)
}

return s.searchViaObjectAPI(ctx, params)
}

// checkSearchResultsLimit checks if the NextPage token exceeds HubSpot's search results limit.
Expand Down
36 changes: 22 additions & 14 deletions providers/hubspot/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func (c *Connector) Read(ctx context.Context, params common.ReadParams) (*common
return nil, err
}

//
// Read using regular GET endpoints.
//
switch {
case core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName):
// Object is part of CRM namespace but outside ObjectAPI.
Expand All @@ -50,43 +53,48 @@ func (c *Connector) Read(ctx context.Context, params common.ReadParams) (*common
}
}

// CRM objects can be read using two ways.
// - If there are Since/Until parameters it will use:
// https://api.hubapi.com/crm/objects/2026-03/{objectType}/search
// - Otherwise, the Objects API endpoint is used:
// https://api.hubapi.com/crm/objects/2026-03/{objectType}
func (c *Connector) readCRMObjectsAPI(
ctx context.Context, config common.ReadParams,
ctx context.Context, params common.ReadParams,
) (*common.ReadResult, error) { //nolint:funlen
// If filtering is required, then we have to use the search endpoint.
// The Search endpoint has a 10K record limit. In case this limit is reached,
// the sorting allows the caller to continue in another call by offsetting
// until the ID of the last record that was successfully fetched.
filters := make(Filters, 0)
if !config.Since.IsZero() {
filters = append(filters, BuildLastModifiedFilterGroup(&config))
if !params.Since.IsZero() {
filters = append(filters, BuildLastModifiedFilterGroup(&params))
}

if !config.Until.IsZero() {
filters = append(filters, BuildUntilTimestampFilterGroup(&config))
if !params.Until.IsZero() {
filters = append(filters, BuildUntilTimestampFilterGroup(&params))
}

filters = append(filters, BuildBuilderFilters(config.BuilderFilter)...)
filters = append(filters, BuildBuilderFilters(params.BuilderFilter)...)

if len(filters) != 0 {
searchParams := SearchParams{
ObjectName: config.ObjectName,
ObjectName: params.ObjectName,
FilterGroups: []FilterGroup{{
Filters: filters,
// Add more filter groups to OR them together
}},
SortBy: []SortBy{
BuildSort(ObjectFieldHsObjectId, SortDirectionAsc),
},
NextPage: config.NextPage,
Fields: config.Fields,
AssociatedObjects: config.AssociatedObjects,
NextPage: params.NextPage,
Fields: params.Fields,
AssociatedObjects: params.AssociatedObjects,
}

return c.ReadUsingSearchAPI(ctx, searchParams)
}

url, err := c.buildReadURL(config)
url, err := c.buildCRMReadURL(params)
if err != nil {
return nil, err
}
Expand All @@ -101,12 +109,12 @@ func (c *Connector) readCRMObjectsAPI(
core.GetRecords,
core.GetNextRecordsURL,
associations.CreateDataMarshallerWithAssociations(
ctx, c.associationsFiller, config.ObjectName, config.AssociatedObjects),
config.Fields,
ctx, c.associationsFiller, params.ObjectName, params.AssociatedObjects),
params.Fields,
)
}

func (c *Connector) buildReadURL(params common.ReadParams) (string, error) {
func (c *Connector) buildCRMReadURL(params common.ReadParams) (string, error) {
if len(params.NextPage) != 0 {
// If NextPage is set, then we're reading the next page of results.
// All that matters is the NextPage URL, the fields are ignored.
Expand Down
99 changes: 89 additions & 10 deletions providers/hubspot/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,62 @@ func (c *Connector) Search(ctx context.Context, params *common.SearchParams) (*c
// This endpoint paginates using paging.next.after which is to be used as an offset.
// Archived results do not appear in search results.
// Read more @ https://developers.hubspot.com/docs/api/crm/search
func (c *Connector) ReadUsingSearchAPI(ctx context.Context, config SearchParams) (*common.ReadResult, error) {
func (c *Connector) ReadUsingSearchAPI( // nolint:cyclop
ctx context.Context, params SearchParams,
) (*common.ReadResult, error) {
ctx = logging.With(ctx, "connector", "hubspot")

if err := config.ValidateParams(); err != nil {
if err := params.ValidateParams(); err != nil {
return nil, err
}

// Check if the NextPage token exceeds the search results limit.
// HubSpot's search API returns a 400 error if you try to paginate beyond 10,000 records.
// By detecting this proactively, we can return a specific error that callers can handle.
if err := checkSearchResultsLimit(config.NextPage); err != nil {
if err := checkSearchResultsLimit(params.NextPage); err != nil {
return nil, fmt.Errorf(
"%w: requested offset %s exceeds limit %d",
common.ErrResultsLimitExceeded,
config.NextPage,
params.NextPage,
searchResultsLimit,
)
}

if core.CRMObjectsWithoutPropertiesAPISupport.Has(config.ObjectName) {
params = params.applyTimestampFilters()

//
// Read using POST endpoints intended for Search.
//

switch {
case core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName):
// Objects outside ObjectAPI have different endpoint while both are part of CRM module.
// For instance such object is Lists.
return c.searchCRM(ctx, searchCRMParams{
SearchParams: config,
SearchParams: params,
})
case core.MarketingObjects.Has(params.ObjectName):
// Search for marketing is not implemented, using simple read.
return c.readMarketing(ctx, makeReadParamsFromSearchParams(params), core.MarketingObjects[params.ObjectName])
case core.CommunicationObjects.Has(params.ObjectName):
// Search for communication is not implemented, using simple read.
return c.readCommunications(ctx, makeReadParamsFromSearchParams(params), core.CommunicationObjects[params.ObjectName])
case core.MiscellaneousObjects.Has(params.ObjectName):
// Search for misc objects is not implemented, using simple read.
return c.readMiscAPI(ctx, makeReadParamsFromSearchParams(params), core.MiscellaneousObjects[params.ObjectName])
default:
// Otherwise object belongs to Hubspot Objects API (sub-category of CRM namespace).
return c.searchCRMObjectsAPI(ctx, params)
}
}

url, err := c.getCRMObjectsSearchURL(config.ObjectName)
func (c *Connector) searchCRMObjectsAPI(ctx context.Context, params SearchParams) (*common.ReadResult, error) {
url, err := c.getCRMObjectsSearchURL(params.ObjectName)
if err != nil {
return nil, err
}

rsp, err := c.JSONHTTPClient().Post(ctx, url.String(), makeFilterBody(config))
rsp, err := c.JSONHTTPClient().Post(ctx, url.String(), makeFilterBody(params))
if err != nil {
return nil, err
}
Expand All @@ -76,8 +99,8 @@ func (c *Connector) ReadUsingSearchAPI(ctx context.Context, config SearchParams)
core.GetRecords,
core.GetNextRecordsAfter,
associations.CreateDataMarshallerWithAssociations(
ctx, c.associationsFiller, config.ObjectName, config.AssociatedObjects),
config.Fields,
ctx, c.associationsFiller, params.ObjectName, params.AssociatedObjects),
params.Fields,
)
}

Expand Down Expand Up @@ -191,6 +214,62 @@ func BuildIdFilterGroup(id string) Filter {
}
}

func makeReadParamsFromSearchParams(search SearchParams) common.ReadParams {
return common.ReadParams{
ObjectName: search.ObjectName,
Fields: search.Fields,
NextPage: search.NextPage,
AssociatedObjects: search.AssociatedObjects,
Since: search.Since,
Until: search.Until,
}
}

func (p SearchParams) applyTimestampFilters() SearchParams {
params := makeReadParamsFromSearchParams(p)

sinceFilter := BuildLastModifiedFilterGroup(&params)
untilFilter := BuildUntilTimestampFilterGroup(&params)

if len(p.FilterGroups) == 0 && (sinceFilter != (Filter{}) || untilFilter != (Filter{})) {
// Initialize group because either since or until (or both) should be populated.
p.FilterGroups = []FilterGroup{{
Filters: Filters{},
}}
}

// Every group is "OR"ed together.
// If there are multiple groups it is logically ok to insert since/until in each.
for index, group := range p.FilterGroups {
if !group.hasFilter(sinceFilter) {
group.Filters = append(group.Filters, sinceFilter)
}

if !group.hasFilter(untilFilter) {
group.Filters = append(group.Filters, untilFilter)
}

p.FilterGroups[index] = group
}

return p
}

func (g FilterGroup) hasFilter(target Filter) bool {
if target == (Filter{}) {
return true
}

for _, filter := range g.Filters {
if filter.FieldName == target.FieldName &&
filter.Operator == target.Operator {
return true
}
}

return false
}

// BuildSort builds a sort by clause for the given field and direction.
func BuildSort(field ObjectField, dir SortDirection) SortBy {
return SortBy{
Expand Down
Loading
Loading