Skip to content

Commit 7a227bd

Browse files
committed
[ENG-2914] feat(salesforce): Batch Create
1 parent 483ce73 commit 7a227bd

3 files changed

Lines changed: 291 additions & 7 deletions

File tree

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,74 @@
11
package batch
22

33
import (
4+
"net/http"
5+
46
"github.com/amp-labs/connectors/common"
7+
"github.com/amp-labs/connectors/common/urlbuilder"
8+
"github.com/amp-labs/connectors/internal/httpkit"
59
"github.com/amp-labs/connectors/providers"
610
)
711

12+
const (
13+
apiVersion = "60.0"
14+
versionPrefix = "v"
15+
version = versionPrefix + apiVersion
16+
restAPISuffix = "/services/data/" + version
17+
)
18+
19+
// Adapter handles batched record operations (create/update) against Salesforce's REST API.
20+
// It abstracts endpoint construction, versioning, and JSON response handling for the Batch feature.
821
type Adapter struct {
922
Client *common.JSONHTTPClient
1023
moduleInfo *providers.ModuleInfo
1124
}
1225

13-
func NewAdapter(httpClient *common.HTTPClient, moduleInfo *providers.ModuleInfo) *Adapter {
14-
return &Adapter{
15-
Client: &common.JSONHTTPClient{
16-
HTTPClient: httpClient,
26+
// NewAdapter creates a new batch Adapter configured to work with Salesforce's composite APIs.
27+
//
28+
// Salesforce CRM client is used as a prototype a copy of which treats 400 BadRequest as permittable.
29+
func NewAdapter(salesforceCRMClient *common.HTTPClient, moduleInfo *providers.ModuleInfo) *Adapter {
30+
shouldHandleError := func(response *http.Response) bool {
31+
// 2xx is allowed as well as 400 BadRequest.
32+
// All other responses need error handling.
33+
return !httpkit.Status2xx(response.StatusCode) && response.StatusCode != http.StatusBadRequest
34+
}
35+
36+
jsonHTTPClient := &common.JSONHTTPClient{
37+
HTTPClient: &common.HTTPClient{
38+
Client: salesforceCRMClient.Client, // same authentication as Salesforce CRM
39+
ErrorHandler: salesforceCRMClient.ErrorHandler, // same understanding of error format as Salesforce CRM
40+
ShouldHandleError: shouldHandleError, // differs from CRM
1741
},
42+
}
43+
44+
return &Adapter{
45+
Client: jsonHTTPClient,
1846
moduleInfo: moduleInfo,
1947
}
2048
}
49+
50+
func (a *Adapter) getModuleURL() string {
51+
return a.moduleInfo.BaseURL
52+
}
53+
54+
// getCreateURL builds the endpoint for creating multiple records of the same object type.
55+
//
56+
// Object name is required as a suffix of the URL.
57+
// Only one type of objects can be created at a time, this is by Salesforce API design.
58+
//
59+
// nolint:lll
60+
// https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_composite_sobject_tree_flat.htm
61+
func (a *Adapter) getCreateURL(objectName common.ObjectName) (*urlbuilder.URL, error) {
62+
return urlbuilder.New(a.getModuleURL(), restAPISuffix, "/composite/tree", objectName.String())
63+
}
64+
65+
// getUpdateURL builds the endpoint for updating multiple records across one or more object types.
66+
//
67+
// Objects of multiple type can be created as part of one request, and it is not limited to one objectName
68+
// and therefore no such argument is needed unlike the getCreateURL.
69+
//
70+
// nolint:lll
71+
// https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_update.htm
72+
func (a *Adapter) getUpdateURL() (*urlbuilder.URL, error) {
73+
return urlbuilder.New(a.getModuleURL(), restAPISuffix, "/composite/sobjects")
74+
}

providers/salesforce/internal/crm/batch/write.go

Lines changed: 188 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,197 @@ package batch
22

33
import (
44
"context"
5+
"fmt"
6+
"net/http"
57

68
"github.com/amp-labs/connectors/common"
9+
"github.com/amp-labs/connectors/common/urlbuilder"
10+
"github.com/amp-labs/connectors/internal/codec"
11+
"github.com/amp-labs/connectors/internal/datautils"
712
)
813

9-
// TODO implement batch write
10-
14+
// BatchWrite executes a Salesforce composite create or update request,
15+
// depending on the parameters provided. It validates input, builds the proper
16+
// payload, sends the API request, and parses the results into a BatchWriteResult structure.
1117
func (a *Adapter) BatchWrite(ctx context.Context, params *common.BatchWriteParam) (*common.BatchWriteResult, error) {
12-
return nil, common.ErrNotImplemented
18+
if err := params.ValidateParams(); err != nil {
19+
return nil, err
20+
}
21+
22+
url, err := a.buildBatchWriteURL(params)
23+
if err != nil {
24+
return nil, err
25+
}
26+
27+
payload, err := buildBatchWritePayload(params)
28+
if err != nil {
29+
return nil, err
30+
}
31+
32+
// Choose REST method.
33+
write := a.Client.Post
34+
if params.IsUpdate() {
35+
write = a.Client.Patch
36+
}
37+
38+
// Make an API call.
39+
rsp, err := write(ctx, url.String(), payload)
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
// TODO response for the UPDATE endpoint has different schema.
45+
46+
// Parse and process response.
47+
response, err := common.UnmarshalJSON[Response](rsp)
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
if response == nil {
53+
status := common.BatchStatusSuccess
54+
errors := make([]any, 0)
55+
56+
if rsp.Code == http.StatusBadRequest {
57+
// A 400 Bad Request is allowed by implementation, but we always expect a response body.
58+
// Since there is no data, and non-2xx response we cannot determine per-record results,
59+
// so the batch is treated as failed.
60+
status = common.BatchStatusFailure
61+
errors = append(errors, common.ErrEmptyJSONHTTPResponse)
62+
}
63+
64+
return &common.BatchWriteResult{
65+
Status: status,
66+
Errors: errors,
67+
Results: nil,
68+
}, nil
69+
}
70+
71+
// Map indexed by unique reference ids. Created once for the lookup.
72+
items := response.GetItemsMap()
73+
74+
return common.ParseBatchWrite(
75+
payload.Records,
76+
func(index int, payloadItem PayloadItem) *Item {
77+
return items[payloadItem.Extension.Attributes.ReferenceID]
78+
},
79+
constructWriteResult,
80+
)
81+
}
82+
83+
func constructWriteResult(payloadItem PayloadItem, respItem *Item) (*common.WriteResult, error) {
84+
if respItem == nil {
85+
// Salesforce didn't return matching response for the record.
86+
// This only means that some other records have failed and no records were processed.
87+
// However, this record was valid.
88+
return &common.WriteResult{
89+
Success: false, // not processed
90+
RecordId: "",
91+
Errors: []any{
92+
common.ErrBatchUnprocessedRecord,
93+
fmt.Sprintf("record's referenceId is %v", payloadItem.Extension.Attributes.ReferenceID),
94+
},
95+
Data: nil,
96+
}, nil
97+
}
98+
99+
return respItem.ToWriteResult()
100+
}
101+
102+
func (a *Adapter) buildBatchWriteURL(params *common.BatchWriteParam) (*urlbuilder.URL, error) {
103+
if params.IsCreate() {
104+
return a.getCreateURL(params.ObjectName)
105+
}
106+
107+
if params.IsUpdate() {
108+
return a.getUpdateURL()
109+
}
110+
111+
return nil, common.ErrUnsupportedBatchWriteType
112+
}
113+
114+
func buildBatchWritePayload(params *common.BatchWriteParam) (*Payload, error) {
115+
records, err := params.GetRecords()
116+
if err != nil {
117+
return nil, err
118+
}
119+
120+
items := make([]PayloadItem, len(records))
121+
for index, record := range records {
122+
items[index] = PayloadItem{
123+
Record: record,
124+
Extension: RecordExtension{
125+
Attributes: RecordAttributes{
126+
Type: params.ObjectName.String(),
127+
ReferenceID: fmt.Sprintf("ref%d", index),
128+
},
129+
},
130+
}
131+
}
132+
133+
return &Payload{Records: items}, nil
134+
}
135+
136+
// Payload represents the composite API request body.
137+
// Each record is wrapped in a PayloadItem that carries additional metadata.
138+
// https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_composite_sobject_tree_flat.htm
139+
type Payload struct {
140+
Records []PayloadItem `json:"records"`
141+
}
142+
143+
// PayloadItem represents a single item in the composite API payload.
144+
// It wraps a core Record with Salesforce-specific attributes required
145+
// for batch or composite write operations. Fields from RecordExtension
146+
// are merged alongside the record's own properties in the final payload.
147+
type PayloadItem = codec.DecoratedRecord[RecordExtension]
148+
149+
type RecordExtension struct {
150+
Attributes RecordAttributes `json:"attributes"`
151+
}
152+
153+
type RecordAttributes struct {
154+
Type string `json:"type"`
155+
ReferenceID string `json:"referenceId"`
156+
}
157+
158+
// Response is structure returned by API either for "200 OK" or "400 Bad Request".
159+
type Response struct {
160+
HasErrors bool `json:"hasErrors"`
161+
Results []Item `json:"results"`
162+
}
163+
164+
type Item struct {
165+
ReferenceId string `json:"referenceId"`
166+
ID string `json:"id"`
167+
Errors []ItemError `json:"errors"`
168+
}
169+
170+
type ItemError struct {
171+
StatusCode string `json:"statusCode"`
172+
Message string `json:"message"`
173+
Fields []any `json:"fields"`
174+
}
175+
176+
func (r Response) GetItemsMap() map[string]*Item {
177+
mapping := make(map[string]*Item)
178+
179+
for _, item := range r.Results {
180+
mapping[item.ReferenceId] = &item
181+
}
182+
183+
return mapping
184+
}
185+
186+
func (i Item) ToWriteResult() (*common.WriteResult, error) {
187+
data, err := common.RecordDataToMap(i)
188+
if err != nil {
189+
return nil, err
190+
}
191+
192+
return &common.WriteResult{
193+
Success: len(i.Errors) == 0,
194+
RecordId: i.ID,
195+
Errors: datautils.ToAnySlice(i.Errors),
196+
Data: data,
197+
}, nil
13198
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"syscall"
9+
10+
"github.com/amp-labs/connectors"
11+
connTest "github.com/amp-labs/connectors/test/salesforce"
12+
"github.com/amp-labs/connectors/test/utils"
13+
)
14+
15+
func main() {
16+
// Handle Ctrl-C gracefully.
17+
ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
18+
defer done()
19+
20+
// Set up slog logging.
21+
utils.SetupLogging()
22+
23+
conn := connTest.GetSalesforceConnector(ctx)
24+
25+
res, err := conn.BatchWrite(ctx, &connectors.BatchWriteParam{
26+
ObjectName: "Contact",
27+
Type: connectors.BatchWriteTypeCreate,
28+
Records: []any{
29+
map[string]any{
30+
"LastName": "Dyer",
31+
"FirstName": "Siena",
32+
},
33+
map[string]any{
34+
"LastName": "Blevins",
35+
"FirstName": "Markus",
36+
},
37+
},
38+
})
39+
if err != nil {
40+
utils.Fail("error reading", "error", err)
41+
}
42+
43+
fmt.Println("Reading..")
44+
utils.DumpJSON(res, os.Stdout)
45+
}

0 commit comments

Comments
 (0)