Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions providers/hubspot/internal/marketing/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# HubSpot Marketing

This connector integrates with HubSpot’s Marketing APIs.

## ListObjectMetadata

Object metadata is defined in `schema.json`.

This file is maintained manually based on the official HubSpot documentation.
27 changes: 27 additions & 0 deletions providers/hubspot/internal/marketing/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package marketing

import (
"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/common/urlbuilder"
"github.com/amp-labs/connectors/internal/components"
"github.com/amp-labs/connectors/internal/components/deleter"
"github.com/amp-labs/connectors/internal/components/operations"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/amp-labs/connectors/internal/components/writer"
"github.com/amp-labs/connectors/providers"
"github.com/amp-labs/connectors/providers/hubspot/internal/shared"
"github.com/amp-labs/connectors/test/utils/mockutils"
)

// Adapter handles CRUD operations against HubSpot's Marketing Hub product.
Expand Down Expand Up @@ -72,3 +74,28 @@ func constructor(base *components.Connector) (*Adapter, error) {

return adapter, nil
}

func (a *Adapter) getURL(objectName string) (*urlbuilder.URL, error) {
path, err := Schemas.FindURLPath(a.Module(), objectName)
if err != nil {
return nil, common.ErrOperationNotSupportedForObject
}

return urlbuilder.New(a.ModuleInfo().BaseURL, path, shared.APIVersion2026March)
}

func constructTestAdapter(serverURL string) (*Adapter, error) {
adapter, err := NewAdapter(
&common.ConnectorParams{
AuthenticatedClient: mockutils.NewClient(),
Module: providers.ModuleHubspotMarketing,
},
)
if err != nil {
return nil, err
}

adapter.SetUnitTestMockServerBaseURL(serverURL)

return adapter, nil
}
88 changes: 88 additions & 0 deletions providers/hubspot/internal/marketing/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package marketing

import (
"testing"

"github.com/amp-labs/connectors"
"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/test/utils/mockutils/mockserver"
"github.com/amp-labs/connectors/test/utils/testroutines"
)

func TestListObjectMetadata(t *testing.T) { // nolint:funlen,gocognit,cyclop
t.Parallel()

tests := []testroutines.Metadata{
{
Name: "At least one object name must be queried",
Input: nil,
Server: mockserver.Dummy(),
ExpectedErrs: []error{common.ErrMissingObjects},
},
{
Name: "Unknown object requested",
Input: []string{"butterflies"},
Server: mockserver.Dummy(),
Comparator: testroutines.ComparatorSubsetMetadata,
Expected: &common.ListObjectMetadataResult{
Errors: map[string]error{
"butterflies": common.ErrObjectNotSupported,
},
},
},
{
Name: "Successfully describe campaigns",
Input: []string{"campaigns"},
Server: mockserver.Dummy(),
Comparator: testroutines.ComparatorSubsetMetadata,
Expected: &common.ListObjectMetadataResult{
Result: map[string]common.ObjectMetadata{
"campaigns": {
DisplayName: "Campaigns",
Fields: map[string]common.FieldMetadata{
"hs_campaign_status": {
DisplayName: "Campaign Status",
ValueType: "singleSelect",
ProviderType: "Enumeration",
Values: common.FieldValues{{
Value: "planned",
DisplayValue: "planned",
}, {
Value: "in_progress",
DisplayValue: "in_progress",
}, {
Value: "active",
DisplayValue: "active",
}, {
Value: "paused",
DisplayValue: "paused",
}, {
Value: "completed",
DisplayValue: "completed",
}},
},
"hs_name": {
DisplayName: "Name",
ValueType: "string",
ProviderType: "String",
},
},
},
},
Errors: nil,
},
ExpectedErrs: nil,
},
}

for _, tt := range tests {
// nolint:varnamelen
t.Run(tt.Name, func(t *testing.T) {
t.Parallel()

tt.Run(t, func() (connectors.ObjectMetadataConnector, error) {
return constructTestAdapter(tt.Server.URL)
})
})
}
}
67 changes: 64 additions & 3 deletions providers/hubspot/internal/marketing/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,80 @@ package marketing
import (
"context"
"net/http"
"strings"
"time"

"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/common/readhelper"
"github.com/amp-labs/connectors/common/urlbuilder"
"github.com/amp-labs/connectors/providers/hubspot/internal/shared"
)

