Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 215 additions & 0 deletions common/construct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package common

import (
"errors"

"github.com/amp-labs/connectors/internal/goutils"
)

var ErrNumWriteResultExceedsTotalRecords = errors.New(
"number of batch WriteResult entries exceeds total number of payload Records",
)

// NewBatchWriteResult constructs a new BatchWriteResult summarizing the outcome
// of a batch write operation. It validates input consistency, computes
// success/failure counts, and derives the BatchStatus based on those counts.
//
// Rules and assumptions:
//
// - len(results) must not exceed totalNumRecords.
// If it does, the constructor returns a joined error containing
// ErrInvalidImplementation and ErrNumWriteResultExceedsTotalRecords.
//
// - successCounter < 0 triggers automatic counting of successes from the
// results slice. This is a convenience for callers that did not precompute
// successes. However, providing an explicit success count is preferred.
//
// - successCounter cannot exceed totalNumRecords.
// If it does, the counter is recomputed defensively.
//
// - failureCounter is computed as totalNumRecords - successCounter.
//
// - A BatchWriteResult always satisfies:
// SuccessCount + FailureCount == totalNumRecords
// and
// len(Results) ≤ totalNumRecords.
//
// - fatalErrors represent top-level (batch-level) errors that may coexist
// with item-level successes. For example, partial API failures or warnings
// that affected only some records.
//
// Constructors may return an error to signal invalid or inconsistent usage
// rather than to represent runtime provider failures.
func NewBatchWriteResult(
results []WriteResult, successCounter, totalNumRecords int, fatalErrors []any,
) (*BatchWriteResult, error) {
if len(results) > totalNumRecords {
return nil, errors.Join(ErrInvalidImplementation, ErrNumWriteResultExceedsTotalRecords)
}

if successCounter < 0 || successCounter > totalNumRecords {
successCounter = countSuccesses(results)
}

failureCounter := totalNumRecords - successCounter

return &BatchWriteResult{
Status: newBatchStatus(successCounter, failureCounter, totalNumRecords),
Errors: fatalErrors,
Results: results,
SuccessCount: successCounter,
FailureCount: failureCounter,
}, nil
}

// NewBatchWriteResultFailed constructs a BatchWriteResult representing a fully
// failed batch operation. It assumes zero successful records and marks the
// BatchStatus as failure. The constructor still validates that the number of
// WriteResult entries does not exceed totalNumRecords.
//
// fatalErrors may include provider-level or transport-level issues explaining
// the batch failure.
func NewBatchWriteResultFailed(
results []WriteResult, totalNumRecords int, fatalErrors []any,
) (*BatchWriteResult, error) {
if len(results) > totalNumRecords {
return nil, errors.Join(ErrInvalidImplementation, ErrNumWriteResultExceedsTotalRecords)
}

return &BatchWriteResult{
Status: newBatchStatus(0, totalNumRecords, totalNumRecords),
Errors: fatalErrors,
Results: results,
SuccessCount: 0,
FailureCount: totalNumRecords,
}, nil
}

func newBatchStatus(successCounter, failureCounter, total int) BatchStatus {
switch {
case successCounter == total:
return BatchStatusSuccess
case failureCounter == total:
// Every single record failed.
return BatchStatusFailure
default:
// Some failed, some succeeded.
return BatchStatusPartial
}
}

// BatchWriteResponseMatcher matches a single payload item from the request
// to its corresponding provider response item.
//
// Implementations may match items by inspecting payload data or by using the index,
// depending on how the provider structures its response. For providers that return
// responses in the same order as the request, the index allows a straightforward positional lookup.
//
// The function must be deterministic and fast — given a payload item (and its index),
// it should return the corresponding response item or nil if none exists.
type BatchWriteResponseMatcher[P, R any] func(index int, payloadItem P) *R

