Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions providers/hubspot/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func (c *Connector) Read(ctx context.Context, params common.ReadParams) (*common
return nil, err
}

//
// Read using regular GET endpoints.
//
switch {
case core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName):
// Object is part of CRM namespace but outside ObjectAPI.
Expand All @@ -50,43 +53,48 @@ func (c *Connector) Read(ctx context.Context, params common.ReadParams) (*common
}
}

// CRM objects can be read using two ways.
// - If there are Since/Until parameters it will use:
// https://api.hubapi.com/crm/objects/2026-03/{objectType}/search
// - Otherwise, the Objects API endpoint is used:
// https://api.hubapi.com/crm/objects/2026-03/{objectType}
func (c *Connector) readCRMObjectsAPI(
ctx context.Context, config common.ReadParams,
ctx context.Context, params common.ReadParams,
) (*common.ReadResult, error) { //nolint:funlen
// If filtering is required, then we have to use the search endpoint.
// The Search endpoint has a 10K record limit. In case this limit is reached,
// the sorting allows the caller to continue in another call by offsetting
// until the ID of the last record that was successfully fetched.
filters := make(Filters, 0)
if !config.Since.IsZero() {
filters = append(filters, BuildLastModifiedFilterGroup(&config))
if !params.Since.IsZero() {
filters = append(filters, BuildLastModifiedFilterGroup(&params))
}

if !config.Until.IsZero() {
filters = append(filters, BuildUntilTimestampFilterGroup(&config))
if !params.Until.IsZero() {
filters = append(filters, BuildUntilTimestampFilterGroup(&params))
}

filters = append(filters, BuildBuilderFilters(config.BuilderFilter)...)
filters = append(filters, BuildBuilderFilters(params.BuilderFilter)...)

if len(filters) != 0 {
searchParams := SearchParams{
ObjectName: config.ObjectName,
ObjectName: params.ObjectName,
FilterGroups: []FilterGroup{{
Filters: filters,
// Add more filter groups to OR them together
}},
SortBy: []SortBy{
BuildSort(ObjectFieldHsObjectId, SortDirectionAsc),
},
NextPage: config.NextPage,
Fields: config.Fields,
AssociatedObjects: config.AssociatedObjects,
NextPage: params.NextPage,
Fields: params.Fields,
AssociatedObjects: params.AssociatedObjects,
}

return c.ReadUsingSearchAPI(ctx, searchParams)
}

url, err := c.buildReadURL(config)
url, err := c.buildCRMReadURL(params)
if err != nil {
return nil, err
}
Expand All @@ -101,12 +109,12 @@ func (c *Connector) readCRMObjectsAPI(
core.GetRecords,
core.GetNextRecordsURL,
associations.CreateDataMarshallerWithAssociations(
ctx, c.associationsFiller, config.ObjectName, config.AssociatedObjects),
config.Fields,
ctx, c.associationsFiller, params.ObjectName, params.AssociatedObjects),
params.Fields,
)
}

func (c *Connector) buildReadURL(params common.ReadParams) (string, error) {
func (c *Connector) buildCRMReadURL(params common.ReadParams) (string, error) {
if len(params.NextPage) != 0 {
// If NextPage is set, then we're reading the next page of results.
// All that matters is the NextPage URL, the fields are ignored.
Expand Down
86 changes: 76 additions & 10 deletions providers/hubspot/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,72 @@ func (c *Connector) Search(ctx context.Context, params *common.SearchParams) (*c
// This endpoint paginates using paging.next.after which is to be used as an offset.
// Archived results do not appear in search results.
// Read more @ https://developers.hubspot.com/docs/api/crm/search
func (c *Connector) ReadUsingSearchAPI(ctx context.Context, config SearchParams) (*common.ReadResult, error) {
func (c *Connector) ReadUsingSearchAPI( // nolint:cyclop
ctx context.Context, params SearchParams,
) (*common.ReadResult, error) {
ctx = logging.With(ctx, "connector", "hubspot")

if err := config.ValidateParams(); err != nil {
if err := params.ValidateParams(); err != nil {
return nil, err
}

// Check if the NextPage token exceeds the search results limit.
// HubSpot's search API returns a 400 error if you try to paginate beyond 10,000 records.
// By detecting this proactively, we can return a specific error that callers can handle.
if err := checkSearchResultsLimit(config.NextPage); err != nil {
if err := checkSearchResultsLimit(params.NextPage); err != nil {
return nil, fmt.Errorf(
"%w: requested offset %s exceeds limit %d",
common.ErrResultsLimitExceeded,
config.NextPage,
params.NextPage,
searchResultsLimit,
)
}

if core.CRMObjectsWithoutPropertiesAPISupport.Has(config.ObjectName) {
//
// Read using POST endpoints intended for Search.
//

switch {
case core.CRMObjectsWithoutPropertiesAPISupport.Has(params.ObjectName):
// Objects outside ObjectAPI have different endpoint while both are part of CRM module.
// For instance such object is Lists.
return c.searchCRM(ctx, searchCRMParams{
SearchParams: config,
SearchParams: params,
})
case core.MarketingObjects.Has(params.ObjectName):
readParams, err := makeReadParamsFromSearchParams(params)
if err != nil {
return nil, err
}
// Search for marketing is not implemented, using simple read.
return c.readMarketing(ctx, *readParams, core.MarketingObjects[params.ObjectName])
case core.CommunicationObjects.Has(params.ObjectName):
readParams, err := makeReadParamsFromSearchParams(params)
if err != nil {
return nil, err
}
// Search for communication is not implemented, using simple read.
return c.readCommunications(ctx, *readParams, core.CommunicationObjects[params.ObjectName])
case core.MiscellaneousObjects.Has(params.ObjectName):
readParams, err := makeReadParamsFromSearchParams(params)
if err != nil {
return nil, err
}
// Search for misc objects is not implemented, using simple read.
return c.readMiscAPI(ctx, *readParams, core.MiscellaneousObjects[params.ObjectName])
default:
// Otherwise object belongs to Hubspot Objects API (sub-category of CRM namespace).
return c.searchCRMObjectsAPI(ctx, params)
}
}

url, err := c.getCRMObjectsSearchURL(config.ObjectName)
func (c *Connector) searchCRMObjectsAPI(ctx context.Context, params SearchParams) (*common.ReadResult, error) {
url, err := c.getCRMObjectsSearchURL(params.ObjectName)
if err != nil {
return nil, err
}

rsp, err := c.JSONHTTPClient().Post(ctx, url.String(), makeFilterBody(config))
rsp, err := c.JSONHTTPClient().Post(ctx, url.String(), makeFilterBody(params))
if err != nil {
return nil, err
}
Expand All @@ -76,8 +109,8 @@ func (c *Connector) ReadUsingSearchAPI(ctx context.Context, config SearchParams)
core.GetRecords,
core.GetNextRecordsAfter,
associations.CreateDataMarshallerWithAssociations(
ctx, c.associationsFiller, config.ObjectName, config.AssociatedObjects),
config.Fields,
ctx, c.associationsFiller, params.ObjectName, params.AssociatedObjects),
params.Fields,
)
}

Expand Down Expand Up @@ -160,6 +193,7 @@ func BuildLastModifiedFilterGroup(params *common.ReadParams) Filter {
FieldName: string(lastModifiedField),
Operator: FilterOperatorTypeGTE,
Value: params.Since.Format(time.RFC3339),
isSince: true,
}
}

Expand All @@ -179,6 +213,7 @@ func BuildUntilTimestampFilterGroup(params *common.ReadParams) Filter {
FieldName: string(lastModifiedField),
Operator: FilterOperatorTypeLTE,
Value: params.Until.Format(time.RFC3339),
isUntil: true,
}
}

Expand All @@ -191,6 +226,37 @@ func BuildIdFilterGroup(id string) Filter {
}
}

func makeReadParamsFromSearchParams(search SearchParams) (*common.ReadParams, error) {
params := &common.ReadParams{
ObjectName: search.ObjectName,
Fields: search.Fields,
NextPage: search.NextPage,
AssociatedObjects: search.AssociatedObjects,
}

var err error

for _, group := range search.FilterGroups {
for _, filter := range group.Filters {
if filter.isSince {
params.Since, err = time.Parse(time.RFC3339, filter.Value)
if err != nil {
return nil, err
}
}

if filter.isUntil {
params.Until, err = time.Parse(time.RFC3339, filter.Value)
if err != nil {
return nil, err
}
}
}
}

return params, nil
}

// BuildSort builds a sort by clause for the given field and direction.
func BuildSort(field ObjectField, dir SortDirection) SortBy {
return SortBy{
Expand Down
3 changes: 3 additions & 0 deletions providers/hubspot/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type Filter struct {
FieldName string `json:"propertyName,omitempty"`
Operator FilterOperatorType `json:"operator,omitempty"`
Value string `json:"value,omitempty"`

isSince bool
Comment thread
Cobalt0s marked this conversation as resolved.
Outdated
isUntil bool
}

type (
Expand Down
6 changes: 3 additions & 3 deletions providers/livestorm/jsonapi.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package livestorm

import (
"maps"

"github.com/amp-labs/connectors/internal/jsonquery"
"github.com/spyzhov/ajson"
)
Expand Down Expand Up @@ -70,9 +72,7 @@ func flattenJSONAPIResourceForFields(n *ajson.Node) (map[string]any, error) {
return nil, err
}

for k, v := range attrMap {
merged[k] = v
}
maps.Copy(merged, attrMap)

return merged, nil
}
42 changes: 42 additions & 0 deletions test/hubspot/read-via-search/campaigns/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/amp-labs/connectors"
"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/providers/hubspot"
connTest "github.com/amp-labs/connectors/test/hubspot"
"github.com/amp-labs/connectors/test/utils"
)

func main() {
// Handle Ctrl-C gracefully.
ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer done()

conn := connTest.GetHubspotConnector(ctx)

res, err := conn.ReadUsingSearchAPI(ctx, hubspot.SearchParams{
ObjectName: "campaigns",
Fields: connectors.Fields("hs_name", "hs_notes", "hs_budget_items_sum_amount"),
FilterGroups: []hubspot.FilterGroup{{
Filters: []hubspot.Filter{
hubspot.BuildLastModifiedFilterGroup(&common.ReadParams{
Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC),
}),
},
}},
})
if err != nil {
utils.Fail("error reading from connector", "error", err)
}

fmt.Println("Reading...")
utils.DumpJSON(res, os.Stdout)
}
Loading