Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 146 additions & 53 deletions providers/salesforce/internal/crm/batch/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,20 @@ import (
"github.com/amp-labs/connectors/internal/datautils"
)

// BatchWrite executes a Salesforce composite create or update request,
// depending on the parameters provided. It validates input, builds the proper
// payload, sends the API request, and parses the results into a BatchWriteResult structure.
// nolint:lll
// BatchWrite executes a Salesforce composite create or update request.
// It validates the input, builds the appropriate payload, sends the API call,
// and parses the response into a BatchWriteResult.
//
// The payload formats for Create and Update endpoints are nearly identical.
// The only notable difference—unused in this implementation—is the optional
// "allOrNone" flag supported by the Update API (default is false).
// See: https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_update.htm
//
// Response schemas differ slightly: Create responses wrap records in an
// enclosing object, while Update responses return a list at the root level.
// Each item also varies subtly in shape, represented by distinct Go structs,
// though their nested error formats are identical.
func (a *Adapter) BatchWrite(ctx context.Context, params *common.BatchWriteParam) (*common.BatchWriteResult, error) {
if err := params.ValidateParams(); err != nil {
return nil, err
Expand All @@ -29,74 +40,117 @@ func (a *Adapter) BatchWrite(ctx context.Context, params *common.BatchWriteParam
return nil, err
}

// Choose REST method.
write := a.Client.Post
if params.IsUpdate() {
write = a.Client.Patch
return a.batchWriteUpdate(ctx, url, payload)
}

// Make an API call.
rsp, err := write(ctx, url.String(), payload)
return a.batchWriteCreate(ctx, url, payload)
}

func (a *Adapter) batchWriteCreate(
ctx context.Context, url *urlbuilder.URL, payload *Payload,
) (*common.BatchWriteResult, error) {
rsp, err := a.Client.Post(ctx, url.String(), payload)
if err != nil {
return nil, err
}

// TODO response for the UPDATE endpoint has different schema.

// Parse and process response.
response, err := common.UnmarshalJSON[Response](rsp)
response, err := common.UnmarshalJSON[ResponseCreate](rsp)
if err != nil {
return nil, err
}

if response == nil {
status := common.BatchStatusSuccess
errors := make([]any, 0)

if rsp.Code == http.StatusBadRequest {
// A 400 Bad Request is allowed by implementation, but we always expect a response body.
// Since there is no data, and non-2xx response we cannot determine per-record results,
// so the batch is treated as failed.
status = common.BatchStatusFailure
errors = append(errors, common.ErrEmptyJSONHTTPResponse)
}

return &common.BatchWriteResult{
Status: status,
Errors: errors,
Results: nil,
}, nil
return a.handleEmptyResponse(rsp)
}

// Map indexed by unique reference ids. Created once for the lookup.
items := response.GetItemsMap()

return common.ParseBatchWrite(
return common.ParseBatchWrite[PayloadItem, CreateItem](
payload.Records,
func(index int, payloadItem PayloadItem) *Item {
func(index int, payloadItem PayloadItem) *CreateItem {
return items[payloadItem.Extension.Attributes.ReferenceID]
},
constructWriteResult,
func(payloadItem PayloadItem, respItem *CreateItem) (*common.WriteResult, error) {
if respItem == nil {
return createUnprocessableItem(payloadItem), nil
}

return respItem.ToWriteResult()
},
)
}

func constructWriteResult(payloadItem PayloadItem, respItem *Item) (*common.WriteResult, error) {
if respItem == nil {
// Salesforce didn't return matching response for the record.
// This only means that some other records have failed and no records were processed.
// However, this record was valid.
return &common.WriteResult{
Success: false, // not processed
RecordId: "",
Errors: []any{
common.ErrBatchUnprocessedRecord,
fmt.Sprintf("record's referenceId is %v", payloadItem.Extension.Attributes.ReferenceID),
},
Data: nil,
}, nil
func (a *Adapter) batchWriteUpdate(
ctx context.Context, url *urlbuilder.URL, payload *Payload,
) (*common.BatchWriteResult, error) {
rsp, err := a.Client.Patch(ctx, url.String(), payload)
if err != nil {
return nil, err
}

// Parse and process response.
response, err := common.UnmarshalJSON[ResponseUpdate](rsp)
if err != nil {
return nil, err
}

if response == nil {
return a.handleEmptyResponse(rsp)
}

// nolint:lll
return common.ParseBatchWrite(
payload.Records,
func(index int, payloadItem PayloadItem) *UpdateItem {
// In Salesforce composite update responses, each item corresponds
// positionally to the submitted payload item. Even when a record fails,
// its response entry is still present but may have an empty "id" field.
//
// The index is used to correlate payloads and responses. However, we still
// guard against out-of-range access to ensure robustness if the response
// length is shorter than expected.
//
// From the Salesforce docs:
// https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_update.htm
// “Objects are updated in the order they’re listed.
// The SaveResult objects are returned in the same order.”
list := *response
if index < 0 || index >= len(list) {
return nil
}

return &list[index]
},
func(payloadItem PayloadItem, respItem *UpdateItem) (*common.WriteResult, error) {
if respItem == nil {
return createUnprocessableItem(payloadItem), nil
}

return respItem.ToWriteResult()
},
)
}

func (a *Adapter) handleEmptyResponse(rsp *common.JSONHTTPResponse) (*common.BatchWriteResult, error) {
status := common.BatchStatusSuccess
errors := make([]any, 0)

if rsp.Code == http.StatusBadRequest {
// A 400 Bad Request is allowed by implementation, but we always expect a response body.
// Since there is no data, and non-2xx response we cannot determine per-record results,
// so the batch is treated as failed.
status = common.BatchStatusFailure
errors = append(errors, common.ErrEmptyJSONHTTPResponse)
}

return respItem.ToWriteResult()
return &common.BatchWriteResult{
Status: status,
Errors: errors,
Results: nil,
}, nil
}

func (a *Adapter) buildBatchWriteURL(params *common.BatchWriteParam) (*urlbuilder.URL, error) {
Expand Down Expand Up @@ -155,13 +209,23 @@ type RecordAttributes struct {
ReferenceID string `json:"referenceId"`
}

// Response is structure returned by API either for "200 OK" or "400 Bad Request".
type Response struct {
HasErrors bool `json:"hasErrors"`
Results []Item `json:"results"`
// ResponseCreate is structure returned by API either for "200 OK" or "400 Bad Request".
type ResponseCreate struct {
HasErrors bool `json:"hasErrors"`
Results []CreateItem `json:"results"`
}

type Item struct {
// ResponseUpdate is structure retuned by update operation.
// This differs from the creation such that it is a list of objects at top JSON node level.
type ResponseUpdate []UpdateItem

type UpdateItem struct {
Success bool `json:"success"`
ID string `json:"id,omitempty"`
Errors []ItemError `json:"errors"`
}

type CreateItem struct {
ReferenceId string `json:"referenceId"`
ID string `json:"id"`
Errors []ItemError `json:"errors"`
Expand All @@ -173,8 +237,8 @@ type ItemError struct {
Fields []any `json:"fields"`
}

func (r Response) GetItemsMap() map[string]*Item {
mapping := make(map[string]*Item)
func (r ResponseCreate) GetItemsMap() map[string]*CreateItem {
mapping := make(map[string]*CreateItem)

for _, item := range r.Results {
mapping[item.ReferenceId] = &item
Expand All @@ -183,7 +247,21 @@ func (r Response) GetItemsMap() map[string]*Item {
return mapping
}

func (i Item) ToWriteResult() (*common.WriteResult, error) {
func (i CreateItem) ToWriteResult() (*common.WriteResult, error) {
data, err := common.RecordDataToMap(i)
if err != nil {
return nil, err
}

return &common.WriteResult{
Success: len(i.Errors) == 0,
RecordId: i.ID,
Errors: datautils.ToAnySlice(i.Errors),
Data: data,
}, nil
}

func (i UpdateItem) ToWriteResult() (*common.WriteResult, error) {
data, err := common.RecordDataToMap(i)
if err != nil {
return nil, err
Expand All @@ -196,3 +274,18 @@ func (i Item) ToWriteResult() (*common.WriteResult, error) {
Data: data,
}, nil
}

func createUnprocessableItem(payloadItem PayloadItem) *common.WriteResult {
// Salesforce didn't return matching response for the record.
// This only means that some other records have failed and no records were processed.
// However, this record was valid.
return &common.WriteResult{
Success: false, // not processed
RecordId: "",
Errors: []any{
common.ErrBatchUnprocessedRecord,
fmt.Sprintf("record's referenceId is %v", payloadItem.Extension.Attributes.ReferenceID),
},
Data: nil,
}
}
47 changes: 47 additions & 0 deletions test/salesforce/batch/update/contacts/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/amp-labs/connectors"
connTest "github.com/amp-labs/connectors/test/salesforce"
"github.com/amp-labs/connectors/test/utils"
)

func main() {
// Handle Ctrl-C gracefully.
ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer done()

// Set up slog logging.
utils.SetupLogging()

conn := connTest.GetSalesforceConnector(ctx)

res, err := conn.BatchWrite(ctx, &connectors.BatchWriteParam{
ObjectName: "Contact",
Type: connectors.BatchWriteTypeUpdate,
Records: []any{
map[string]any{
"id": "003ak00000jvIfpAAE",
"LastName": "Dyer (updated)",
"FirstName": "Siena (updated)",
},
map[string]any{
"id": "003ak00000jvIfqAAE",
"LastName": "Blevins (updated)",
"FirstName": "Markus (updated)",
},
},
})
if err != nil {
utils.Fail("error reading", "error", err)
}

fmt.Println("Reading..")
utils.DumpJSON(res, os.Stdout)
}