func (a *Adapter) buildReadRequest(ctx context.Context, params common.ReadParams) (*http.Request, error) {
return nil, common.ErrNotImplemented
url, err := a.buildReadURL(params)
if err != nil {
return nil, err
}

return http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
}

// When reading objects in Hubspot you must explicitly request the fields.
// https://developers.hubspot.com/docs/api-reference/latest/marketing/campaigns/guide#campaign-properties
//
// Reading campaigns object:
// https://developers.hubspot.com/docs/api-reference/latest/marketing/campaigns/get-campaigns
// - Incremental reading is not available.
// - Sorting is applied using "updatedAt" field from newest to oldest.
func (a *Adapter) buildReadURL(params common.ReadParams) (*urlbuilder.URL, error) {
if len(params.NextPage) != 0 {
// Next page
return urlbuilder.New(params.NextPage.String())
}

// First page
url, err := a.getURL(params.ObjectName)
if err != nil {
return nil, err
}

url.WithQueryParam("properties", strings.Join(params.Fields.List(), ","))
url.WithQueryParam("limit", readhelper.PageSizeWithDefaultStr(params, shared.DefaultPageSize))
url.WithQueryParam("sort", "-updatedAt") // newest first

return url, nil
}

func (a *Adapter) parseReadResponse(
ctx context.Context,
params common.ReadParams,
request *http.Request,
response *common.JSONHTTPResponse,
resp *common.JSONHTTPResponse,
) (*common.ReadResult, error) {
return nil, common.ErrNotImplemented
return common.ParseResultFiltered(
params,
resp,
common.MakeRecordsFunc("results"),
makeIncrementalFilterFunc(params),
readhelper.MakeMarshaledDataFuncWithId(
common.FlattenNestedFields("properties"),
readhelper.IdFieldQuery{Field: "id"},
),
params.Fields,
)
}

// makeIncrementalFilterFunc embodies connector-side filtering.
// ReverseOrder is used because we request Campaigns sorted from newest to oldest.
func makeIncrementalFilterFunc(params common.ReadParams) common.RecordsFilterFunc {
if params.Since.IsZero() && params.Until.IsZero() {
return readhelper.MakeIdentityFilterFunc(shared.GetNextRecordsURL)
}

return readhelper.MakeTimeFilterFunc(
readhelper.ReverseOrder,
readhelper.NewTimeBoundary(),
"updatedAt", time.RFC3339,
shared.GetNextRecordsURL,
)
}
159 changes: 159 additions & 0 deletions providers/hubspot/internal/marketing/read_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package marketing

import (
"net/http"
"testing"
"time"

"github.com/amp-labs/connectors"
"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/test/utils/mockutils/mockcond"
"github.com/amp-labs/connectors/test/utils/mockutils/mockserver"
"github.com/amp-labs/connectors/test/utils/testroutines"
"github.com/amp-labs/connectors/test/utils/testutils"
)

