diff --git a/providers/salesforce/internal/crm/batch/write.go b/providers/salesforce/internal/crm/batch/write.go index f08e42c19a..3229a63069 100644 --- a/providers/salesforce/internal/crm/batch/write.go +++ b/providers/salesforce/internal/crm/batch/write.go @@ -11,9 +11,20 @@ import ( "github.com/amp-labs/connectors/internal/datautils" ) -// BatchWrite executes a Salesforce composite create or update request, -// depending on the parameters provided. It validates input, builds the proper -// payload, sends the API request, and parses the results into a BatchWriteResult structure. +// nolint:lll +// BatchWrite executes a Salesforce composite create or update request. +// It validates the input, builds the appropriate payload, sends the API call, +// and parses the response into a BatchWriteResult. +// +// The payload formats for Create and Update endpoints are nearly identical. +// The only notable difference—unused in this implementation—is the optional +// "allOrNone" flag supported by the Update API (default is false). +// See: https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_update.htm +// +// Response schemas differ slightly: Create responses wrap records in an +// enclosing object, while Update responses return a list at the root level. +// Each item also varies subtly in shape, represented by distinct Go structs, +// though their nested error formats are identical. func (a *Adapter) BatchWrite(ctx context.Context, params *common.BatchWriteParam) (*common.BatchWriteResult, error) { if err := params.ValidateParams(); err != nil { return nil, err @@ -29,74 +40,117 @@ func (a *Adapter) BatchWrite(ctx context.Context, params *common.BatchWriteParam return nil, err } - // Choose REST method. - write := a.Client.Post if params.IsUpdate() { - write = a.Client.Patch + return a.batchWriteUpdate(ctx, url, payload) } - // Make an API call. - rsp, err := write(ctx, url.String(), payload) + return a.batchWriteCreate(ctx, url, payload) +} + +func (a *Adapter) batchWriteCreate( + ctx context.Context, url *urlbuilder.URL, payload *Payload, +) (*common.BatchWriteResult, error) { + rsp, err := a.Client.Post(ctx, url.String(), payload) if err != nil { return nil, err } - // TODO response for the UPDATE endpoint has different schema. - // Parse and process response. - response, err := common.UnmarshalJSON[Response](rsp) + response, err := common.UnmarshalJSON[ResponseCreate](rsp) if err != nil { return nil, err } if response == nil { - status := common.BatchStatusSuccess - errors := make([]any, 0) - - if rsp.Code == http.StatusBadRequest { - // A 400 Bad Request is allowed by implementation, but we always expect a response body. - // Since there is no data, and non-2xx response we cannot determine per-record results, - // so the batch is treated as failed. - status = common.BatchStatusFailure - errors = append(errors, common.ErrEmptyJSONHTTPResponse) - } - - return &common.BatchWriteResult{ - Status: status, - Errors: errors, - Results: nil, - }, nil + return a.handleEmptyResponse(rsp) } // Map indexed by unique reference ids. Created once for the lookup. items := response.GetItemsMap() - return common.ParseBatchWrite( + return common.ParseBatchWrite[PayloadItem, CreateItem]( payload.Records, - func(index int, payloadItem PayloadItem) *Item { + func(index int, payloadItem PayloadItem) *CreateItem { return items[payloadItem.Extension.Attributes.ReferenceID] }, - constructWriteResult, + func(payloadItem PayloadItem, respItem *CreateItem) (*common.WriteResult, error) { + if respItem == nil { + return createUnprocessableItem(payloadItem), nil + } + + return respItem.ToWriteResult() + }, ) } -func constructWriteResult(payloadItem PayloadItem, respItem *Item) (*common.WriteResult, error) { - if respItem == nil { - // Salesforce didn't return matching response for the record. - // This only means that some other records have failed and no records were processed. - // However, this record was valid. - return &common.WriteResult{ - Success: false, // not processed - RecordId: "", - Errors: []any{ - common.ErrBatchUnprocessedRecord, - fmt.Sprintf("record's referenceId is %v", payloadItem.Extension.Attributes.ReferenceID), - }, - Data: nil, - }, nil +func (a *Adapter) batchWriteUpdate( + ctx context.Context, url *urlbuilder.URL, payload *Payload, +) (*common.BatchWriteResult, error) { + rsp, err := a.Client.Patch(ctx, url.String(), payload) + if err != nil { + return nil, err + } + + // Parse and process response. + response, err := common.UnmarshalJSON[ResponseUpdate](rsp) + if err != nil { + return nil, err + } + + if response == nil { + return a.handleEmptyResponse(rsp) + } + + // nolint:lll + return common.ParseBatchWrite( + payload.Records, + func(index int, payloadItem PayloadItem) *UpdateItem { + // In Salesforce composite update responses, each item corresponds + // positionally to the submitted payload item. Even when a record fails, + // its response entry is still present but may have an empty "id" field. + // + // The index is used to correlate payloads and responses. However, we still + // guard against out-of-range access to ensure robustness if the response + // length is shorter than expected. + // + // From the Salesforce docs: + // https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_update.htm + // “Objects are updated in the order they’re listed. + // The SaveResult objects are returned in the same order.” + list := *response + if index < 0 || index >= len(list) { + return nil + } + + return &list[index] + }, + func(payloadItem PayloadItem, respItem *UpdateItem) (*common.WriteResult, error) { + if respItem == nil { + return createUnprocessableItem(payloadItem), nil + } + + return respItem.ToWriteResult() + }, + ) +} + +func (a *Adapter) handleEmptyResponse(rsp *common.JSONHTTPResponse) (*common.BatchWriteResult, error) { + status := common.BatchStatusSuccess + errors := make([]any, 0) + + if rsp.Code == http.StatusBadRequest { + // A 400 Bad Request is allowed by implementation, but we always expect a response body. + // Since there is no data, and non-2xx response we cannot determine per-record results, + // so the batch is treated as failed. + status = common.BatchStatusFailure + errors = append(errors, common.ErrEmptyJSONHTTPResponse) } - return respItem.ToWriteResult() + return &common.BatchWriteResult{ + Status: status, + Errors: errors, + Results: nil, + }, nil } func (a *Adapter) buildBatchWriteURL(params *common.BatchWriteParam) (*urlbuilder.URL, error) { @@ -155,13 +209,23 @@ type RecordAttributes struct { ReferenceID string `json:"referenceId"` } -// Response is structure returned by API either for "200 OK" or "400 Bad Request". -type Response struct { - HasErrors bool `json:"hasErrors"` - Results []Item `json:"results"` +// ResponseCreate is structure returned by API either for "200 OK" or "400 Bad Request". +type ResponseCreate struct { + HasErrors bool `json:"hasErrors"` + Results []CreateItem `json:"results"` } -type Item struct { +// ResponseUpdate is structure retuned by update operation. +// This differs from the creation such that it is a list of objects at top JSON node level. +type ResponseUpdate []UpdateItem + +type UpdateItem struct { + Success bool `json:"success"` + ID string `json:"id,omitempty"` + Errors []ItemError `json:"errors"` +} + +type CreateItem struct { ReferenceId string `json:"referenceId"` ID string `json:"id"` Errors []ItemError `json:"errors"` @@ -173,8 +237,8 @@ type ItemError struct { Fields []any `json:"fields"` } -func (r Response) GetItemsMap() map[string]*Item { - mapping := make(map[string]*Item) +func (r ResponseCreate) GetItemsMap() map[string]*CreateItem { + mapping := make(map[string]*CreateItem) for _, item := range r.Results { mapping[item.ReferenceId] = &item @@ -183,7 +247,21 @@ func (r Response) GetItemsMap() map[string]*Item { return mapping } -func (i Item) ToWriteResult() (*common.WriteResult, error) { +func (i CreateItem) ToWriteResult() (*common.WriteResult, error) { + data, err := common.RecordDataToMap(i) + if err != nil { + return nil, err + } + + return &common.WriteResult{ + Success: len(i.Errors) == 0, + RecordId: i.ID, + Errors: datautils.ToAnySlice(i.Errors), + Data: data, + }, nil +} + +func (i UpdateItem) ToWriteResult() (*common.WriteResult, error) { data, err := common.RecordDataToMap(i) if err != nil { return nil, err @@ -196,3 +274,18 @@ func (i Item) ToWriteResult() (*common.WriteResult, error) { Data: data, }, nil } + +func createUnprocessableItem(payloadItem PayloadItem) *common.WriteResult { + // Salesforce didn't return matching response for the record. + // This only means that some other records have failed and no records were processed. + // However, this record was valid. + return &common.WriteResult{ + Success: false, // not processed + RecordId: "", + Errors: []any{ + common.ErrBatchUnprocessedRecord, + fmt.Sprintf("record's referenceId is %v", payloadItem.Extension.Attributes.ReferenceID), + }, + Data: nil, + } +} diff --git a/test/salesforce/batch/update/contacts/main.go b/test/salesforce/batch/update/contacts/main.go new file mode 100644 index 0000000000..1a9319c202 --- /dev/null +++ b/test/salesforce/batch/update/contacts/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/amp-labs/connectors" + connTest "github.com/amp-labs/connectors/test/salesforce" + "github.com/amp-labs/connectors/test/utils" +) + +func main() { + // Handle Ctrl-C gracefully. + ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer done() + + // Set up slog logging. + utils.SetupLogging() + + conn := connTest.GetSalesforceConnector(ctx) + + res, err := conn.BatchWrite(ctx, &connectors.BatchWriteParam{ + ObjectName: "Contact", + Type: connectors.BatchWriteTypeUpdate, + Records: []any{ + map[string]any{ + "id": "003ak00000jvIfpAAE", + "LastName": "Dyer (updated)", + "FirstName": "Siena (updated)", + }, + map[string]any{ + "id": "003ak00000jvIfqAAE", + "LastName": "Blevins (updated)", + "FirstName": "Markus (updated)", + }, + }, + }) + if err != nil { + utils.Fail("error reading", "error", err) + } + + fmt.Println("Reading..") + utils.DumpJSON(res, os.Stdout) +}