diff --git a/providers/paypal/connector.go b/providers/paypal/connector.go index 451f09336..8d9505f84 100644 --- a/providers/paypal/connector.go +++ b/providers/paypal/connector.go @@ -5,6 +5,8 @@ import ( "github.com/amp-labs/connectors/common" "github.com/amp-labs/connectors/internal/components" + "github.com/amp-labs/connectors/internal/components/operations" + "github.com/amp-labs/connectors/internal/components/reader" "github.com/amp-labs/connectors/internal/components/schema" "github.com/amp-labs/connectors/internal/staticschema" "github.com/amp-labs/connectors/providers" @@ -32,6 +34,7 @@ type Connector struct { // Supported operations components.SchemaProvider + components.Reader } func NewConnector(params common.ConnectorParams) (*Connector, error) { @@ -45,8 +48,18 @@ func NewSandboxConnector(params common.ConnectorParams) (*Connector, error) { func constructor(base *components.Connector) (*Connector, error) { connector := &Connector{Connector: base} - // Set the metadata provider for the connector connector.SchemaProvider = schema.NewOpenAPISchemaProvider(common.ModuleRoot, schemas) + connector.Reader = reader.NewHTTPReader( + connector.HTTPClient().Client, + components.NewEmptyEndpointRegistry(), + common.ModuleRoot, + operations.ReadHandlers{ + BuildRequest: connector.buildReadRequest, + ParseResponse: connector.parseReadResponse, + ErrorHandler: common.InterpretError, + }, + ) + return connector, nil } diff --git a/providers/paypal/handlers.go b/providers/paypal/handlers.go new file mode 100644 index 000000000..67d77f651 --- /dev/null +++ b/providers/paypal/handlers.go @@ -0,0 +1,97 @@ +package paypal + +import ( + "context" + "net/http" + "strconv" + "time" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/common/urlbuilder" + "github.com/amp-labs/connectors/internal/datautils" +) + +type timeFilter struct { + sinceParam string + untilParam string +} + +// objectMaxPageSize maps each object that accepts a page_size query param to its API maximum. +// +//nolint:gochecknoglobals,mnd +var objectMaxPageSize = map[string]int{ + "disputes": 50, + "invoices": 100, + "templates": 100, + "plans": 20, + "products": 20, + "transactions": 500, + "webhooks-events": 50, +} + +//nolint:gochecknoglobals +var objectTimeFilter = datautils.NewDefaultMap( + map[string]timeFilter{ + "disputes": {sinceParam: "update_time_after", untilParam: "update_time_before"}, + "transactions": {sinceParam: "start_date", untilParam: "end_date"}, + "webhooks-events": {sinceParam: "start_time", untilParam: "end_time"}, + "balances": {sinceParam: "as_of_time", untilParam: ""}, + }, + func(_ string) timeFilter { + return timeFilter{} + }, +) + +func (c *Connector) buildReadRequest(ctx context.Context, params common.ReadParams) (*http.Request, error) { //nolint:cyclop,lll + if params.NextPage != "" { + return http.NewRequestWithContext(ctx, http.MethodGet, params.NextPage.String(), nil) + } + + path, err := schemas.FindURLPath(common.ModuleRoot, params.ObjectName) + if err != nil { + return nil, err + } + + url, err := urlbuilder.New(c.ProviderInfo().BaseURL, path) + if err != nil { + return nil, err + } + + if maxSize, ok := objectMaxPageSize[params.ObjectName]; ok { + pageSize := maxSize + if params.PageSize > 0 && params.PageSize < maxSize { + pageSize = params.PageSize + } + + url.WithQueryParam("page_size", strconv.Itoa(pageSize)) + } + + tf := objectTimeFilter.Get(params.ObjectName) + + if tf.sinceParam != "" && !params.Since.IsZero() { + url.WithQueryParam(tf.sinceParam, params.Since.Format(time.RFC3339)) + } + + if tf.untilParam != "" && !params.Until.IsZero() { + url.WithQueryParam(tf.untilParam, params.Until.Format(time.RFC3339)) + } + + return http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil) +} + +func (c *Connector) parseReadResponse( + ctx context.Context, + params common.ReadParams, + request *http.Request, + response *common.JSONHTTPResponse, +) (*common.ReadResult, error) { + responseKey := schemas.LookupArrayFieldName(common.ModuleRoot, params.ObjectName) + + return common.ParseResult( + response, + common.ExtractOptionalRecordsFromPath(responseKey), + nextRecordsURL(), + common.GetMarshaledData, + params.Fields, + ) +} diff --git a/providers/paypal/parse.go b/providers/paypal/parse.go new file mode 100644 index 000000000..ff1fd5794 --- /dev/null +++ b/providers/paypal/parse.go @@ -0,0 +1,35 @@ +package paypal + +import ( + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/jsonquery" + "github.com/spyzhov/ajson" +) + +// nextRecordsURL extracts the next page URL from PayPal's HATEOAS links array. +// PayPal list responses include a root "links" array; the element with rel=="next" +// carries the full absolute URL for the next page. +func nextRecordsURL() common.NextPageFunc { + return func(node *ajson.Node) (string, error) { + links, err := jsonquery.New(node).ArrayOptional("links") + if err != nil || len(links) == 0 { + return "", nil //nolint:nilerr + } + + for _, link := range links { + rel, err := jsonquery.New(link).StringOptional("rel") + if err != nil || rel == nil || *rel != "next" { + continue + } + + href, err := jsonquery.New(link).StringOptional("href") + if err != nil || href == nil { + return "", nil //nolint:nilerr + } + + return *href, nil + } + + return "", nil + } +} diff --git a/test/paypal/read/main.go b/test/paypal/read/main.go new file mode 100644 index 000000000..6169ae7c1 --- /dev/null +++ b/test/paypal/read/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "github.com/amp-labs/connectors" + "github.com/amp-labs/connectors/common" + paypalTest "github.com/amp-labs/connectors/test/paypal" + "github.com/amp-labs/connectors/test/utils" +) + +func main() { + // Handle Ctrl-C gracefully. + ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer done() + + // Set up slog logging. + utils.SetupLogging() + + conn := paypalTest.GetPayPalConnector(ctx) + + until := time.Now().UTC() + since := until.AddDate(0, 0, -30) + + // Invoices — no time filter required. + res, err := conn.Read(ctx, common.ReadParams{ + ObjectName: "invoices", + Fields: connectors.Fields("id", "detail", "status", "amount", "primary_recipients"), + }) + if err != nil { + utils.Fail("error reading invoices", "error", err) + } + + slog.Info("Reading invoices...") + utils.DumpJSON(res, os.Stdout) + + // Disputes — filtered by last-updated time (update_time_after / update_time_before). + res, err = conn.Read(ctx, common.ReadParams{ + ObjectName: "disputes", + Fields: connectors.Fields("dispute_id", "status", "reason", "dispute_amount", "create_time", "update_time"), + Since: since, + Until: until, + }) + if err != nil { + utils.Fail("error reading disputes", "error", err) + } + + slog.Info("Reading disputes...") + utils.DumpJSON(res, os.Stdout) + + // Webhook events — filtered by creation time (start_time / end_time). + res, err = conn.Read(ctx, common.ReadParams{ + ObjectName: "webhooks-events", + Fields: connectors.Fields("id", "event_type", "resource_type", "summary", "create_time"), + Since: since, + Until: until, + }) + if err != nil { + utils.Fail("error reading webhooks-events", "error", err) + } + + slog.Info("Reading webhooks-events...") + utils.DumpJSON(res, os.Stdout) +}