diff --git a/common/http.go b/common/http.go index f15024d3c2..5e1cbd696c 100644 --- a/common/http.go +++ b/common/http.go @@ -110,12 +110,26 @@ type ErrorHandler func(rsp *http.Response, body []byte) error type ResponseHandler func(rsp *http.Response) (*http.Response, error) -// HTTPClient is an HTTP client that handles OAuth access token refreshes. +// ShouldHandleError determines whether the default or custom ErrorHandler +// should be invoked for a given HTTP response. +// Returning true indicates that the response represents an error that requires handling. +type ShouldHandleError func(response *http.Response) bool + +// HTTPClient is an HTTP client that handles OAuth access token refreshes +// and provides hooks for custom error and response handling. type HTTPClient struct { - Base string // optional base URL. If not set, then all URLs must be absolute. - Client AuthenticatedHTTPClient // underlying HTTP client. Required. - ErrorHandler ErrorHandler // optional error handler. If not set, then the default error handler is used. - ResponseHandler ResponseHandler // optional, Allows mutation of the http.Response from the Saas API response. + // [Deprecated] URL endpoints are not the responsibility of HTTPClient. + // NOTE: to avoid linter errors the deprecation comment is not of correct golang formatting. + // Optional base URL. If unset, all request URLs must be absolute. + Base string + // Underlying HTTP client. Required. + Client AuthenticatedHTTPClient + // Optional ErrorHandler. If not set, then the default error handler is used. + ErrorHandler ErrorHandler + // Optional ResponseHandler, allowing mutation of the http.Response returned by the SaaS API. + ResponseHandler ResponseHandler + // Optional predicate deciding whether the ErrorHandler should be invoked. + ShouldHandleError ShouldHandleError } // getURL returns the base prefixed URL. @@ -597,15 +611,26 @@ func (h *HTTPClient) sendRequest(req *http.Request) (*http.Response, []byte, err return nil, nil, fmt.Errorf("error reading response body: %w", err) } - // Check the response status code - if res.StatusCode < 200 || res.StatusCode > 299 { + shouldHandleError := h.ShouldHandleError + if shouldHandleError == nil { + // Default predicate: treat "non-2xx" responses as requiring error handling. + shouldHandleError = func(response *http.Response) bool { + return response.StatusCode < 200 || response.StatusCode > 299 + } + } + + if shouldHandleError(res) { if h.ErrorHandler != nil { + // Invoke the custom error handler. return res, body, h.ErrorHandler(res, body) } + // Fallback to generic error interpretation. return res, body, InterpretError(res, body) } + // Response may indicate a logical failure at the API level (e.g., a record-level error), + // but it is not a fatal HTTP error. Connectors can handle it according to their contract. return res, body, nil } diff --git a/common/types.go b/common/types.go index e409bb839c..018ba88217 100644 --- a/common/types.go +++ b/common/types.go @@ -193,7 +193,14 @@ type WriteParams struct { Associations any // optional } +func (p WriteParams) GetRecord() (Record, error) { + return RecordDataToMap(p.RecordData) +} + // RecordDataToMap converts WriteParams.RecordData into a map[string]any. +// +// When possible use WriteParams.GetRecord instead. +// // If RecordData is already a map, it is returned directly. // Otherwise, it is serialized to JSON and then deserialized back into a map. func RecordDataToMap(recordData any) (map[string]any, error) { @@ -318,6 +325,22 @@ type BatchWriteParam struct { Records []any } +func (p BatchWriteParam) IsCreate() bool { + return p.Type == BatchWriteTypeCreate +} + +func (p BatchWriteParam) IsUpdate() bool { + return p.Type == BatchWriteTypeUpdate +} + +type Record map[string]any + +func (p BatchWriteParam) GetRecords() ([]Record, error) { + return datautils.ForEachWithErr(p.Records, func(record any) (Record, error) { + return RecordDataToMap(record) + }) +} + // BatchWriteResult aggregates the outcome of a synchronous batch write operation. // It reports an overall batch status, any top-level errors, and the per-record // results for each record processed in the batch. @@ -586,6 +609,10 @@ type ObjectEvents struct { type ObjectName string +func (n ObjectName) String() string { + return string(n) +} + type SubscribeParams struct { Request any // RegistrationResult is the result of the Connector.Register call. diff --git a/common/validation.go b/common/validation.go index 3c4bf361a0..23aedd3b61 100644 --- a/common/validation.go +++ b/common/validation.go @@ -69,7 +69,12 @@ func (p DeleteParams) ValidateParams() error { return nil } -var ErrUnknownBatchWriteType = errors.New("unknown batch write type") +var ( + // ErrUnknownBatchWriteType is returned when enum option for the write type is invalid. + ErrUnknownBatchWriteType = errors.New("unknown batch write type") + // ErrUnsupportedBatchWriteType is returned when connector doesn't implement batch write type. + ErrUnsupportedBatchWriteType = errors.New("batch write type is not supported") +) func (p BatchWriteParam) ValidateParams() error { if len(p.ObjectName) == 0 { diff --git a/internal/codec/record.go b/internal/codec/record.go new file mode 100644 index 0000000000..d1ba06e2cb --- /dev/null +++ b/internal/codec/record.go @@ -0,0 +1,64 @@ +package codec + +import ( + "encoding/json" + "fmt" + + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/internal/datautils" +) + +// DecoratedRecord merges a dynamic record with a structured extension, +// producing a single flattened JSON object when marshaled. +// +// It embeds a [common.Record] containing user-defined key–value pairs +// and a typed struct [T] representing schema-bound fields that must +// coexist with those user-supplied values in the API payload. +// +// When marshaled, fields from both Record and Extension are serialized +// at the same level in the resulting JSON. This allows connectors to +// enrich arbitrary record data with well-defined metadata or attributes. +// +// Example: +// +// type MyPayloadForRecord = codec.DecoratedRecord[RecordExtension] +// +// type RecordExtension struct { +// ObjectName string `json:"objectName"` +// } +// +// record := common.Record{"name": "Bob"} +// extension := RecordExtension{ObjectName: "users"} +// item := codec.DecoratedRecord[RecordExtension]{Record: record, Extension: extension} +// +// // Output: +// // {"name": "Bob", "objectName": "users"} +type DecoratedRecord[T any] struct { + common.Record + Extension T +} + +func (d *DecoratedRecord[T]) MarshalJSON() ([]byte, error) { + // Create a copy of records. + jsonProperties, err := datautils.FromMap(d.Record).DeepCopy() + if err != nil { + return nil, err + } + + // Marshal the extension struct. + extBytes, err := json.Marshal(d.Extension) + if err != nil { + return nil, fmt.Errorf("marshal extension: %w", err) + } + + var additionalProperties map[string]any + if err = json.Unmarshal(extBytes, &additionalProperties); err != nil { + return nil, fmt.Errorf("unmarshal extension: %w", err) + } + + // Enhance final JSON map with properties from extension. + datautils.FromMap(jsonProperties).AddMapValues(additionalProperties) + + // Marshall combined map. + return json.Marshal(jsonProperties) +}