Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 71 additions & 19 deletions internal/codec/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,57 @@ import (
"github.com/amp-labs/connectors/internal/datautils"
)

// DecoratedRecord merges a dynamic record with a structured extension,
// producing a single flattened JSON object when marshaled.
// DecoratedRecord merges a dynamic record with a typed extension and marshals them as one flat JSON object.
//
// It embeds a [common.Record] containing user-defined key–value pairs
// and a typed struct [T] representing schema-bound fields that must
// coexist with those user-supplied values in the API payload.
// Important: zero values in Extension are still marshaled unless the field is tagged with `omitempty`.
// Those zero values can overwrite values already present in Record during the merge unless that is your intention.
//
// When marshaled, fields from both Record and Extension are serialized
// at the same level in the resulting JSON. This allows connectors to
// enrich arbitrary record data with well-defined metadata or attributes.
// Merge rules:
// - Keys present only in Record are preserved in the output.
// - Fields from Extension are added to the output.
// - Matching keys favor Extension values.
// - Nested structs in Extension are supported.
// - Zero-value fields in Extension still override unless omitted from JSON.
//
// Example:
//
// type MyPayloadForRecord = codec.DecoratedRecord[RecordExtension]
//
// type RecordExtension struct {
// ObjectName string `json:"objectName"`
// ObjectName string `json:"objectName"`
// }
//
// record := common.Record{"name": "Bob"}
// extension := RecordExtension{ObjectName: "users"}
// item := codec.DecoratedRecord[RecordExtension]{Record: record, Extension: extension}
// item := codec.NewDecoratedRecord(
// map[string]any{
// "name": "Bob",
// },
// RecordExtension{
// ObjectName: "users",
// })
//
// // Output:
// // {"name": "Bob", "objectName": "users"}
// // JSON:
// // {"name":"Bob","objectName":"users"}
type DecoratedRecord[T any] struct {
common.Record

Extension T
}

func (d *DecoratedRecord[T]) MarshalJSON() ([]byte, error) {
// NewDecoratedRecord creates a DecoratedRecord from a base record and a typed extension.
//
// The returned value marshals both parts as a single flattened JSON object.
// Fields from the extension may override values already present in the base
// record, including nested JSON object fields.
func NewDecoratedRecord[T any](base common.Record, decoration T) *DecoratedRecord[T] {
return &DecoratedRecord[T]{
Record: base,
Extension: decoration,
}
}

// MarshalJSON merges Record and Extension into a single JSON object.
//
// The record is copied first, then extension fields are added on top.
// If both contain the same key, the extension value replaces the record value.
func (d DecoratedRecord[T]) MarshalJSON() ([]byte, error) {
// Create a copy of records.
jsonProperties, err := datautils.FromMap(d.Record).DeepCopy()
if err != nil {
Expand All @@ -57,9 +76,42 @@ func (d *DecoratedRecord[T]) MarshalJSON() ([]byte, error) {
return nil, fmt.Errorf("unmarshal extension: %w", err)
}

// Enhance final JSON map with properties from extension.
datautils.FromMap(jsonProperties).AddMapValues(additionalProperties)
deepMerge(jsonProperties, additionalProperties)

// Marshall combined map.
return json.Marshal(jsonProperties)
}

// deepMerge merges source into destination in place.
//
// Behavior:
// - Keys that exist only in source are added to destination.
// - When both values are nested map[string]any values, they are merged recursively.
// - For all other value types, the source value overrides the destination value.
//
// This is a deep merge for JSON object trees.
func deepMerge(destination, source map[string]any) {
for key, srcValue := range source {
dstValue, exists := destination[key]
// Add missing keys.
if !exists {
destination[key] = srcValue

continue
}

dstMap, dstOK := dstValue.(map[string]any)
srcMap, srcOK := srcValue.(map[string]any)

// Nested maps are merged together instead of one map overriding the other.
if dstOK && srcOK {
deepMerge(dstMap, srcMap)
destination[key] = dstMap

continue
}

// Override value.
destination[key] = srcValue
}
}
122 changes: 122 additions & 0 deletions internal/codec/record_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package codec // nolint:dupl,varnamelen

import (
"encoding/json"
"testing"

"github.com/amp-labs/connectors/test/utils/testutils"
)

func TestDecoratedRecord(t *testing.T) {
t.Parallel()

type Address struct {
PostalCode string `json:"postalCode,omitempty"`
State string `json:"state,omitempty"`
Country string `json:"country,omitempty"`
}

type User struct {
ID string `json:"id"`
Name string `json:"name"`
Address *Address `json:"address,omitempty"`
}

tests := []struct {
name string
baseData map[string]any
decorationData any
expectedJSON map[string]any
}{
{
name: "Simple identity without base",
baseData: nil,
decorationData: User{ID: "id_55", Name: "Alice"},
expectedJSON: map[string]any{
"id": "id_55",
"name": "Alice",
},
},
{
name: "Fields from decoration are marshalled with the base",
baseData: map[string]any{"age": 18.0},
decorationData: User{ID: "id_55", Name: "Alice"},
expectedJSON: map[string]any{
"id": "id_55",
"name": "Alice",
"age": 18.0,
},
},
{
name: "Decoration overrides fields in the base",
baseData: map[string]any{"age": 18.0, "name": "Alice"},
decorationData: User{ID: "id_55", Name: "Bob"},
expectedJSON: map[string]any{
"id": "id_55",
"name": "Bob",
"age": 18.0,
},
},
{
name: "Decoration overrides fields in the base",
baseData: map[string]any{"age": 18.0, "name": "Alice"},
decorationData: User{ID: "id_55", Name: "Bob"},
expectedJSON: map[string]any{
"id": "id_55",
"name": "Bob",
"age": 18.0,
},
},
{
name: "Decoration with nested struct overrides fields in the appropriate level",
baseData: map[string]any{
"age": 18.0,
"name": "Alice",
"address": map[string]any{
"postalCode": "ABC88", // must be in final output
"state": "California",
},
},
decorationData: User{
ID: "id_55",
Name: "Bob", // replaces
Address: &Address{
State: "Colorado", // replaces
Country: "USA",
},
},
expectedJSON: map[string]any{
"id": "id_55",
"name": "Bob",
"age": 18.0,
"address": map[string]any{
"postalCode": "ABC88",
"state": "Colorado",
"country": "USA",
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
obj := NewDecoratedRecord(tt.baseData, tt.decorationData)

result := testutils.NewCompareResult()
defer result.Validate(t, tt.name)

data, err := json.Marshal(obj)
if !result.AssertErr("json.Marshal", nil, err) {
return
}

registry := map[string]any{}
err = json.Unmarshal(data, &registry)
if !result.AssertErr("json.Unmarshal", nil, err) {
return
}

result.Assert("Marshalled JSON", tt.expectedJSON, registry)
})
}
}
6 changes: 5 additions & 1 deletion providers/salesforce/internal/crm/batch/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ type Payload struct {
// It wraps a core Record with Salesforce-specific attributes required
// for batch or composite write operations. Fields from RecordExtension
// are merged alongside the record's own properties in the final payload.
type PayloadItem = codec.DecoratedRecord[RecordExtension]
type PayloadItem codec.DecoratedRecord[RecordExtension]

func (i PayloadItem) MarshalJSON() ([]byte, error) {
return codec.DecoratedRecord[RecordExtension](i).MarshalJSON()
}

type RecordExtension struct {
Attributes RecordAttributes `json:"attributes"`
Expand Down
Loading