diff --git a/common/construct.go b/common/construct.go new file mode 100644 index 0000000000..633b4e3ae8 --- /dev/null +++ b/common/construct.go @@ -0,0 +1,215 @@ +package common + +import ( + "errors" + + "github.com/amp-labs/connectors/internal/goutils" +) + +var ErrNumWriteResultExceedsTotalRecords = errors.New( + "number of batch WriteResult entries exceeds total number of payload Records", +) + +// NewBatchWriteResult constructs a new BatchWriteResult summarizing the outcome +// of a batch write operation. It validates input consistency, computes +// success/failure counts, and derives the BatchStatus based on those counts. +// +// Rules and assumptions: +// +// - len(results) must not exceed totalNumRecords. +// If it does, the constructor returns a joined error containing +// ErrInvalidImplementation and ErrNumWriteResultExceedsTotalRecords. +// +// - successCounter < 0 triggers automatic counting of successes from the +// results slice. This is a convenience for callers that did not precompute +// successes. However, providing an explicit success count is preferred. +// +// - successCounter cannot exceed totalNumRecords. +// If it does, the counter is recomputed defensively. +// +// - failureCounter is computed as totalNumRecords - successCounter. +// +// - A BatchWriteResult always satisfies: +// SuccessCount + FailureCount == totalNumRecords +// and +// len(Results) ≤ totalNumRecords. +// +// - fatalErrors represent top-level (batch-level) errors that may coexist +// with item-level successes. For example, partial API failures or warnings +// that affected only some records. +// +// Constructors may return an error to signal invalid or inconsistent usage +// rather than to represent runtime provider failures. +func NewBatchWriteResult( + results []WriteResult, successCounter, totalNumRecords int, fatalErrors []any, +) (*BatchWriteResult, error) { + if len(results) > totalNumRecords { + return nil, errors.Join(ErrInvalidImplementation, ErrNumWriteResultExceedsTotalRecords) + } + + if successCounter < 0 || successCounter > totalNumRecords { + successCounter = countSuccesses(results) + } + + failureCounter := totalNumRecords - successCounter + + return &BatchWriteResult{ + Status: newBatchStatus(successCounter, failureCounter, totalNumRecords), + Errors: fatalErrors, + Results: results, + SuccessCount: successCounter, + FailureCount: failureCounter, + }, nil +} + +// NewBatchWriteResultFailed constructs a BatchWriteResult representing a fully +// failed batch operation. It assumes zero successful records and marks the +// BatchStatus as failure. The constructor still validates that the number of +// WriteResult entries does not exceed totalNumRecords. +// +// fatalErrors may include provider-level or transport-level issues explaining +// the batch failure. +func NewBatchWriteResultFailed( + results []WriteResult, totalNumRecords int, fatalErrors []any, +) (*BatchWriteResult, error) { + if len(results) > totalNumRecords { + return nil, errors.Join(ErrInvalidImplementation, ErrNumWriteResultExceedsTotalRecords) + } + + return &BatchWriteResult{ + Status: newBatchStatus(0, totalNumRecords, totalNumRecords), + Errors: fatalErrors, + Results: results, + SuccessCount: 0, + FailureCount: totalNumRecords, + }, nil +} + +func newBatchStatus(successCounter, failureCounter, total int) BatchStatus { + switch { + case successCounter == total: + return BatchStatusSuccess + case failureCounter == total: + // Every single record failed. + return BatchStatusFailure + default: + // Some failed, some succeeded. + return BatchStatusPartial + } +} + +// BatchWriteResponseMatcher matches a single payload item from the request +// to its corresponding provider response item. +// +// Implementations may match items by inspecting payload data or by using the index, +// depending on how the provider structures its response. For providers that return +// responses in the same order as the request, the index allows a straightforward positional lookup. +// +// The function must be deterministic and fast — given a payload item (and its index), +// it should return the corresponding response item or nil if none exists. +type BatchWriteResponseMatcher[P, R any] func(index int, payloadItem P) *R + +// BatchWriteResponseTransformer converts a payload item and its matched provider response +// into a standardized WriteResult. +// +// Responsibilities of the transformer: +// - Determine success vs failure for the item (WriteResult.Success). +// - Populate WriteResult.RecordId when the provider returns an identifier for the affected record. +// - Attach errors describing what went wrong at WriteResult.Errors +// - In case of successful create/update set the response item to WriteResult.Data. +// +// Type parameters: +// +// P — payload item type. +// R — provider response item type. +type BatchWriteResponseTransformer[P, R any] func(payloadItem P, respItem *R) (*WriteResult, error) + +// ErrBatchUnprocessedRecord is returned when a record was skipped or not +// processed due to failures elsewhere in the batch. +var ErrBatchUnprocessedRecord = errors.New("record was not processed due to other records failures") + +// ParseBatchWrite converts a provider's overall batch response into a consolidated BatchWriteResult. +// +// A Record is a domain-agnostic map/struct that models fields you want to persist. +// An Item is the wire representation (payload item) the provider expects for a single Record. +// Connectors must make an explicit mapping Record -> Item (payload item) when constructing the overall payload. +// When responses arrive, connectors must match each payload item to its response item and produce a WriteResult. +// This separation ensures consistent handling of ID mapping, reference IDs, partial successes, and errors. +// +// -------------------------- +// The function is intentionally generic: +// +// P is the payload item type (what was sent), +// R is the provider response item type (what was received). +// +// Arguments: +// payloadItems - list of items that are part of payload to create/update each record. +// responseMatcher - list of items that are part of payload to create/update each record. +func ParseBatchWrite[P, R any]( + payloadItems []P, + responseMatcher BatchWriteResponseMatcher[P, R], + responseToResult BatchWriteResponseTransformer[P, R], +) (*BatchWriteResult, error) { + var ( + totalNumRecords = len(payloadItems) + results = make([]WriteResult, 0, totalNumRecords) + fatalErrors []any + successCounter = 0 + ) + + for index, record := range payloadItems { + response, err := invokeResponseMatcher(responseMatcher, index, record) + if err != nil { + // Index out of bounds is downgraded from panic to error inside the invoker. + return nil, err + } + + result, err := responseToResult(record, response) + if err != nil { + fatalErrors = append(fatalErrors, err) + + // Record cannot be added into the list of results ([]WriteResult). + continue + } + + // The result added could be either successful or failed. + // Each has a mixed status. Therefore, we keep track of successes and failures separately. + results = append(results, *result) + + if result.Success { + successCounter += 1 + } + } + + return NewBatchWriteResult(results, successCounter, totalNumRecords, fatalErrors) +} + +func countSuccesses(results []WriteResult) int { + count := 0 + + for _, result := range results { + if result.Success { + count += 1 + } + } + + return count +} + +// invokeResponseMatcher safely executes the provided response matcher function. +// It guards against panics—such as index out-of-range or unexpected nil access— +// converting them into regular errors instead of crashing. +// +// This acts as a safety net for connector implementors who may have used +// the index incorrectly when matching payload and response items. +func invokeResponseMatcher[P, R any]( + responseMatcher BatchWriteResponseMatcher[P, R], + index int, record P, +) (responseItem *R, err error) { + defer goutils.PanicRecovery(func(cause error) { + err = cause + responseItem = nil + }) + + return responseMatcher(index, record), nil +} diff --git a/providers/salesforce/connector.go b/providers/salesforce/connector.go index c0745f999c..b698ac956b 100644 --- a/providers/salesforce/connector.go +++ b/providers/salesforce/connector.go @@ -6,6 +6,7 @@ import ( "github.com/amp-labs/connectors/common/paramsbuilder" "github.com/amp-labs/connectors/common/urlbuilder" "github.com/amp-labs/connectors/providers" + "github.com/amp-labs/connectors/providers/salesforce/internal/crm/batch" "github.com/amp-labs/connectors/providers/salesforce/internal/crm/custom" "github.com/amp-labs/connectors/providers/salesforce/internal/pardot" ) @@ -19,18 +20,31 @@ const ( uriToolingEventRelayConfig = restAPISuffix + "/tooling/sobjects/EventRelayConfig" ) -// Connector is a Salesforce connector. +// Connector provides integration with Salesforce provider. +// +// This implementation currently supports two functional modules: +// +// - CRM: the primary Salesforce data module responsible for standard objects. +// - Pardot: the Account Engagement module, implemented as a separate adapter. +// +// The CRM module is undergoing partial migration: some operations are implemented directly within Connector, +// while others are delegated to specialized sub-adapters (see below). +// These sub-adapters will be consolidated as the migration completes under "crm.Adapter". type Connector struct { Client *common.JSONHTTPClient providerInfo *providers.ProviderInfo moduleInfo *providers.ModuleInfo moduleID common.ModuleID - // Module Pardot -- lives in its own struct: + + // pardotAdapter handles the Salesforce Account Engagement (Pardot) module. + // It provides dedicated support for Pardot-specific endpoints and metadata. pardotAdapter *pardot.Adapter - // Module CRM -- is partially delegated to other structs - // some functionality is delegated into the following structs: - customAdapter *custom.Adapter + + // CRM module sub-adapters. + // These delegate specialized subsets of CRM functionality to keep Connector modular and prevent code bloat. + customAdapter *custom.Adapter // used for connectors.UpsertMetadataConnector capabilities. + batchAdapter *batch.Adapter // used for connectors.BatchWriteConnector capabilities. } // NewConnector returns a new Salesforce connector. @@ -65,11 +79,18 @@ func NewConnector(opts ...Option) (conn *Connector, outErr error) { XML: &interpreter.DirectFaultyResponder{Callback: conn.interpretXMLError}, }.Handle + // Delegate selected CRM functionality to internal adapters to + // prevent this package from growing too large. These adapters + // effectively "inline" specialized responsibilities while sharing + // the same HTTP and module context. + // + // Note: moduleInfo always refers to the Salesforce CRM module. + // These adapters are not applicable to the Pardot module. conn.customAdapter = custom.NewAdapter(httpClient, conn.moduleInfo) + conn.batchAdapter = batch.NewAdapter(httpClient, conn.moduleInfo) - // Empty module name, root module, standard salesforce module fallback to default Salesforce behaviour. - // Account Engagement module will initialize the pardot adapter. - // Read/Write/ListObjectMetadata will delegate to this adapter. + // Initialize the Pardot (Account Engagement) adapter if applicable. + // In that case, read/write/list metadata operations are delegated to it. moduleID := params.Module.Selection.ID if isPardotModule(moduleID) { conn.pardotAdapter, err = pardot.NewAdapter(conn.Client, conn.moduleInfo, params.Metadata.Map) diff --git a/providers/salesforce/internal/crm/batch/adapter.go b/providers/salesforce/internal/crm/batch/adapter.go new file mode 100644 index 0000000000..4f65352da0 --- /dev/null +++ b/providers/salesforce/internal/crm/batch/adapter.go @@ -0,0 +1,20 @@ +package batch + +import ( + "github.com/amp-labs/connectors/common" + "github.com/amp-labs/connectors/providers" +) + +type Adapter struct { + Client *common.JSONHTTPClient + moduleInfo *providers.ModuleInfo +} + +func NewAdapter(httpClient *common.HTTPClient, moduleInfo *providers.ModuleInfo) *Adapter { + return &Adapter{ + Client: &common.JSONHTTPClient{ + HTTPClient: httpClient, + }, + moduleInfo: moduleInfo, + } +} diff --git a/providers/salesforce/internal/crm/batch/write.go b/providers/salesforce/internal/crm/batch/write.go new file mode 100644 index 0000000000..b9af9efaa8 --- /dev/null +++ b/providers/salesforce/internal/crm/batch/write.go @@ -0,0 +1,13 @@ +package batch + +import ( + "context" + + "github.com/amp-labs/connectors/common" +) + +// TODO implement batch write + +func (a *Adapter) BatchWrite(ctx context.Context, params *common.BatchWriteParam) (*common.BatchWriteResult, error) { + return nil, common.ErrNotImplemented +} diff --git a/providers/salesforce/write.go b/providers/salesforce/write.go index e43a6a2445..5e515c88cc 100644 --- a/providers/salesforce/write.go +++ b/providers/salesforce/write.go @@ -8,6 +8,11 @@ import ( "github.com/spyzhov/ajson" ) +func (c *Connector) BatchWrite(ctx context.Context, params *common.BatchWriteParam) (*common.BatchWriteResult, error) { + // Delegated. + return c.batchAdapter.BatchWrite(ctx, params) +} + // Write will write data to Salesforce. func (c *Connector) Write(ctx context.Context, config common.WriteParams) (*common.WriteResult, error) { if err := config.ValidateParams(); err != nil {