diff --git a/examples/simple_plugin/go.mod b/examples/simple_plugin/go.mod index 9db66d2bcd..f19cee3e32 100644 --- a/examples/simple_plugin/go.mod +++ b/examples/simple_plugin/go.mod @@ -3,7 +3,7 @@ module github.com/cloudquery/plugin-sdk/examples/simple_plugin go 1.24.4 require ( - github.com/apache/arrow-go/v18 v18.4.0 + github.com/apache/arrow-go/v18 v18.4.1 github.com/cloudquery/plugin-sdk/v4 v4.89.1 github.com/rs/zerolog v1.34.0 ) @@ -48,7 +48,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/invopop/jsonschema v0.13.0 // indirect github.com/klauspost/compress v1.18.0 // indirect - github.com/klauspost/cpuid/v2 v2.2.11 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -59,7 +59,7 @@ require ( github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 // indirect github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect - github.com/stretchr/testify v1.11.0 // indirect + github.com/stretchr/testify v1.11.1 // indirect github.com/thoas/go-funk v0.9.3 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect diff --git a/examples/simple_plugin/go.sum b/examples/simple_plugin/go.sum index 5adf1c0022..a587b8b74f 100644 --- a/examples/simple_plugin/go.sum +++ b/examples/simple_plugin/go.sum @@ -3,8 +3,8 @@ github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78= github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= -github.com/apache/arrow-go/v18 v18.4.0 h1:/RvkGqH517iY8bZKc4FD5/kkdwXJGjxf28JIXbJ/oB0= -github.com/apache/arrow-go/v18 v18.4.0/go.mod h1:Aawvwhj8x2jURIzD9Moy72cF0FyJXOpkYpdmGRHcw14= +github.com/apache/arrow-go/v18 v18.4.1 h1:q/jVkBWCJOB9reDgaIZIdruLQUb1kbkvOnOFezVH1C4= +github.com/apache/arrow-go/v18 v18.4.1/go.mod h1:tLyFubsAl17bvFdUAy24bsSvA/6ww95Iqi67fTpGu3E= github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 h1:mFDZW1FQk9yndPvxScp7RpcOpdSHaqcgBWO7sDlx4S8= github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= @@ -109,8 +109,8 @@ github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= -github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -155,8 +155,8 @@ github.com/stoewer/go-strcase v1.3.1/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8w github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.11.0 h1:ib4sjIrwZKxE5u/Japgo/7SJV3PvgjGiRNAvTVGqQl8= -github.com/stretchr/testify v1.11.0/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= diff --git a/examples/simple_plugin/plugin/client.go b/examples/simple_plugin/plugin/client.go index 580738df7e..2806ef1f85 100644 --- a/examples/simple_plugin/plugin/client.go +++ b/examples/simple_plugin/plugin/client.go @@ -59,12 +59,12 @@ func (*Client) Write(context.Context, <-chan message.WriteMessage) error { return nil } -func (*Client) Read(context.Context, *schema.Table, chan<- arrow.Record) error { +func (*Client) Read(context.Context, *schema.Table, chan<- arrow.RecordBatch) error { // Not implemented, just used for testing destination packaging return nil } -func (*Client) Transform(_ context.Context, _ <-chan arrow.Record, _ chan<- arrow.Record) error { +func (*Client) Transform(_ context.Context, _ <-chan arrow.RecordBatch, _ chan<- arrow.RecordBatch) error { // Not implemented, just used for testing destination packaging return nil } diff --git a/go.mod b/go.mod index 56ab3a7f24..0321b02182 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/cloudquery/plugin-sdk/v4 go 1.24.4 require ( - github.com/apache/arrow-go/v18 v18.4.0 + github.com/apache/arrow-go/v18 v18.4.1 github.com/aws/aws-sdk-go-v2 v1.38.3 github.com/aws/aws-sdk-go-v2/config v1.31.6 github.com/aws/aws-sdk-go-v2/service/licensemanager v1.36.2 @@ -25,7 +25,7 @@ require ( github.com/samber/lo v1.51.0 github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 github.com/spf13/cobra v1.9.1 - github.com/stretchr/testify v1.11.0 + github.com/stretchr/testify v1.11.1 github.com/thoas/go-funk v0.9.3 go.opentelemetry.io/otel v1.37.0 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.13.0 @@ -73,7 +73,7 @@ require ( github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/klauspost/compress v1.18.0 // indirect - github.com/klauspost/cpuid/v2 v2.2.11 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -98,3 +98,5 @@ require ( ) replace github.com/invopop/jsonschema => github.com/cloudquery/jsonschema v0.0.0-20240220124159-92878faa2a66 + +replace github.com/apache/arrow-go/v18 => github.com/cloudquery/arrow-go/v18 v18.0.0-20250917180258-5bc4c36376be diff --git a/go.sum b/go.sum index ed425e6c1a..dfa837f61d 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,6 @@ github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78= github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= -github.com/apache/arrow-go/v18 v18.4.0 h1:/RvkGqH517iY8bZKc4FD5/kkdwXJGjxf28JIXbJ/oB0= -github.com/apache/arrow-go/v18 v18.4.0/go.mod h1:Aawvwhj8x2jURIzD9Moy72cF0FyJXOpkYpdmGRHcw14= github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 h1:mFDZW1FQk9yndPvxScp7RpcOpdSHaqcgBWO7sDlx4S8= github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= @@ -50,6 +48,8 @@ github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMU github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cloudquery/arrow-go/v18 v18.0.0-20250917180258-5bc4c36376be h1:yMdIO5OoT61+kLVGiBZZd79w2iEdLKvF6FZHITc1CbM= +github.com/cloudquery/arrow-go/v18 v18.0.0-20250917180258-5bc4c36376be/go.mod h1:NblXIdCxvVSrAbxnZuTcrgBI7RrkEsXgETkX9SIQJTA= github.com/cloudquery/cloudquery-api-go v1.14.3 h1:f6nR5PsxGl932BMDzsjK6rXpHQbkQ7xL8DW2yBSUKn0= github.com/cloudquery/cloudquery-api-go v1.14.3/go.mod h1:KMcMIaX4l3C2QGHzlqeV7Ac9thX7L/sWXMM5wEmcKZI= github.com/cloudquery/codegen v0.3.31 h1:YDqokUyWSECewoaISY4D2iIpFRTDnPtWmQOFgaQ60c0= @@ -110,8 +110,8 @@ github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= -github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -165,8 +165,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.11.0 h1:ib4sjIrwZKxE5u/Japgo/7SJV3PvgjGiRNAvTVGqQl8= -github.com/stretchr/testify v1.11.0/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= diff --git a/internal/batch/slice.go b/internal/batch/slice.go index 4a023fe593..52b8381d98 100644 --- a/internal/batch/slice.go +++ b/internal/batch/slice.go @@ -7,13 +7,13 @@ import ( type ( SlicedRecord struct { - arrow.Record + arrow.RecordBatch Bytes int64 // we need this as the util.TotalRecordSize will report the full size even for the sliced record bytesPerRow int64 } ) -func (s *SlicedRecord) split(limit *Cap) (add *SlicedRecord, toFlush []arrow.Record, rest *SlicedRecord) { +func (s *SlicedRecord) split(limit *Cap) (add *SlicedRecord, toFlush []arrow.RecordBatch, rest *SlicedRecord) { if s == nil { return nil, nil, nil } @@ -23,13 +23,13 @@ func (s *SlicedRecord) split(limit *Cap) (add *SlicedRecord, toFlush []arrow.Rec limit.add(add.Bytes, add.NumRows()) } - if s.Record == nil { + if s.RecordBatch == nil { // all processed return add, nil, nil } toFlush = s.getToFlush(limit) - if s.Record == nil { + if s.RecordBatch == nil { // all processed return add, toFlush, nil } @@ -56,21 +56,21 @@ func (s *SlicedRecord) getAdd(limit *Cap) *SlicedRecord { // grab the whole record (either no limits or not overflowing) res := *s s.Bytes = 0 - s.Record = nil + s.RecordBatch = nil return &res } res := SlicedRecord{ - Record: s.NewSlice(0, rows), + RecordBatch: s.NewSlice(0, rows), Bytes: rows * s.bytesPerRow, bytesPerRow: s.bytesPerRow, } - s.Record = s.NewSlice(rows, s.NumRows()) + s.RecordBatch = s.NewSlice(rows, s.NumRows()) s.Bytes -= res.Bytes return &res } -func (s *SlicedRecord) getToFlush(limit *Cap) []arrow.Record { +func (s *SlicedRecord) getToFlush(limit *Cap) []arrow.RecordBatch { rowsByBytes := limit.bytes.capPerN(s.bytesPerRow) rows := limit.rows.cap() switch { @@ -93,7 +93,7 @@ func (s *SlicedRecord) getToFlush(limit *Cap) []arrow.Record { return nil } - flush := make([]arrow.Record, 0, s.NumRows()/rows) + flush := make([]arrow.RecordBatch, 0, s.NumRows()/rows) offset := int64(0) for offset+rows <= s.NumRows() { flush = append(flush, s.NewSlice(offset, offset+rows)) @@ -101,33 +101,33 @@ func (s *SlicedRecord) getToFlush(limit *Cap) []arrow.Record { } if offset == s.NumRows() { // we processed everything for flush - s.Record = nil + s.RecordBatch = nil s.Bytes = 0 return flush } // set record to the remainder - s.Record = s.NewSlice(offset, s.NumRows()) + s.RecordBatch = s.NewSlice(offset, s.NumRows()) s.Bytes = s.NumRows() * s.bytesPerRow return flush } -func (s *SlicedRecord) slice() []arrow.Record { - res := make([]arrow.Record, s.NumRows()) +func (s *SlicedRecord) slice() []arrow.RecordBatch { + res := make([]arrow.RecordBatch, s.NumRows()) for i := int64(0); i < s.NumRows(); i++ { res[i] = s.NewSlice(i, i+1) } return res } -func newSlicedRecord(r arrow.Record) *SlicedRecord { +func newSlicedRecord(r arrow.RecordBatch) *SlicedRecord { if r.NumRows() == 0 { return nil } res := SlicedRecord{ - Record: r, - Bytes: util.TotalRecordSize(r), + RecordBatch: r, + Bytes: util.TotalRecordSize(r), } res.bytesPerRow = res.Bytes / r.NumRows() return &res @@ -136,10 +136,10 @@ func newSlicedRecord(r arrow.Record) *SlicedRecord { // SliceRecord will return the SlicedRecord you can add to the batch given the restrictions provided (if any). // The meaning of the returned values: // - `add` is good to be added to the current batch that the caller is assembling -// - `flush` represents sliced arrow.Record that needs own batch to be flushed +// - `flush` represents sliced arrow.RecordBatch that needs own batch to be flushed // - `remaining` represents the overflow of the batch after `add` & `flush` are processed // Note that the `limit` provided will not be updated. -func SliceRecord(r arrow.Record, limit *Cap) (add *SlicedRecord, flush []arrow.Record, remaining *SlicedRecord) { +func SliceRecord(r arrow.RecordBatch, limit *Cap) (add *SlicedRecord, flush []arrow.RecordBatch, remaining *SlicedRecord) { l := *limit // copy value return newSlicedRecord(r).split(&l) } diff --git a/internal/clients/state/v3/state.go b/internal/clients/state/v3/state.go index 42ab5e52b1..cdd9aae524 100644 --- a/internal/clients/state/v3/state.go +++ b/internal/clients/state/v3/state.go @@ -163,7 +163,7 @@ func (c *Client) Flush(ctx context.Context) error { version.Append(val.version) } } - rec := bldr.NewRecord() + rec := bldr.NewRecordBatch() recordBytes, err := pb.RecordToBytes(rec) if err != nil { return err diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 6ff389bf8d..ef2eb20231 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -16,7 +16,7 @@ import ( // client is mostly used for testing the destination plugin. type client struct { - memoryDB map[string][]arrow.Record + memoryDB map[string][]arrow.RecordBatch tables map[string]*schema.Table memoryDBLock sync.RWMutex errOnWrite bool @@ -42,7 +42,7 @@ func WithBlockingWrite() Option { func GetNewClient(options ...Option) plugin.NewClientFunc { c := &client{ - memoryDB: make(map[string][]arrow.Record), + memoryDB: make(map[string][]arrow.RecordBatch), memoryDBLock: sync.RWMutex{}, tables: map[string]*schema.Table{ "table1": { @@ -112,13 +112,13 @@ func NewMemDBClientErrOnNew(context.Context, zerolog.Logger, []byte, plugin.NewC return nil, errors.New("newTestDestinationMemDBClientErrOnNew") } -func (c *client) overwrite(table *schema.Table, record arrow.Record) { +func (c *client) overwrite(table *schema.Table, record arrow.RecordBatch) { for i := int64(0); i < record.NumRows(); i++ { c.overwriteRow(table, record.NewSlice(i, i+1)) } } -func (c *client) overwriteRow(table *schema.Table, data arrow.Record) { +func (c *client) overwriteRow(table *schema.Table, data arrow.RecordBatch) { tableName := table.Name pksIndex := table.PrimaryKeysIndexes() if len(pksIndex) == 0 { @@ -152,7 +152,7 @@ func (*client) GetSpec() any { return &Spec{} } -func (c *client) Read(_ context.Context, table *schema.Table, res chan<- arrow.Record) error { +func (c *client) Read(_ context.Context, table *schema.Table, res chan<- arrow.RecordBatch) error { c.memoryDBLock.RLock() defer c.memoryDBLock.RUnlock() @@ -196,7 +196,7 @@ func (c *client) migrate(_ context.Context, table *schema.Table) { tableName := table.Name memTable := c.memoryDB[tableName] if memTable == nil { - c.memoryDB[tableName] = make([]arrow.Record, 0) + c.memoryDB[tableName] = make([]arrow.RecordBatch, 0) c.tables[tableName] = table return } @@ -206,7 +206,7 @@ func (c *client) migrate(_ context.Context, table *schema.Table) { if changes == nil { return } - c.memoryDB[tableName] = make([]arrow.Record, 0) + c.memoryDB[tableName] = make([]arrow.RecordBatch, 0) c.tables[tableName] = table } @@ -253,7 +253,7 @@ func (c *client) Close(context.Context) error { } func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) { - var filteredTable []arrow.Record + var filteredTable []arrow.RecordBatch tableName := msg.TableName for i, row := range c.memoryDB[tableName] { sc := row.Schema() @@ -280,7 +280,7 @@ func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) { } func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) { - var filteredTable []arrow.Record + var filteredTable []arrow.RecordBatch tableName := msg.TableName for i, row := range c.memoryDB[tableName] { isMatch := true @@ -308,7 +308,7 @@ func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) c.memoryDB[tableName] = filteredTable } -func (*client) Transform(_ context.Context, _ <-chan arrow.Record, _ chan<- arrow.Record) error { +func (*client) Transform(_ context.Context, _ <-chan arrow.RecordBatch, _ chan<- arrow.RecordBatch) error { return nil } @@ -316,7 +316,7 @@ func (*client) TransformSchema(_ context.Context, _ *arrow.Schema) (*arrow.Schem return nil, nil } -func evaluatePredicate(pred message.Predicate, record arrow.Record) bool { +func evaluatePredicate(pred message.Predicate, record arrow.RecordBatch) bool { sc := record.Schema() indices := sc.FieldIndices(pred.Column) if len(indices) == 0 { diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index d17ecac73e..416de7629c 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -62,7 +62,7 @@ func TestOnWriteError(t *testing.T) { // sourceSpec := pbPlugin.Spec{ // Name: sourceName, // } -// ch := make(chan arrow.Record, 1) +// ch := make(chan arrow.RecordBatch, 1) // ctx, cancel := context.WithTimeout(ctx, 2*time.Second) // opts := schema.GenTestDataOptions{ // SourceName: "test", diff --git a/internal/pk/pk.go b/internal/pk/pk.go index 727e630b7a..c5894c8109 100644 --- a/internal/pk/pk.go +++ b/internal/pk/pk.go @@ -7,7 +7,7 @@ import ( "github.com/cloudquery/plugin-sdk/v4/schema" ) -func String(resource arrow.Record) string { +func String(resource arrow.RecordBatch) string { sc := resource.Schema() table, err := schema.NewTableFromArrowSchema(sc) if err != nil { diff --git a/internal/reversertransformer/reversertransformer.go b/internal/reversertransformer/reversertransformer.go index e7183e896a..c3c2605833 100644 --- a/internal/reversertransformer/reversertransformer.go +++ b/internal/reversertransformer/reversertransformer.go @@ -40,7 +40,7 @@ func (*client) Close(context.Context) error { return nil } -func (c *client) Transform(ctx context.Context, recvRecords <-chan arrow.Record, sendRecords chan<- arrow.Record) error { +func (c *client) Transform(ctx context.Context, recvRecords <-chan arrow.RecordBatch, sendRecords chan<- arrow.RecordBatch) error { for { select { case record, ok := <-recvRecords: @@ -62,7 +62,7 @@ func (*client) TransformSchema(_ context.Context, old *arrow.Schema) (*arrow.Sch return old, nil } -func (*client) reverseStrings(record arrow.Record) (arrow.Record, error) { +func (*client) reverseStrings(record arrow.RecordBatch) (arrow.RecordBatch, error) { for i, column := range record.Columns() { if column.DataType().ID() != arrow.STRING { continue diff --git a/internal/reversertransformer/reversertransformer_test.go b/internal/reversertransformer/reversertransformer_test.go index 5cff4bb432..74a69b054e 100644 --- a/internal/reversertransformer/reversertransformer_test.go +++ b/internal/reversertransformer/reversertransformer_test.go @@ -55,13 +55,13 @@ func makeRequestFromString(s string) *pb.Transform_Request { return &pb.Transform_Request{Record: bs} } -func makeRecordFromString(s string) arrow.Record { +func makeRecordFromString(s string) arrow.RecordBatch { str := array.NewStringBuilder(memory.DefaultAllocator) str.AppendString(s) arr := str.NewStringArray() schema := arrow.NewSchema([]arrow.Field{{Name: "col1", Type: arrow.BinaryTypes.String}}, nil) - return array.NewRecord(schema, []arrow.Array{arr}, 1) + return array.NewRecordBatch(schema, []arrow.Array{arr}, 1) } type mockTransformServer struct { diff --git a/internal/servers/destination/v0/schemav2tov3.go b/internal/servers/destination/v0/schemav2tov3.go index c4770eb95c..a9da00060f 100644 --- a/internal/servers/destination/v0/schemav2tov3.go +++ b/internal/servers/destination/v0/schemav2tov3.go @@ -96,11 +96,11 @@ func TypeV2ToV3(dataType schemav2.ValueType) arrow.DataType { } } -func CQTypesOneToRecord(mem memory.Allocator, c schemav2.CQTypes, arrowSchema *arrow.Schema) arrow.Record { +func CQTypesOneToRecord(mem memory.Allocator, c schemav2.CQTypes, arrowSchema *arrow.Schema) arrow.RecordBatch { return CQTypesToRecord(mem, []schemav2.CQTypes{c}, arrowSchema) } -func CQTypesToRecord(mem memory.Allocator, c []schemav2.CQTypes, arrowSchema *arrow.Schema) arrow.Record { +func CQTypesToRecord(mem memory.Allocator, c []schemav2.CQTypes, arrowSchema *arrow.Schema) arrow.RecordBatch { bldr := array.NewRecordBuilder(mem, arrowSchema) fields := bldr.Fields() for i := range fields { @@ -242,5 +242,5 @@ func CQTypesToRecord(mem memory.Allocator, c []schemav2.CQTypes, arrowSchema *ar } } - return bldr.NewRecord() + return bldr.NewRecordBatch() } diff --git a/internal/servers/destination/v1/destination_test.go b/internal/servers/destination/v1/destination_test.go index a1575b9e21..909c97a504 100644 --- a/internal/servers/destination/v1/destination_test.go +++ b/internal/servers/destination/v1/destination_test.go @@ -121,7 +121,7 @@ func TestPluginSync(t *testing.T) { sc := table.ToArrowSchema() bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) bldr.Field(0).(*array.StringBuilder).Append("test") - record := bldr.NewRecord() + record := bldr.NewRecordBatch() recordBytes, err := pbSource.RecordToBytes(record) if err != nil { t.Fatal(err) diff --git a/internal/servers/destination/v1/destinations.go b/internal/servers/destination/v1/destinations.go index 2f6928c55c..3aab82276d 100644 --- a/internal/servers/destination/v1/destinations.go +++ b/internal/servers/destination/v1/destinations.go @@ -162,7 +162,7 @@ func (s *Server) Write(msg pb.Destination_WriteServer) error { return status.Errorf(codes.InvalidArgument, "failed to create reader: %v", err) } for rdr.Next() { - rec := rdr.Record() + rec := rdr.RecordBatch() rec.Retain() msg := &message.WriteInsert{ Record: rec, diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index d2d968881c..df5c42dbd5 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -114,7 +114,7 @@ func (s *Server) Init(ctx context.Context, req *pb.Init_Request) (*pb.Init_Respo } func (s *Server) Read(req *pb.Read_Request, stream pb.Plugin_ReadServer) error { - records := make(chan arrow.Record) + records := make(chan arrow.RecordBatch) var readErr error ctx := stream.Context() @@ -412,8 +412,8 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error { func (s *Server) Transform(stream pb.Plugin_TransformServer) error { var ( - recvRecords = make(chan arrow.Record) - sendRecords = make(chan arrow.Record) + recvRecords = make(chan arrow.RecordBatch) + sendRecords = make(chan arrow.RecordBatch) ctx = stream.Context() eg, gctx = errgroup.WithContext(ctx) ) diff --git a/internal/servers/plugin/v3/plugin_test.go b/internal/servers/plugin/v3/plugin_test.go index 5ad3c47e13..f5daf4a817 100644 --- a/internal/servers/plugin/v3/plugin_test.go +++ b/internal/servers/plugin/v3/plugin_test.go @@ -148,7 +148,7 @@ func TestPluginSync(t *testing.T) { } bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) bldr.Field(0).(*array.StringBuilder).Append("test") - record := bldr.NewRecord() + record := bldr.NewRecordBatch() recordBytes, err := pb.RecordToBytes(record) if err != nil { t.Fatal(err) @@ -245,7 +245,7 @@ func getColumnAdderPlugin(...plugin.Option) plugin.NewClientFunc { } } -func (*mockSourceColumnAdderPluginClient) Transform(context.Context, <-chan arrow.Record, chan<- arrow.Record) error { +func (*mockSourceColumnAdderPluginClient) Transform(context.Context, <-chan arrow.RecordBatch, chan<- arrow.RecordBatch) error { return nil } func (*mockSourceColumnAdderPluginClient) TransformSchema(_ context.Context, old *arrow.Schema) (*arrow.Schema, error) { @@ -259,7 +259,7 @@ type testTransformPluginClient struct { recordsSent int32 } -func (c *testTransformPluginClient) Transform(ctx context.Context, recvRecords <-chan arrow.Record, sendRecords chan<- arrow.Record) error { +func (c *testTransformPluginClient) Transform(ctx context.Context, recvRecords <-chan arrow.RecordBatch, sendRecords chan<- arrow.RecordBatch) error { for record := range recvRecords { select { default: @@ -384,11 +384,11 @@ func makeRequestFromString(s string) *pb.Transform_Request { return &pb.Transform_Request{Record: bs} } -func makeRecordFromString(s string) arrow.Record { +func makeRecordFromString(s string) arrow.RecordBatch { str := array.NewStringBuilder(memory.DefaultAllocator) str.AppendString(s) arr := str.NewStringArray() sch := arrow.NewSchema([]arrow.Field{{Name: "col1", Type: arrow.BinaryTypes.String}}, nil) - return array.NewRecord(sch, []arrow.Array{arr}, 1) + return array.NewRecordBatch(sch, []arrow.Array{arr}, 1) } diff --git a/message/sync_message.go b/message/sync_message.go index 912cded1d5..c92e449abb 100644 --- a/message/sync_message.go +++ b/message/sync_message.go @@ -30,7 +30,7 @@ func (m SyncMigrateTable) GetTable() *schema.Table { type SyncInsert struct { syncBaseMessage - Record arrow.Record + Record arrow.RecordBatch } func (m *SyncInsert) GetTable() *schema.Table { @@ -90,8 +90,8 @@ func (m SyncInserts) Exists(tableName string) bool { return false } -func (m SyncInserts) GetRecords() []arrow.Record { - res := make([]arrow.Record, len(m)) +func (m SyncInserts) GetRecords() []arrow.RecordBatch { + res := make([]arrow.RecordBatch, len(m)) for i := range m { res[i] = m[i].Record } @@ -99,8 +99,8 @@ func (m SyncInserts) GetRecords() []arrow.Record { } // Get all records for a single table -func (m SyncInserts) GetRecordsForTable(table *schema.Table) []arrow.Record { - res := make([]arrow.Record, 0, len(m)) +func (m SyncInserts) GetRecordsForTable(table *schema.Table) []arrow.RecordBatch { + res := make([]arrow.RecordBatch, 0, len(m)) for _, insert := range m { md := insert.Record.Schema().Metadata() tableNameMeta, ok := md.GetValue(schema.MetadataTableName) diff --git a/message/write_message.go b/message/write_message.go index f586e0b1cc..7ba369d4aa 100644 --- a/message/write_message.go +++ b/message/write_message.go @@ -66,7 +66,7 @@ func (m WriteMigrateTables) GetMessageByTable(tableName string) *WriteMigrateTab type WriteInsert struct { writeBaseMessage - Record arrow.Record + Record arrow.RecordBatch } func (m *WriteInsert) GetTable() *schema.Table { @@ -86,16 +86,16 @@ func (m WriteInserts) Exists(tableName string) bool { }) } -func (m WriteInserts) GetRecords() []arrow.Record { - res := make([]arrow.Record, len(m)) +func (m WriteInserts) GetRecords() []arrow.RecordBatch { + res := make([]arrow.RecordBatch, len(m)) for i := range m { res[i] = m[i].Record } return res } -func (m WriteInserts) GetRecordsForTable(table *schema.Table) []arrow.Record { - res := make([]arrow.Record, 0, len(m)) +func (m WriteInserts) GetRecordsForTable(table *schema.Table) []arrow.RecordBatch { + res := make([]arrow.RecordBatch, 0, len(m)) for _, insert := range m { tableNameMeta, ok := insert.Record.Schema().Metadata().GetValue(schema.MetadataTableName) if !ok || tableNameMeta != table.Name { @@ -139,7 +139,7 @@ type TableRelations []TableRelation type Predicate struct { Operator string Column string - Record arrow.Record + Record arrow.RecordBatch } type Predicates []Predicate diff --git a/plugin/diff.go b/plugin/diff.go index deff697945..cbcad2443a 100644 --- a/plugin/diff.go +++ b/plugin/diff.go @@ -9,7 +9,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/memory" ) -func RecordsDiff(sc *arrow.Schema, have, want []arrow.Record) string { +func RecordsDiff(sc *arrow.Schema, have, want []arrow.RecordBatch) string { return TableDiff(array.NewTableFromRecords(sc, have), array.NewTableFromRecords(sc, want)) } diff --git a/plugin/nulls.go b/plugin/nulls.go index 4e99484f63..961a971037 100644 --- a/plugin/nulls.go +++ b/plugin/nulls.go @@ -91,13 +91,13 @@ func (s *WriterTestSuite) replaceNullsByEmptyNestedArray(arr arrow.Array) arrow. } } -func (s *WriterTestSuite) handleNulls(record arrow.Record) arrow.Record { +func (s *WriterTestSuite) handleNulls(record arrow.RecordBatch) arrow.RecordBatch { cols := record.Columns() newCols := make([]arrow.Array, len(cols)) for c, col := range cols { newCols[c] = s.handleNullsArray(col) } - return array.NewRecord(record.Schema(), newCols, record.NumRows()) + return array.NewRecordBatch(record.Schema(), newCols, record.NumRows()) } func (s *WriterTestSuite) handleNullsArray(arr arrow.Array) arrow.Array { diff --git a/plugin/plugin.go b/plugin/plugin.go index 9a482041ea..3871a830c4 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -38,7 +38,7 @@ func (UnimplementedDestination) Write(context.Context, <-chan message.WriteMessa return ErrNotImplemented } -func (UnimplementedDestination) Read(context.Context, *schema.Table, chan<- arrow.Record) error { +func (UnimplementedDestination) Read(context.Context, *schema.Table, chan<- arrow.RecordBatch) error { return ErrNotImplemented } @@ -56,7 +56,7 @@ func (UnimplementedSource) Tables(context.Context, TableOptions) (schema.Tables, type UnimplementedTransformer struct{} -func (UnimplementedTransformer) Transform(context.Context, <-chan arrow.Record, chan<- arrow.Record) error { +func (UnimplementedTransformer) Transform(context.Context, <-chan arrow.RecordBatch, chan<- arrow.RecordBatch) error { return ErrNotImplemented } func (UnimplementedTransformer) TransformSchema(context.Context, *arrow.Schema) (*arrow.Schema, error) { diff --git a/plugin/plugin_destination.go b/plugin/plugin_destination.go index a75c719d9e..e92714d7d4 100644 --- a/plugin/plugin_destination.go +++ b/plugin/plugin_destination.go @@ -12,7 +12,7 @@ import ( type DestinationClient interface { Close(ctx context.Context) error - Read(ctx context.Context, table *schema.Table, res chan<- arrow.Record) error + Read(ctx context.Context, table *schema.Table, res chan<- arrow.RecordBatch) error Write(ctx context.Context, res <-chan message.WriteMessage) error } @@ -40,7 +40,7 @@ func (p *Plugin) Write(ctx context.Context, res <-chan message.WriteMessage) err } // Read is read data from the requested table to the given channel, returned in the same format as the table -func (p *Plugin) Read(ctx context.Context, table *schema.Table, res chan<- arrow.Record) error { +func (p *Plugin) Read(ctx context.Context, table *schema.Table, res chan<- arrow.RecordBatch) error { if !p.mu.TryLock() { return errors.New("plugin already in use") } diff --git a/plugin/plugin_read.go b/plugin/plugin_read.go index 4bbb27b2c4..fe0faeb787 100644 --- a/plugin/plugin_read.go +++ b/plugin/plugin_read.go @@ -8,15 +8,15 @@ import ( ) // readAll is used in tests to read all records from a table. -func (p *Plugin) readAll(ctx context.Context, table *schema.Table) ([]arrow.Record, error) { +func (p *Plugin) readAll(ctx context.Context, table *schema.Table) ([]arrow.RecordBatch, error) { var err error - ch := make(chan arrow.Record) + ch := make(chan arrow.RecordBatch) go func() { defer close(ch) err = p.client.Read(ctx, table, ch) }() // nolint:prealloc - var records []arrow.Record + var records []arrow.RecordBatch for record := range ch { records = append(records, sliceToSingleRowRecord(record)...) } @@ -24,8 +24,8 @@ func (p *Plugin) readAll(ctx context.Context, table *schema.Table) ([]arrow.Reco return records, err } -func sliceToSingleRowRecord(record arrow.Record) []arrow.Record { - result := make([]arrow.Record, record.NumRows()) +func sliceToSingleRowRecord(record arrow.RecordBatch) []arrow.RecordBatch { + result := make([]arrow.RecordBatch, record.NumRows()) for i := int64(0); i < record.NumRows(); i++ { result[i] = record.NewSlice(i, i+1) } diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go index 7968b9a825..55a3bdd8bb 100644 --- a/plugin/plugin_test.go +++ b/plugin/plugin_test.go @@ -26,7 +26,7 @@ func (*testPluginClient) Tables(context.Context, TableOptions) (schema.Tables, e return schema.Tables{}, nil } -func (*testPluginClient) Read(context.Context, *schema.Table, chan<- arrow.Record) error { +func (*testPluginClient) Read(context.Context, *schema.Table, chan<- arrow.RecordBatch) error { return nil } @@ -56,7 +56,7 @@ func (c *testPluginClient) Write(_ context.Context, res <-chan message.WriteMess func (*testPluginClient) Close(context.Context) error { return nil } -func (*testPluginClient) Transform(context.Context, <-chan arrow.Record, chan<- arrow.Record) error { +func (*testPluginClient) Transform(context.Context, <-chan arrow.RecordBatch, chan<- arrow.RecordBatch) error { return nil } func (*testPluginClient) TransformSchema(context.Context, *arrow.Schema) (*arrow.Schema, error) { diff --git a/plugin/plugin_transformer.go b/plugin/plugin_transformer.go index fbe8f19aed..27d3420e2f 100644 --- a/plugin/plugin_transformer.go +++ b/plugin/plugin_transformer.go @@ -7,11 +7,11 @@ import ( ) type TransformerClient interface { - Transform(context.Context, <-chan arrow.Record, chan<- arrow.Record) error + Transform(context.Context, <-chan arrow.RecordBatch, chan<- arrow.RecordBatch) error TransformSchema(context.Context, *arrow.Schema) (*arrow.Schema, error) } -func (p *Plugin) Transform(ctx context.Context, recvRecords <-chan arrow.Record, sendRecords chan<- arrow.Record) error { +func (p *Plugin) Transform(ctx context.Context, recvRecords <-chan arrow.RecordBatch, sendRecords chan<- arrow.RecordBatch) error { err := p.client.Transform(ctx, recvRecords, sendRecords) close(sendRecords) return err diff --git a/plugin/sort.go b/plugin/sort.go index 0d886e6f43..cf59bfdb57 100644 --- a/plugin/sort.go +++ b/plugin/sort.go @@ -12,7 +12,7 @@ import ( // Because "id" is auto-incrementing in the test data generator, if passed "id" // this should result in records being returned in insertion order. // nolint:unparam -func sortRecords(table *schema.Table, records []arrow.Record, columnName string) { +func sortRecords(table *schema.Table, records []arrow.RecordBatch, columnName string) { sch := table.ToArrowSchema() if !sch.HasField(columnName) { panic("table has no '" + columnName + "' column to sort on") diff --git a/plugin/testing_write_delete.go b/plugin/testing_write_delete.go index 483e1a4579..e3a62d0e7e 100644 --- a/plugin/testing_write_delete.go +++ b/plugin/testing_write_delete.go @@ -33,7 +33,7 @@ func (s *WriterTestSuite) testDeleteStaleBasic(ctx context.Context, t *testing.T bldr.Field(0).(*array.Int64Builder).Append(0) bldr.Field(1).(*array.StringBuilder).Append(sourceName) bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime) - record1 := bldr.NewRecord() + record1 := bldr.NewRecordBatch() r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record") record1 = s.handleNulls(record1) // we process nulls after writing @@ -51,13 +51,13 @@ func (s *WriterTestSuite) testDeleteStaleBasic(ctx context.Context, t *testing.T records, err = s.plugin.readAll(ctx, table) r.NoErrorf(err, "failed to read after delete stale") r.EqualValuesf(1, TotalRows(records), "unexpected amount of items after delete stale") - r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete stale") + r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.RecordBatch{record1}), "record differs after delete stale") syncTime = syncTime.Add(time.Second) bldr.Field(0).(*array.Int64Builder).Append(1) bldr.Field(1).(*array.StringBuilder).Append(sourceName) bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime) - record2 := bldr.NewRecord() + record2 := bldr.NewRecordBatch() r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record") record2 = s.handleNulls(record2) // we process nulls after writing @@ -66,7 +66,7 @@ func (s *WriterTestSuite) testDeleteStaleBasic(ctx context.Context, t *testing.T r.NoErrorf(err, "failed to read second time") sortRecords(table, records, "id") r.EqualValuesf(2, TotalRows(records), "unexpected amount of items second time") - r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1, record2}), "record differs after delete stale") + r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.RecordBatch{record1, record2}), "record differs after delete stale") r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteStale{ TableName: table.Name, @@ -77,7 +77,7 @@ func (s *WriterTestSuite) testDeleteStaleBasic(ctx context.Context, t *testing.T records, err = s.plugin.readAll(ctx, table) r.NoErrorf(err, "failed to read after second delete stale") r.EqualValuesf(1, TotalRows(records), "unexpected amount of items after second delete stale") - r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record2}), "record differs after second delete stale") + r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.RecordBatch{record2}), "record differs after second delete stale") } func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context, t *testing.T) { @@ -134,7 +134,7 @@ func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context, t *testing.T) r.NoErrorf(err, "failed to read second time") sortRecords(table, readRecords, "id") r.EqualValuesf(2*rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after second read") - r.Emptyf(RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{normalRecord, nullRecord}), "record differs") + r.Emptyf(RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.RecordBatch{normalRecord, nullRecord}), "record differs") r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteStale{ TableName: table.Name, @@ -147,7 +147,7 @@ func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context, t *testing.T) sortRecords(table, readRecords, "id") r.EqualValuesf(rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after second delete stale") - r.Emptyf(RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{nullRecord}), "record differs") + r.Emptyf(RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.RecordBatch{nullRecord}), "record differs") } func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context, t *testing.T) { @@ -170,7 +170,7 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context, t *testing. bldr.Field(0).(*array.Int64Builder).Append(0) bldr.Field(1).(*array.StringBuilder).Append(sourceName) bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime) - record1 := bldr.NewRecord() + record1 := bldr.NewRecordBatch() r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record") record1 = s.handleNulls(record1) // we process nulls after writing @@ -187,7 +187,7 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context, t *testing. }, }).ToArrowSchema()) bldrDeleteNoMatch.Field(0).(*array.Int64Builder).Append(1) - deleteValue := bldrDeleteNoMatch.NewRecord() + deleteValue := bldrDeleteNoMatch.NewRecordBatch() r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ DeleteRecord: message.DeleteRecord{ @@ -210,7 +210,7 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context, t *testing. records, err = s.plugin.readAll(ctx, table) r.NoErrorf(err, "failed to read after delete with no match") r.EqualValuesf(1, TotalRows(records), "unexpected amount of items after delete with no match") - r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete with no match") + r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.RecordBatch{record1}), "record differs after delete with no match") // create value for delete statement will be delete One record bldrDeleteMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{ @@ -220,7 +220,7 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context, t *testing. }, }).ToArrowSchema()) bldrDeleteMatch.Field(0).(*array.Int64Builder).Append(0) - deleteValue = bldrDeleteMatch.NewRecord() + deleteValue = bldrDeleteMatch.NewRecordBatch() r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ DeleteRecord: message.DeleteRecord{ @@ -265,7 +265,7 @@ func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context, t *testing.T bldr.Field(0).(*array.Int64Builder).Append(0) bldr.Field(1).(*array.StringBuilder).Append(sourceName) bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime) - record1 := bldr.NewRecord() + record1 := bldr.NewRecordBatch() r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record") @@ -286,7 +286,7 @@ func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context, t *testing.T bldr.Field(0).(*array.Int64Builder).Append(1) bldr.Field(1).(*array.StringBuilder).Append(sourceName) bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime.Add(time.Second)) - record2 := bldr.NewRecord() + record2 := bldr.NewRecordBatch() r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record") diff --git a/plugin/testing_write_insert.go b/plugin/testing_write_insert.go index 40d72c3d82..b078b9b40e 100644 --- a/plugin/testing_write_insert.go +++ b/plugin/testing_write_insert.go @@ -11,7 +11,7 @@ import ( "github.com/cloudquery/plugin-sdk/v4/schema" ) -func TotalRows(records []arrow.Record) int64 { +func TotalRows(records []arrow.RecordBatch) int64 { totalRows := int64(0) for _, record := range records { totalRows += record.NumRows() @@ -37,7 +37,7 @@ func (s *WriterTestSuite) testInsertBasic(ctx context.Context) error { bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) bldr.Field(0).(*array.Int64Builder).Append(1) bldr.Field(1).(*array.StringBuilder).Append("foo") - record := bldr.NewRecord() + record := bldr.NewRecordBatch() if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: record, @@ -73,7 +73,7 @@ func (s *WriterTestSuite) testInsertBasic(ctx context.Context) error { return fmt.Errorf("expected 2 items, got %d", totalItems) } - if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{record, record}); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.RecordBatch{record, record}); diff != "" { return fmt.Errorf("record[0] differs: %s", diff) } @@ -134,7 +134,7 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error { if totalItems != 2*rowsPerRecord { return fmt.Errorf("items expected after second insert: %d, got: %d", 2*rowsPerRecord, totalItems) } - if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{normalRecord, nullRecord}); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.RecordBatch{normalRecord, nullRecord}); diff != "" { return fmt.Errorf("record[0] differs: %s", diff) } return nil diff --git a/plugin/testing_write_migrate.go b/plugin/testing_write_migrate.go index 9d4e22d2cb..9939ab62dc 100644 --- a/plugin/testing_write_migrate.go +++ b/plugin/testing_write_migrate.go @@ -68,7 +68,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou return fmt.Errorf("expected items: %d, got: %d", rowsPerRecord, totalItems) } - if diff := RecordsDiff(source.ToArrowSchema(), records, []arrow.Record{resource1}); diff != "" { + if diff := RecordsDiff(source.ToArrowSchema(), records, []arrow.RecordBatch{resource1}); diff != "" { return fmt.Errorf("first record differs from expectation: %s", diff) } @@ -349,14 +349,14 @@ func (s *WriterTestSuite) testMigrate( }) } -func expectRows(sc *arrow.Schema, records []arrow.Record, expectTotal int64, expectedLast arrow.Record) error { +func expectRows(sc *arrow.Schema, records []arrow.RecordBatch, expectTotal int64, expectedLast arrow.RecordBatch) error { totalItems := TotalRows(records) if totalItems != expectTotal { return fmt.Errorf("expected %d items, got %d", expectTotal, totalItems) } lastRecord := records[len(records)-1] lastRow := lastRecord.NewSlice(lastRecord.NumRows()-1, lastRecord.NumRows()) - if diff := RecordsDiff(sc, []arrow.Record{lastRow}, []arrow.Record{expectedLast}); diff != "" { + if diff := RecordsDiff(sc, []arrow.RecordBatch{lastRow}, []arrow.RecordBatch{expectedLast}); diff != "" { return fmt.Errorf("record #%d differs from expectation: %s", totalItems, diff) } return nil diff --git a/plugin/testing_write_upsert.go b/plugin/testing_write_upsert.go index 9b13d22f5b..f5013aca82 100644 --- a/plugin/testing_write_upsert.go +++ b/plugin/testing_write_upsert.go @@ -29,7 +29,7 @@ func (s *WriterTestSuite) testUpsertBasic(ctx context.Context) error { bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) bldr.Field(0).(*array.Int64Builder).Append(1) bldr.Field(1).(*array.StringBuilder).Append("foo") - record := bldr.NewRecord() + record := bldr.NewRecordBatch() if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: record, @@ -61,7 +61,7 @@ func (s *WriterTestSuite) testUpsertBasic(ctx context.Context) error { if totalItems != 1 { return fmt.Errorf("expected 1 item, got %d", totalItems) } - if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record}); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.RecordBatch{record}); diff != "" { return fmt.Errorf("record differs: %s", diff) } return nil @@ -102,7 +102,7 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error { return fmt.Errorf("expected items after initial insert: %d, got %d", rowsPerRecord, totalItems) } - if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{normalRecord}); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.RecordBatch{normalRecord}); diff != "" { return fmt.Errorf("record differs after insert: %s", diff) } @@ -126,7 +126,7 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error { return fmt.Errorf("expected items after upsert: %d, got %d", rowsPerRecord, totalItems) } - if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{nullRecord}); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.RecordBatch{nullRecord}); diff != "" { return fmt.Errorf("record differs after upsert (columns should be null): %s", diff) } @@ -151,12 +151,12 @@ func (s *WriterTestSuite) testInsertDuplicatePK(ctx context.Context) error { // Create a multi-row record with a duplicate PK value, but different values for the other columns. sc := table.ToArrowSchema() - var records []arrow.Record + var records []arrow.RecordBatch for j := 0; j < rowsPerRecord; j++ { bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) bldr.Field(0).(*array.Int64Builder).Append(1) bldr.Field(1).(*array.StringBuilder).Append("foo" + fmt.Sprint(j)) - records = append(records, bldr.NewRecord()) + records = append(records, bldr.NewRecordBatch()) bldr.Release() } @@ -169,7 +169,7 @@ func (s *WriterTestSuite) testInsertDuplicatePK(ctx context.Context) error { } columns[n] = concatenated } - normalRecord := array.NewRecord(sc, columns, -1) + normalRecord := array.NewRecordBatch(sc, columns, -1) // normalRecord if err := s.plugin.writeOne(ctx, &message.WriteInsert{ @@ -195,14 +195,14 @@ func (s *WriterTestSuite) testInsertDuplicatePK(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to extract last row from record: %w", err) } - if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{lastRow}); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.RecordBatch{lastRow}); diff != "" { return fmt.Errorf("record differs after insert: %s", diff) } return nil } -func extractLastRowFromRecord(table *schema.Table, existingRecord arrow.Record) (arrow.Record, error) { +func extractLastRowFromRecord(table *schema.Table, existingRecord arrow.RecordBatch) (arrow.RecordBatch, error) { sc := table.ToArrowSchema() bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) for i, c := range table.Columns { @@ -213,7 +213,7 @@ func extractLastRowFromRecord(table *schema.Table, existingRecord arrow.Record) return nil, fmt.Errorf("failed to unmarshal json `%v` for column %v: %v", col.ValueStr(lastRow), c.Name, err) } } - lastRecord := append([]arrow.Record{}, bldr.NewRecord()) + lastRecord := append([]arrow.RecordBatch{}, bldr.NewRecordBatch()) bldr.Release() arrowTable := array.NewTableFromRecords(sc, lastRecord) @@ -226,5 +226,5 @@ func extractLastRowFromRecord(table *schema.Table, existingRecord arrow.Record) columns[n] = concatenated } - return array.NewRecord(sc, columns, -1), nil + return array.NewRecordBatch(sc, columns, -1), nil } diff --git a/scalar/scalar.go b/scalar/scalar.go index 3bb1af700b..20de35be72 100644 --- a/scalar/scalar.go +++ b/scalar/scalar.go @@ -35,10 +35,10 @@ type Scalar interface { type Vector []Scalar -func (v Vector) ToArrowRecord(sc *arrow.Schema) arrow.Record { +func (v Vector) ToArrowRecord(sc *arrow.Schema) arrow.RecordBatch { bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) AppendToRecordBuilder(bldr, v) - rec := bldr.NewRecord() + rec := bldr.NewRecordBatch() return rec } diff --git a/scalar/timestamp.go b/scalar/timestamp.go index 8689308724..a9d41577d0 100644 --- a/scalar/timestamp.go +++ b/scalar/timestamp.go @@ -16,6 +16,8 @@ const ( // these are used by Arrow string format (time is in UTC) arrowStringFormat = "2006-01-02 15:04:05.999999999" arrowStringFormatNew = "2006-01-02 15:04:05.999999999Z" + + ArrowDeprecatedLayout = "2006-01-02 15:04:05.999999999Z0700" ) type Timestamp struct { diff --git a/scalar/timestamp_test.go b/scalar/timestamp_test.go index dba12220c3..6885b94fe1 100644 --- a/scalar/timestamp_test.go +++ b/scalar/timestamp_test.go @@ -131,7 +131,7 @@ func TestAppendToBuilderTimestamp(t *testing.T) { t.Fatal(err) } - bldr := array.NewTimestampBuilder(memory.DefaultAllocator, timestamp.Type) + bldr := array.NewTimestampBuilderWithLayout(memory.DefaultAllocator, timestamp.Type, ArrowDeprecatedLayout) AppendToBuilder(bldr, ×tamp) arr := bldr.NewArray().(*array.Timestamp) diff --git a/scheduler/batch.go b/scheduler/batch.go index ff2b5c8d79..41c8ec9fa0 100644 --- a/scheduler/batch.go +++ b/scheduler/batch.go @@ -121,7 +121,7 @@ type worker struct { // send must be called on len(rows) > 0 func (w *worker) send() { w.logger.Trace().Str("table", w.tableName).Int("rows", w.curRows).Msg("sending sync insert for rows batch") - w.res <- &message.SyncInsert{Record: w.builder.NewRecord()} + w.res <- &message.SyncInsert{Record: w.builder.NewRecordBatch()} // we need to reserve here as NewRecord (& underlying NewArray calls) reset the memory w.builder.Reserve(w.maxRows) w.curRows = 0 // reset diff --git a/schema/arrow.go b/schema/arrow.go index 81b8a39c3c..d53bd668a8 100644 --- a/schema/arrow.go +++ b/schema/arrow.go @@ -49,7 +49,7 @@ func (s Schemas) SchemaByName(name string) *arrow.Schema { return nil } -func hashRecord(record arrow.Record) arrow.Array { +func hashRecord(record arrow.RecordBatch) arrow.Array { numRows := int(record.NumRows()) fields := record.Schema().Fields() hashArray := types.NewUUIDBuilder(memory.DefaultAllocator) @@ -100,7 +100,7 @@ func TimestampArrayFromTime(t time.Time, unit arrow.TimeUnit, timeZone string, n return arrayBuilder.NewArray(), nil } -func ReplaceFieldInRecord(src arrow.Record, fieldName string, field arrow.Array) (record arrow.Record, err error) { +func ReplaceFieldInRecord(src arrow.RecordBatch, fieldName string, field arrow.Array) (record arrow.RecordBatch, err error) { fieldIndexes := src.Schema().FieldIndices(fieldName) for i := range fieldIndexes { record, err = src.SetColumn(fieldIndexes[i], field) @@ -111,7 +111,7 @@ func ReplaceFieldInRecord(src arrow.Record, fieldName string, field arrow.Array) return record, nil } -func AddInternalColumnsToRecord(record arrow.Record, cqClientIDValue string) (arrow.Record, error) { +func AddInternalColumnsToRecord(record arrow.RecordBatch, cqClientIDValue string) (arrow.RecordBatch, error) { schema := record.Schema() nRows := int(record.NumRows()) @@ -145,5 +145,5 @@ func AddInternalColumnsToRecord(record arrow.Record, cqClientIDValue string) (ar allColumns := append(record.Columns(), newColumns...) metadata := schema.Metadata() newSchema := arrow.NewSchema(allFields, &metadata) - return array.NewRecord(newSchema, allColumns, int64(nRows)), nil + return array.NewRecordBatch(newSchema, allColumns, int64(nRows)), nil } diff --git a/schema/arrow_test.go b/schema/arrow_test.go index 5744e4a8f8..0af5379ef6 100644 --- a/schema/arrow_test.go +++ b/schema/arrow_test.go @@ -15,7 +15,7 @@ import ( "github.com/stretchr/testify/require" ) -func RecordDiff(l arrow.Record, r arrow.Record) string { +func RecordDiff(l arrow.RecordBatch, r arrow.RecordBatch) string { var sb strings.Builder if l.NumCols() != r.NumCols() { return fmt.Sprintf("different number of columns: %d vs %d", l.NumCols(), r.NumCols()) @@ -39,7 +39,7 @@ func RecordDiff(l arrow.Record, r arrow.Record) string { return sb.String() } -func buildTestRecord(withClientIDValue string) arrow.Record { +func buildTestRecord(withClientIDValue string) arrow.RecordBatch { testFields := []arrow.Field{ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, {Name: "name", Type: arrow.BinaryTypes.String, Nullable: true}, @@ -96,13 +96,13 @@ func buildTestRecord(withClientIDValue string) arrow.Record { values := lo.Map(builders, func(builder array.Builder, _ int) arrow.Array { return builder.NewArray() }) - return array.NewRecord(schema, values, int64(testValuesCount)) + return array.NewRecordBatch(schema, values, int64(testValuesCount)) } func TestAddInternalColumnsToRecord(t *testing.T) { tests := []struct { name string - record arrow.Record + record arrow.RecordBatch cqClientIDValue string expectedNewColumns int64 }{ diff --git a/schema/testdata.go b/schema/testdata.go index 0787683d8a..a1375fc7dd 100644 --- a/schema/testdata.go +++ b/schema/testdata.go @@ -219,17 +219,17 @@ func (tg *TestDataGenerator) Reset() { tg.colToRnd = map[string]*rand.Rand{} } -// Generate will produce a single arrow.Record with the given schema and options. -func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) arrow.Record { +// Generate will produce a single arrow.RecordBatch with the given schema and options. +func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) arrow.RecordBatch { sc := table.ToArrowSchema() if opts.MaxRows == 0 { // We generate an empty record bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) defer bldr.Release() - return bldr.NewRecord() + return bldr.NewRecordBatch() } - var records []arrow.Record + var records []arrow.RecordBatch for j := 0; j < opts.MaxRows; j++ { tg.counter++ bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) @@ -248,7 +248,7 @@ func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) arr panic(fmt.Sprintf("failed to unmarshal json `%v` for column %v: %v", l, c.Name, err)) } } - records = append(records, bldr.NewRecord()) + records = append(records, bldr.NewRecordBatch()) bldr.Release() } @@ -262,7 +262,7 @@ func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) arr columns[n] = concatenated } - return array.NewRecord(sc, columns, -1) + return array.NewRecordBatch(sc, columns, -1) } func (tg TestDataGenerator) getExampleJSON(colName string, dataType arrow.DataType, opts GenTestDataOptions) string { diff --git a/schema/validators.go b/schema/validators.go index bf02b79aec..964350b0e7 100644 --- a/schema/validators.go +++ b/schema/validators.go @@ -9,7 +9,7 @@ import ( "github.com/cloudquery/plugin-sdk/v4/types" ) -func FindEmptyColumns(table *Table, records []arrow.Record) []string { +func FindEmptyColumns(table *Table, records []arrow.RecordBatch) []string { columnsWithValues := make([]bool, len(table.Columns)) emptyColumns := make([]string, 0) diff --git a/schema/validators_test.go b/schema/validators_test.go index 0ecba01bb1..33fc38ec17 100644 --- a/schema/validators_test.go +++ b/schema/validators_test.go @@ -18,7 +18,7 @@ func TestFindEmptyColumns(t *testing.T) { MaxRows: 1, NullRows: true, }) - v := FindEmptyColumns(table, []arrow.Record{record}) + v := FindEmptyColumns(table, []arrow.RecordBatch{record}) require.NotEmpty(t, v) require.Len(t, v, len(table.Columns)-1) // exclude "id" } @@ -30,7 +30,7 @@ func TestFindEmptyColumnsNotEmpty(t *testing.T) { MaxRows: 1, NullRows: false, }) - v := FindEmptyColumns(table, []arrow.Record{record}) + v := FindEmptyColumns(table, []arrow.RecordBatch{record}) require.Empty(t, v) } @@ -47,7 +47,7 @@ func TestFindEmptyColumnsJSON(t *testing.T) { if err != nil { panic(fmt.Sprintf("failed to unmarshal json for column: %v", err)) } - records := []arrow.Record{bldr.NewRecord()} + records := []arrow.RecordBatch{bldr.NewRecordBatch()} bldr.Release() v := FindEmptyColumns(table, records) diff --git a/serve/destination_v1_test.go b/serve/destination_v1_test.go index d55a2b5c98..7fc12352b6 100644 --- a/serve/destination_v1_test.go +++ b/serve/destination_v1_test.go @@ -107,7 +107,7 @@ func TestDestinationV1(t *testing.T) { bldr.Field(0).(*array.StringBuilder).Append(sourceName) bldr.Field(1).(*array.TimestampBuilder).AppendTime(syncTime) bldr.Field(2).(*array.Int16Builder).Append(1) - rec := bldr.NewRecord() + rec := bldr.NewRecordBatch() sourceSpecBytes, err := json.Marshal(sourceSpec) if err != nil { diff --git a/serve/plugin_test.go b/serve/plugin_test.go index d322264ffe..d9ac24b505 100644 --- a/serve/plugin_test.go +++ b/serve/plugin_test.go @@ -96,7 +96,7 @@ func TestPluginServe(t *testing.T) { } bldr := array.NewRecordBuilder(memory.DefaultAllocator, testTable.ToArrowSchema()) bldr.Field(0).(*array.StringBuilder).Append("test") - record := bldr.NewRecord() + record := bldr.NewRecordBatch() recordBytes, err := pb.RecordToBytes(record) if err != nil { @@ -140,7 +140,7 @@ func TestPluginServe(t *testing.T) { if err != nil { t.Fatal(err) } - var resources []arrow.Record + var resources []arrow.RecordBatch for { r, err := syncClient.Recv() if err == io.EOF { @@ -155,7 +155,7 @@ func TestPluginServe(t *testing.T) { t.Fatal(err) } for rdr.Next() { - rec := rdr.Record() + rec := rdr.RecordBatch() rec.Retain() resources = append(resources, rec) } diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index 66c8f742bc..06861efac1 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -157,7 +157,7 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m add, toFlush, rest := batch.SliceRecord(r.Record, limit) if add != nil { - resources = append(resources, &message.WriteInsert{Record: add.Record}) + resources = append(resources, &message.WriteInsert{Record: add.RecordBatch}) limit.AddSlice(add) } if len(toFlush) > 0 || rest != nil || limit.ReachedLimit() { @@ -174,7 +174,7 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m // set the remainder if rest != nil { - resources = append(resources, &message.WriteInsert{Record: rest.Record}) + resources = append(resources, &message.WriteInsert{Record: rest.RecordBatch}) limit.AddSlice(rest) } diff --git a/writers/batchwriter/batchwriter_test.go b/writers/batchwriter/batchwriter_test.go index d84705ec51..1d1427dc96 100644 --- a/writers/batchwriter/batchwriter_test.go +++ b/writers/batchwriter/batchwriter_test.go @@ -233,7 +233,7 @@ func TestBatchUpserts(t *testing.T) { } } -func getRecord(sc *arrow.Schema, rows int) arrow.Record { +func getRecord(sc *arrow.Schema, rows int) arrow.RecordBatch { builder := array.NewRecordBuilder(memory.DefaultAllocator, sc) defer builder.Release() @@ -241,5 +241,5 @@ func getRecord(sc *arrow.Schema, rows int) arrow.Record { f.AppendEmptyValues(rows) } - return builder.NewRecord() + return builder.NewRecordBatch() } diff --git a/writers/mixedbatchwriter/mixedbatchwriter.go b/writers/mixedbatchwriter/mixedbatchwriter.go index 4364f088ce..38033c05b4 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter.go +++ b/writers/mixedbatchwriter/mixedbatchwriter.go @@ -207,7 +207,7 @@ type insertBatchManager struct { func (m *insertBatchManager) append(ctx context.Context, msg *message.WriteInsert) error { add, toFlush, rest := batch.SliceRecord(msg.Record, m.limit) if add != nil { - m.batch = append(m.batch, &message.WriteInsert{Record: add.Record}) + m.batch = append(m.batch, &message.WriteInsert{Record: add.RecordBatch}) m.limit.AddSlice(add) } if len(toFlush) > 0 || rest != nil || m.limit.ReachedLimit() { @@ -226,7 +226,7 @@ func (m *insertBatchManager) append(ctx context.Context, msg *message.WriteInser // set the remainder if rest != nil { - m.batch = append(m.batch, &message.WriteInsert{Record: rest.Record}) + m.batch = append(m.batch, &message.WriteInsert{Record: rest.RecordBatch}) m.limit.AddSlice(rest) } diff --git a/writers/mixedbatchwriter/mixedbatchwriter_test.go b/writers/mixedbatchwriter/mixedbatchwriter_test.go index 16040ca47d..cc0805a279 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter_test.go +++ b/writers/mixedbatchwriter/mixedbatchwriter_test.go @@ -97,7 +97,7 @@ func getTestMessages() testMessages { // message to insert into table1 bldr1 := array.NewRecordBuilder(memory.DefaultAllocator, table1.ToArrowSchema()) bldr1.Field(0).(*array.Int64Builder).Append(1) - rec1 := bldr1.NewRecord() + rec1 := bldr1.NewRecordBatch() msgInsertTable1 := &message.WriteInsert{ Record: rec1, } @@ -105,7 +105,7 @@ func getTestMessages() testMessages { // message to insert into table2 bldr2 := array.NewRecordBuilder(memory.DefaultAllocator, table1.ToArrowSchema()) bldr2.Field(0).(*array.Int64Builder).Append(1) - rec2 := bldr2.NewRecord() + rec2 := bldr2.NewRecordBatch() msgInsertTable2 := &message.WriteInsert{ Record: rec2, } diff --git a/writers/streamingbatchwriter/streamingbatchwriter.go b/writers/streamingbatchwriter/streamingbatchwriter.go index 270d4197e0..3951a01a9c 100644 --- a/writers/streamingbatchwriter/streamingbatchwriter.go +++ b/writers/streamingbatchwriter/streamingbatchwriter.go @@ -472,7 +472,7 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup) add, toFlush, rest := batch.SliceRecord(ins.Record, s.limit) if add != nil { s.limit.AddSlice(add) - s.send(ctx, any(&message.WriteInsert{Record: add.Record}).(T)) + s.send(ctx, any(&message.WriteInsert{Record: add.RecordBatch}).(T)) } if len(toFlush) > 0 || rest != nil || s.limit.ReachedLimit() { // flush current batch @@ -489,7 +489,7 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup) // set the remainder if rest != nil { s.limit.AddSlice(rest) - s.send(ctx, any(&message.WriteInsert{Record: rest.Record}).(T)) + s.send(ctx, any(&message.WriteInsert{Record: rest.RecordBatch}).(T)) } } else { s.send(ctx, r) diff --git a/writers/streamingbatchwriter/streamingbatchwriter_test.go b/writers/streamingbatchwriter/streamingbatchwriter_test.go index 0c1c7dc2cc..9718c21919 100644 --- a/writers/streamingbatchwriter/streamingbatchwriter_test.go +++ b/writers/streamingbatchwriter/streamingbatchwriter_test.go @@ -177,7 +177,7 @@ func TestBatchStreamFlushDifferentMessages(t *testing.T) { bldr := array.NewRecordBuilder(memory.DefaultAllocator, streamingBatchTestTable.ToArrowSchema()) bldr.Field(0).(*array.Int64Builder).Append(1) - record := bldr.NewRecord() + record := bldr.NewRecordBatch() if l := testClient.MessageLen(messageTypeMigrateTable); l != 0 { t.Fatalf("expected 0 migrate table messages, got %d", l) @@ -389,7 +389,7 @@ func TestStreamingBatchUpserts(t *testing.T) { bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) bldr.Field(0).(*array.Int64Builder).Append(1) - record := bldr.NewRecord() + record := bldr.NewRecordBatch() ch <- &message.WriteInsert{ Record: record, @@ -590,7 +590,7 @@ func waitForLength(t *testing.T, checkLen func(messageType) int, msgType message } // nolint:unparam -func getRecord(sc *arrow.Schema, rows int) arrow.Record { +func getRecord(sc *arrow.Schema, rows int) arrow.RecordBatch { builder := array.NewRecordBuilder(memory.DefaultAllocator, sc) defer builder.Release() @@ -598,7 +598,7 @@ func getRecord(sc *arrow.Schema, rows int) arrow.Record { f.AppendEmptyValues(rows) } - return builder.NewRecord() + return builder.NewRecordBatch() } // nolint:unparam diff --git a/writers/writers_test.go b/writers/writers_test.go index 2d2b8ceb96..046c0b6732 100644 --- a/writers/writers_test.go +++ b/writers/writers_test.go @@ -23,7 +23,7 @@ import ( type bCase struct { name string wr writers.Writer - rec func() arrow.Record + rec func() arrow.RecordBatch } func BenchmarkWriterMemory(b *testing.B) { @@ -88,7 +88,7 @@ func BenchmarkWriterMemory(b *testing.B) { } } -func makeRecord() func() arrow.Record { +func makeRecord() func() arrow.RecordBatch { table := &schema.Table{ Name: "test_table", Columns: schema.ColumnList{ @@ -100,14 +100,14 @@ func makeRecord() func() arrow.Record { } sc := table.ToArrowSchema() - return func() arrow.Record { + return func() arrow.RecordBatch { bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) bldr.Field(0).(*array.StringBuilder).Append("test") - return bldr.NewRecord() + return bldr.NewRecordBatch() } } -func makeWideRecord() func() arrow.Record { +func makeWideRecord() func() arrow.RecordBatch { table := &schema.Table{ Name: "test_wide_table", Columns: schema.ColumnList{ @@ -129,17 +129,17 @@ func makeWideRecord() func() arrow.Record { } sc := table.ToArrowSchema() - return func() arrow.Record { + return func() arrow.RecordBatch { bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) bldr.Field(0).(*array.StringBuilder).Append("test") for i := 0; i < numWideCols; i++ { bldr.Field(i + 1).(*array.Int64Builder).Append(randVals[i]) } - return bldr.NewRecord() + return bldr.NewRecordBatch() } } -func writerMatrix[T writers.Writer, C any, O ~func(T)](prefix string, constructor func(C, ...O) (T, error), client C, recordMaker func() func() arrow.Record, optsMatrix map[string][]O) []bCase { +func writerMatrix[T writers.Writer, C any, O ~func(T)](prefix string, constructor func(C, ...O) (T, error), client C, recordMaker func() func() arrow.RecordBatch, optsMatrix map[string][]O) []bCase { bCases := make([]bCase, 0, len(optsMatrix)) k := maps.Keys(optsMatrix)