// BatchWriteResponseTransformer converts a payload item and its matched provider response
// into a standardized WriteResult.
//
// Responsibilities of the transformer:
// - Determine success vs failure for the item (WriteResult.Success).
// - Populate WriteResult.RecordId when the provider returns an identifier for the affected record.
// - Attach errors describing what went wrong at WriteResult.Errors
// - In case of successful create/update set the response item to WriteResult.Data.
//
// Type parameters:
//
// P — payload item type.
// R — provider response item type.
type BatchWriteResponseTransformer[P, R any] func(payloadItem P, respItem *R) (*WriteResult, error)

// ErrBatchUnprocessedRecord is returned when a record was skipped or not
// processed due to failures elsewhere in the batch.
var ErrBatchUnprocessedRecord = errors.New("record was not processed due to other records failures")

// ParseBatchWrite converts a provider's overall batch response into a consolidated BatchWriteResult.
//
// A Record is a domain-agnostic map/struct that models fields you want to persist.
// An Item is the wire representation (payload item) the provider expects for a single Record.
// Connectors must make an explicit mapping Record -> Item (payload item) when constructing the overall payload.
// When responses arrive, connectors must match each payload item to its response item and produce a WriteResult.
// This separation ensures consistent handling of ID mapping, reference IDs, partial successes, and errors.
//
// --------------------------
// The function is intentionally generic:
//
// P is the payload item type (what was sent),
// R is the provider response item type (what was received).
//
// Arguments:
// payloadItems - list of items that are part of payload to create/update each record.
// responseMatcher - list of items that are part of payload to create/update each record.
func ParseBatchWrite[P, R any](
payloadItems []P,
responseMatcher BatchWriteResponseMatcher[P, R],
responseToResult BatchWriteResponseTransformer[P, R],
) (*BatchWriteResult, error) {
var (
totalNumRecords = len(payloadItems)
results = make([]WriteResult, 0, totalNumRecords)
fatalErrors []any
successCounter = 0
)

for index, record := range payloadItems {
response, err := invokeResponseMatcher(responseMatcher, index, record)
if err != nil {
// Index out of bounds is downgraded from panic to error inside the invoker.
return nil, err
}

result, err := responseToResult(record, response)
if err != nil {
fatalErrors = append(fatalErrors, err)

// Record cannot be added into the list of results ([]WriteResult).
continue
}

// The result added could be either successful or failed.
// Each has a mixed status. Therefore, we keep track of successes and failures separately.
results = append(results, *result)

if result.Success {
successCounter += 1
}
}

return NewBatchWriteResult(results, successCounter, totalNumRecords, fatalErrors)
}

func countSuccesses(results []WriteResult) int {
count := 0

for _, result := range results {
if result.Success {
count += 1
}
}

return count
}

// invokeResponseMatcher safely executes the provided response matcher function.
// It guards against panics—such as index out-of-range or unexpected nil access—
// converting them into regular errors instead of crashing.
//
// This acts as a safety net for connector implementors who may have used
// the index incorrectly when matching payload and response items.
func invokeResponseMatcher[P, R any](
responseMatcher BatchWriteResponseMatcher[P, R],
index int, record P,
) (responseItem *R, err error) {
defer goutils.PanicRecovery(func(cause error) {
err = cause
responseItem = nil
})

return responseMatcher(index, record), nil
}
37 changes: 29 additions & 8 deletions providers/salesforce/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/amp-labs/connectors/common/paramsbuilder"
"github.com/amp-labs/connectors/common/urlbuilder"
"github.com/amp-labs/connectors/providers"
"github.com/amp-labs/connectors/providers/salesforce/internal/crm/batch"
"github.com/amp-labs/connectors/providers/salesforce/internal/crm/custom"
"github.com/amp-labs/connectors/providers/salesforce/internal/pardot"
)
Expand All @@ -19,18 +20,31 @@ const (
uriToolingEventRelayConfig = restAPISuffix + "/tooling/sobjects/EventRelayConfig"
)