func TestRead(t *testing.T) {
t.Parallel()

responseCampaignsFirst := testutils.DataFromFile(t, "read/campaigns/1-first-page.json")
responseCampaignsLast := testutils.DataFromFile(t, "read/campaigns/2-last-page.json")

tests := []testroutines.Read{
{
Name: "Read campaigns first page",
Input: common.ReadParams{
ObjectName: "campaigns",
Fields: connectors.Fields("hs_name", "hs_notes", "hs_budget_items_sum_amount"),
},
Server: mockserver.Conditional{
Setup: mockserver.ContentJSON(),
If: mockcond.And{
mockcond.MethodGET(),
mockcond.Path("/marketing/campaigns/2026-03"),
mockcond.QueryParam("limit", "100"),
mockcond.QueryParam("sort", "-updatedAt"),
},
Then: mockserver.Response(http.StatusOK, responseCampaignsFirst),
}.Server(),
Comparator: testroutines.ComparatorSubsetRead,
Expected: &common.ReadResult{
Rows: 2,
Data: []common.ReadResultRow{{
Fields: map[string]any{
"hs_name": "Nurture",
"hs_notes": "Creating campaign from the Dashboard",
"hs_budget_items_sum_amount": "2.0",
},
Raw: map[string]any{
"id": "84f199fa-beb7-4dca-ad94-3d778cdce157",
"properties": map[string]any{
"hs_name": "Nurture",
"hs_notes": "Creating campaign from the Dashboard",
"hs_budget_items_sum_amount": "2.0",
},
"createdAt": "2026-05-05T23:41:20.330Z",
"updatedAt": "2026-05-05T23:45:04.200Z",
},
Id: "84f199fa-beb7-4dca-ad94-3d778cdce157",
}, {
Fields: map[string]any{
"hs_name": "Kiwi",
},
Raw: map[string]any{
"id": "fc4583f7-5cfc-4773-8fa4-076cd4f4ae6d",
"properties": map[string]any{
"hs_name": "Kiwi",
},
"createdAt": "2026-05-05T23:09:27.549Z",
"updatedAt": "2026-05-05T23:09:27.713Z",
},
Id: "fc4583f7-5cfc-4773-8fa4-076cd4f4ae6d",
}},
NextPage: "https://api.hubapi.com/marketing/campaigns/2026-03?limit=2&sort=-updatedAt&properties=hs_name%2Chs_notes%2Chs_budget_items_sum_amount&after=Mg%3D%3D",
Done: false,
},
},
{
Name: "Read campaigns with connector side filtering",
Input: common.ReadParams{
ObjectName: "campaigns",
Fields: connectors.Fields("hs_name"),
// The first item will be returned, last filtered out.
// Due to the sort order there will be no next page.
// The record which is excluded has this timestamp: 2026-05-05T23:09:27.713Z
Since: time.Date(2026, 5, 5, 23, 10, 0, 0, time.UTC),
},
Server: mockserver.Conditional{
Setup: mockserver.ContentJSON(),
If: mockcond.And{
mockcond.MethodGET(),
mockcond.Path("/marketing/campaigns/2026-03"),
},
Then: mockserver.Response(http.StatusOK, responseCampaignsFirst),
}.Server(),
Comparator: testroutines.ComparatorSubsetRead,
Expected: &common.ReadResult{
Rows: 1,
Data: []common.ReadResultRow{
{
Fields: map[string]any{
"hs_name": "Nurture",
},
Raw: map[string]any{
"updatedAt": "2026-05-05T23:45:04.200Z",
},
Id: "84f199fa-beb7-4dca-ad94-3d778cdce157",
},
},
NextPage: "",
Done: true,
},
},
{
Name: "Read campaigns last page",
Input: common.ReadParams{
ObjectName: "campaigns",
Fields: connectors.Fields("hs_name"),
NextPage: testroutines.URLTestServer + "/marketing/campaigns/2026-03?limit=2&sort=-updatedAt&properties=hs_name%2Chs_notes%2Chs_budget_items_sum_amount&after=Mg%3D%3D",
},
Server: mockserver.Conditional{
Setup: mockserver.ContentJSON(),
If: mockcond.And{
mockcond.MethodGET(),
mockcond.Path("/marketing/campaigns/2026-03"),
mockcond.QueryParam("after", "Mg=="),
},
Then: mockserver.Response(http.StatusOK, responseCampaignsLast),
}.Server(),
Comparator: testroutines.ComparatorSubsetRead,
Expected: &common.ReadResult{
Rows: 1,
Data: []common.ReadResultRow{
{
Fields: map[string]any{
"hs_name": "Inbound",
},
Raw: map[string]any{
"createdAt": "2026-05-05T23:07:11.797Z",
"updatedAt": "2026-05-05T23:07:12.040Z",
},
Id: "5f7bff76-193f-43af-968b-f13c6576ca76",
},
},
NextPage: "",
Done: true,
},
},
}

for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
t.Parallel()

tt.Run(t, func() (connectors.ReadConnector, error) {
return constructTestAdapter(tt.Server.URL)
})
})
}
}
Loading
Loading