Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions internal/db/postgres/pgcopy/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@ package pgcopy

import (
"errors"
"fmt"
"slices"

"github.com/greenmaskio/greenmask/pkg/toolkit"
)

var ErrIndexOutOfRage = errors.New("wrong column idx: index out of range")

// ErrTupleSizeMismatch is returned by (*Row).Decode when the COPY stream
// contains more columns than the row was constructed for. This happens when
// a transformation's `query:` selects extra columns beyond the table's
// declared schema; the decoder cannot guess which slot they belong to, so
// it reports the discrepancy back to the caller instead of indexing past
// the end of columnPos and panicking (#432).
var ErrTupleSizeMismatch = errors.New("decoded column count exceeds row tuple size")

type columnPos struct {
start int
end int
Expand Down Expand Up @@ -98,6 +107,13 @@ func (r *Row) Decode(raw []byte) error {
}
if r.isDynamic && idx >= r.tupleSize {
r.appendNewEmptyBuffer()
} else if idx >= len(r.columnPos) {
// Fixed-size row but the COPY stream produced more columns
// than expected. Surfacing this as a typed error lets the
// caller report 'transformation produced N columns, table
// has M' instead of crashing the dump goroutine (#432).
return fmt.Errorf("%w: got more than %d columns at byte offset %d",
ErrTupleSizeMismatch, r.tupleSize, colStartPos)
}

p := r.columnPos[idx]
Expand Down
14 changes: 14 additions & 0 deletions internal/db/postgres/pgcopy/row_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,20 @@ func TestDecode_dynamicSize(t *testing.T) {
}
}

// TestDecode_extraColumnsTupleSizeMismatch is a regression for #432.
// When a transformation's `query:` adds extra columns beyond the table's
// declared schema, the COPY stream produces more columns than the row was
// constructed for. The decoder used to walk past the end of columnPos and
// crash with `panic: runtime error: index out of range`. It now returns a
// typed error so the caller can surface it cleanly.
func TestDecode_extraColumnsTupleSizeMismatch(t *testing.T) {
// Row sized for 2 columns, but the wire format carries 5.
row := NewRow(2)
err := row.Decode([]byte("a\tb\tc\td\te"))
require.Error(t, err)
assert.ErrorIs(t, err, ErrTupleSizeMismatch)
}

func TestRow_GetColumn(t *testing.T) {

tests := []struct {
Expand Down
Loading