// Connector is a Salesforce connector.
// Connector provides integration with Salesforce provider.
//
// This implementation currently supports two functional modules:
//
// - CRM: the primary Salesforce data module responsible for standard objects.
// - Pardot: the Account Engagement module, implemented as a separate adapter.
//
// The CRM module is undergoing partial migration: some operations are implemented directly within Connector,
// while others are delegated to specialized sub-adapters (see below).
// These sub-adapters will be consolidated as the migration completes under "crm.Adapter".
type Connector struct {
Client *common.JSONHTTPClient

providerInfo *providers.ProviderInfo
moduleInfo *providers.ModuleInfo
moduleID common.ModuleID
// Module Pardot -- lives in its own struct:

// pardotAdapter handles the Salesforce Account Engagement (Pardot) module.
// It provides dedicated support for Pardot-specific endpoints and metadata.
pardotAdapter *pardot.Adapter
// Module CRM -- is partially delegated to other structs
// some functionality is delegated into the following structs:
customAdapter *custom.Adapter

// CRM module sub-adapters.
// These delegate specialized subsets of CRM functionality to keep Connector modular and prevent code bloat.
customAdapter *custom.Adapter // used for connectors.UpsertMetadataConnector capabilities.
batchAdapter *batch.Adapter // used for connectors.BatchWriteConnector capabilities.
}

// NewConnector returns a new Salesforce connector.
Expand Down Expand Up @@ -65,11 +79,18 @@ func NewConnector(opts ...Option) (conn *Connector, outErr error) {
XML: &interpreter.DirectFaultyResponder{Callback: conn.interpretXMLError},
}.Handle

// Delegate selected CRM functionality to internal adapters to
// prevent this package from growing too large. These adapters
// effectively "inline" specialized responsibilities while sharing
// the same HTTP and module context.
//
// Note: moduleInfo always refers to the Salesforce CRM module.
// These adapters are not applicable to the Pardot module.
conn.customAdapter = custom.NewAdapter(httpClient, conn.moduleInfo)
conn.batchAdapter = batch.NewAdapter(httpClient, conn.moduleInfo)

// Empty module name, root module, standard salesforce module fallback to default Salesforce behaviour.
// Account Engagement module will initialize the pardot adapter.
// Read/Write/ListObjectMetadata will delegate to this adapter.
// Initialize the Pardot (Account Engagement) adapter if applicable.
// In that case, read/write/list metadata operations are delegated to it.
moduleID := params.Module.Selection.ID
if isPardotModule(moduleID) {
conn.pardotAdapter, err = pardot.NewAdapter(conn.Client, conn.moduleInfo, params.Metadata.Map)
Expand Down
20 changes: 20 additions & 0 deletions providers/salesforce/internal/crm/batch/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package batch

import (
"github.com/amp-labs/connectors/common"
"github.com/amp-labs/connectors/providers"
)

type Adapter struct {
Client *common.JSONHTTPClient
moduleInfo *providers.ModuleInfo
}

func NewAdapter(httpClient *common.HTTPClient, moduleInfo *providers.ModuleInfo) *Adapter {
return &Adapter{
Client: &common.JSONHTTPClient{
HTTPClient: httpClient,
},
moduleInfo: moduleInfo,
}
}
13 changes: 13 additions & 0 deletions providers/salesforce/internal/crm/batch/write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package batch

import (
"context"

"github.com/amp-labs/connectors/common"
)

// TODO implement batch write

func (a *Adapter) BatchWrite(ctx context.Context, params *common.BatchWriteParam) (*common.BatchWriteResult, error) {
return nil, common.ErrNotImplemented
}
5 changes: 5 additions & 0 deletions providers/salesforce/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"github.com/spyzhov/ajson"
)

func (c *Connector) BatchWrite(ctx context.Context, params *common.BatchWriteParam) (*common.BatchWriteResult, error) {
// Delegated.
return c.batchAdapter.BatchWrite(ctx, params)
}

// Write will write data to Salesforce.
func (c *Connector) Write(ctx context.Context, config common.WriteParams) (*common.WriteResult, error) {
if err := config.ValidateParams(); err != nil {
Expand Down