Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion providers/paypal/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/internal/components"
"github.com/amp-labs/connectors/internal/components/operations"
"github.com/amp-labs/connectors/internal/components/reader"
"github.com/amp-labs/connectors/internal/components/schema"
"github.com/amp-labs/connectors/internal/staticschema"
"github.com/amp-labs/connectors/providers"
Expand Down Expand Up @@ -32,6 +34,7 @@ type Connector struct {

// Supported operations
components.SchemaProvider
components.Reader
}

func NewConnector(params common.ConnectorParams) (*Connector, error) {
Expand All @@ -45,8 +48,18 @@ func NewSandboxConnector(params common.ConnectorParams) (*Connector, error) {
func constructor(base *components.Connector) (*Connector, error) {
connector := &Connector{Connector: base}

// Set the metadata provider for the connector
connector.SchemaProvider = schema.NewOpenAPISchemaProvider(common.ModuleRoot, schemas)

connector.Reader = reader.NewHTTPReader(
connector.HTTPClient().Client,
components.NewEmptyEndpointRegistry(),
common.ModuleRoot,
operations.ReadHandlers{
BuildRequest: connector.buildReadRequest,
ParseResponse: connector.parseReadResponse,
ErrorHandler: common.InterpretError,
},
)

return connector, nil
}
97 changes: 97 additions & 0 deletions providers/paypal/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package paypal

import (
"context"
"net/http"
"strconv"
"time"

"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/common/urlbuilder"
"github.com/amp-labs/connectors/internal/datautils"
)

type timeFilter struct {
sinceParam string
untilParam string
}

// objectMaxPageSize maps each object that accepts a page_size query param to its API maximum.
//
//nolint:gochecknoglobals,mnd
var objectMaxPageSize = map[string]int{
"disputes": 50,
"invoices": 100,
"templates": 100,
"plans": 20,
"products": 20,
"transactions": 500,
"webhooks-events": 50,
}

//nolint:gochecknoglobals
var objectTimeFilter = datautils.NewDefaultMap(
map[string]timeFilter{
"disputes": {sinceParam: "update_time_after", untilParam: "update_time_before"},
"transactions": {sinceParam: "start_date", untilParam: "end_date"},
"webhooks-events": {sinceParam: "start_time", untilParam: "end_time"},
"balances": {sinceParam: "as_of_time", untilParam: ""},
},
func(_ string) timeFilter {
return timeFilter{}
},
)

func (c *Connector) buildReadRequest(ctx context.Context, params common.ReadParams) (*http.Request, error) { //nolint:cyclop,lll
if params.NextPage != "" {
return http.NewRequestWithContext(ctx, http.MethodGet, params.NextPage.String(), nil)
}

path, err := schemas.FindURLPath(common.ModuleRoot, params.ObjectName)
if err != nil {
return nil, err
}

url, err := urlbuilder.New(c.ProviderInfo().BaseURL, path)
if err != nil {
return nil, err
}

if maxSize, ok := objectMaxPageSize[params.ObjectName]; ok {
pageSize := maxSize
if params.PageSize > 0 && params.PageSize < maxSize {
pageSize = params.PageSize
}

url.WithQueryParam("page_size", strconv.Itoa(pageSize))
}

tf := objectTimeFilter.Get(params.ObjectName)

if tf.sinceParam != "" && !params.Since.IsZero() {
url.WithQueryParam(tf.sinceParam, params.Since.Format(time.RFC3339))
}

if tf.untilParam != "" && !params.Until.IsZero() {
url.WithQueryParam(tf.untilParam, params.Until.Format(time.RFC3339))
}

return http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
}

func (c *Connector) parseReadResponse(
ctx context.Context,
params common.ReadParams,
request *http.Request,
response *common.JSONHTTPResponse,
) (*common.ReadResult, error) {
responseKey := schemas.LookupArrayFieldName(common.ModuleRoot, params.ObjectName)

return common.ParseResult(
response,
common.ExtractOptionalRecordsFromPath(responseKey),
nextRecordsURL(),
common.GetMarshaledData,
params.Fields,
)
}
35 changes: 35 additions & 0 deletions providers/paypal/parse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package paypal

import (
"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/internal/jsonquery"
"github.com/spyzhov/ajson"
)

// nextRecordsURL extracts the next page URL from PayPal's HATEOAS links array.
// PayPal list responses include a root "links" array; the element with rel=="next"
// carries the full absolute URL for the next page.
func nextRecordsURL() common.NextPageFunc {
return func(node *ajson.Node) (string, error) {
links, err := jsonquery.New(node).ArrayOptional("links")
if err != nil || len(links) == 0 {
return "", nil //nolint:nilerr
}

for _, link := range links {
rel, err := jsonquery.New(link).StringOptional("rel")
if err != nil || rel == nil || *rel != "next" {
continue
}

href, err := jsonquery.New(link).StringOptional("href")
if err != nil || href == nil {
return "", nil //nolint:nilerr
}

return *href, nil
}

return "", nil
}
}
69 changes: 69 additions & 0 deletions test/paypal/read/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"time"

"github.com/amp-labs/connectors"
"github.com/amp-labs/connectors/common"
paypalTest "github.com/amp-labs/connectors/test/paypal"
"github.com/amp-labs/connectors/test/utils"
)

func main() {
// Handle Ctrl-C gracefully.
ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer done()

// Set up slog logging.
utils.SetupLogging()

conn := paypalTest.GetPayPalConnector(ctx)

until := time.Now().UTC()
since := until.AddDate(0, 0, -30)

// Invoices — no time filter required.
res, err := conn.Read(ctx, common.ReadParams{
ObjectName: "invoices",
Fields: connectors.Fields("id", "detail", "status", "amount", "primary_recipients"),
})
if err != nil {
utils.Fail("error reading invoices", "error", err)
}

slog.Info("Reading invoices...")
utils.DumpJSON(res, os.Stdout)

// Disputes — filtered by last-updated time (update_time_after / update_time_before).
res, err = conn.Read(ctx, common.ReadParams{
ObjectName: "disputes",
Fields: connectors.Fields("dispute_id", "status", "reason", "dispute_amount", "create_time", "update_time"),
Since: since,
Until: until,
})
if err != nil {
utils.Fail("error reading disputes", "error", err)
}

slog.Info("Reading disputes...")
utils.DumpJSON(res, os.Stdout)

// Webhook events — filtered by creation time (start_time / end_time).
res, err = conn.Read(ctx, common.ReadParams{
ObjectName: "webhooks-events",
Fields: connectors.Fields("id", "event_type", "resource_type", "summary", "create_time"),
Since: since,
Until: until,
})
if err != nil {
utils.Fail("error reading webhooks-events", "error", err)
}

slog.Info("Reading webhooks-events...")
utils.DumpJSON(res, os.Stdout)
}
Loading