Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/simple_plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/cloudquery/plugin-sdk/examples/simple_plugin
go 1.24.4

require (
github.com/apache/arrow-go/v18 v18.4.0
github.com/apache/arrow-go/v18 v18.4.1
github.com/cloudquery/plugin-sdk/v4 v4.89.1
github.com/rs/zerolog v1.34.0
)
Expand Down Expand Up @@ -48,7 +48,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/jsonschema v0.13.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand All @@ -59,7 +59,7 @@ require (
github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 // indirect
github.com/spf13/cobra v1.9.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stretchr/testify v1.11.0 // indirect
github.com/stretchr/testify v1.11.1 // indirect
github.com/thoas/go-funk v0.9.3 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
Expand Down
12 changes: 6 additions & 6 deletions examples/simple_plugin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78=
github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/apache/arrow-go/v18 v18.4.0 h1:/RvkGqH517iY8bZKc4FD5/kkdwXJGjxf28JIXbJ/oB0=
github.com/apache/arrow-go/v18 v18.4.0/go.mod h1:Aawvwhj8x2jURIzD9Moy72cF0FyJXOpkYpdmGRHcw14=
github.com/apache/arrow-go/v18 v18.4.1 h1:q/jVkBWCJOB9reDgaIZIdruLQUb1kbkvOnOFezVH1C4=
github.com/apache/arrow-go/v18 v18.4.1/go.mod h1:tLyFubsAl17bvFdUAy24bsSvA/6ww95Iqi67fTpGu3E=
github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 h1:mFDZW1FQk9yndPvxScp7RpcOpdSHaqcgBWO7sDlx4S8=
github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc=
Expand Down Expand Up @@ -109,8 +109,8 @@ github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down Expand Up @@ -155,8 +155,8 @@ github.com/stoewer/go-strcase v1.3.1/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8w
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.11.0 h1:ib4sjIrwZKxE5u/Japgo/7SJV3PvgjGiRNAvTVGqQl8=
github.com/stretchr/testify v1.11.0/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
Expand Down
4 changes: 2 additions & 2 deletions examples/simple_plugin/plugin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (*Client) Write(context.Context, <-chan message.WriteMessage) error {
return nil
}

func (*Client) Read(context.Context, *schema.Table, chan<- arrow.Record) error {
func (*Client) Read(context.Context, *schema.Table, chan<- arrow.RecordBatch) error {
// Not implemented, just used for testing destination packaging
return nil
}

func (*Client) Transform(_ context.Context, _ <-chan arrow.Record, _ chan<- arrow.Record) error {
func (*Client) Transform(_ context.Context, _ <-chan arrow.RecordBatch, _ chan<- arrow.RecordBatch) error {
// Not implemented, just used for testing destination packaging
return nil
}
Expand Down
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/cloudquery/plugin-sdk/v4
go 1.24.4

require (
github.com/apache/arrow-go/v18 v18.4.0
github.com/apache/arrow-go/v18 v18.4.1
github.com/aws/aws-sdk-go-v2 v1.38.3
github.com/aws/aws-sdk-go-v2/config v1.31.6
github.com/aws/aws-sdk-go-v2/service/licensemanager v1.36.2
Expand All @@ -25,7 +25,7 @@ require (
github.com/samber/lo v1.51.0
github.com/santhosh-tekuri/jsonschema/v6 v6.0.2
github.com/spf13/cobra v1.9.1
github.com/stretchr/testify v1.11.0
github.com/stretchr/testify v1.11.1
github.com/thoas/go-funk v0.9.3
go.opentelemetry.io/otel v1.37.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.13.0
Expand Down Expand Up @@ -73,7 +73,7 @@ require (
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand All @@ -98,3 +98,5 @@ require (
)

replace github.com/invopop/jsonschema => github.com/cloudquery/jsonschema v0.0.0-20240220124159-92878faa2a66

replace github.com/apache/arrow-go/v18 => github.com/cloudquery/arrow-go/v18 v18.0.0-20250917180258-5bc4c36376be
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78=
github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/apache/arrow-go/v18 v18.4.0 h1:/RvkGqH517iY8bZKc4FD5/kkdwXJGjxf28JIXbJ/oB0=
github.com/apache/arrow-go/v18 v18.4.0/go.mod h1:Aawvwhj8x2jURIzD9Moy72cF0FyJXOpkYpdmGRHcw14=
github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 h1:mFDZW1FQk9yndPvxScp7RpcOpdSHaqcgBWO7sDlx4S8=
github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc=
Expand Down Expand Up @@ -50,6 +48,8 @@ github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMU
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8=
github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cloudquery/arrow-go/v18 v18.0.0-20250917180258-5bc4c36376be h1:yMdIO5OoT61+kLVGiBZZd79w2iEdLKvF6FZHITc1CbM=
github.com/cloudquery/arrow-go/v18 v18.0.0-20250917180258-5bc4c36376be/go.mod h1:NblXIdCxvVSrAbxnZuTcrgBI7RrkEsXgETkX9SIQJTA=
github.com/cloudquery/cloudquery-api-go v1.14.3 h1:f6nR5PsxGl932BMDzsjK6rXpHQbkQ7xL8DW2yBSUKn0=
github.com/cloudquery/cloudquery-api-go v1.14.3/go.mod h1:KMcMIaX4l3C2QGHzlqeV7Ac9thX7L/sWXMM5wEmcKZI=
github.com/cloudquery/codegen v0.3.31 h1:YDqokUyWSECewoaISY4D2iIpFRTDnPtWmQOFgaQ60c0=
Expand Down Expand Up @@ -110,8 +110,8 @@ github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down Expand Up @@ -165,8 +165,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.0 h1:ib4sjIrwZKxE5u/Japgo/7SJV3PvgjGiRNAvTVGqQl8=
github.com/stretchr/testify v1.11.0/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
Expand Down
36 changes: 18 additions & 18 deletions internal/batch/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (

type (
SlicedRecord struct {
arrow.Record
arrow.RecordBatch
Bytes int64 // we need this as the util.TotalRecordSize will report the full size even for the sliced record
bytesPerRow int64
}
)

func (s *SlicedRecord) split(limit *Cap) (add *SlicedRecord, toFlush []arrow.Record, rest *SlicedRecord) {
func (s *SlicedRecord) split(limit *Cap) (add *SlicedRecord, toFlush []arrow.RecordBatch, rest *SlicedRecord) {
if s == nil {
return nil, nil, nil
}
Expand All @@ -23,13 +23,13 @@ func (s *SlicedRecord) split(limit *Cap) (add *SlicedRecord, toFlush []arrow.Rec
limit.add(add.Bytes, add.NumRows())
}

if s.Record == nil {
if s.RecordBatch == nil {
// all processed
return add, nil, nil
}

toFlush = s.getToFlush(limit)
if s.Record == nil {
if s.RecordBatch == nil {
// all processed
return add, toFlush, nil
}
Expand All @@ -56,21 +56,21 @@ func (s *SlicedRecord) getAdd(limit *Cap) *SlicedRecord {
// grab the whole record (either no limits or not overflowing)
res := *s
s.Bytes = 0
s.Record = nil
s.RecordBatch = nil
return &res
}

res := SlicedRecord{
Record: s.NewSlice(0, rows),
RecordBatch: s.NewSlice(0, rows),
Bytes: rows * s.bytesPerRow,
bytesPerRow: s.bytesPerRow,
}
s.Record = s.NewSlice(rows, s.NumRows())
s.RecordBatch = s.NewSlice(rows, s.NumRows())
s.Bytes -= res.Bytes
return &res
}

func (s *SlicedRecord) getToFlush(limit *Cap) []arrow.Record {
func (s *SlicedRecord) getToFlush(limit *Cap) []arrow.RecordBatch {
rowsByBytes := limit.bytes.capPerN(s.bytesPerRow)
rows := limit.rows.cap()
switch {
Expand All @@ -93,41 +93,41 @@ func (s *SlicedRecord) getToFlush(limit *Cap) []arrow.Record {
return nil
}

flush := make([]arrow.Record, 0, s.NumRows()/rows)
flush := make([]arrow.RecordBatch, 0, s.NumRows()/rows)
offset := int64(0)
for offset+rows <= s.NumRows() {
flush = append(flush, s.NewSlice(offset, offset+rows))
offset += rows
}
if offset == s.NumRows() {
// we processed everything for flush
s.Record = nil
s.RecordBatch = nil
s.Bytes = 0
return flush
}

// set record to the remainder
s.Record = s.NewSlice(offset, s.NumRows())
s.RecordBatch = s.NewSlice(offset, s.NumRows())
s.Bytes = s.NumRows() * s.bytesPerRow

return flush
}

func (s *SlicedRecord) slice() []arrow.Record {
res := make([]arrow.Record, s.NumRows())
func (s *SlicedRecord) slice() []arrow.RecordBatch {
res := make([]arrow.RecordBatch, s.NumRows())
for i := int64(0); i < s.NumRows(); i++ {
res[i] = s.NewSlice(i, i+1)
}
return res
}

func newSlicedRecord(r arrow.Record) *SlicedRecord {
func newSlicedRecord(r arrow.RecordBatch) *SlicedRecord {
if r.NumRows() == 0 {
return nil
}
res := SlicedRecord{
Record: r,
Bytes: util.TotalRecordSize(r),
RecordBatch: r,
Bytes: util.TotalRecordSize(r),
}
res.bytesPerRow = res.Bytes / r.NumRows()
return &res
Expand All @@ -136,10 +136,10 @@ func newSlicedRecord(r arrow.Record) *SlicedRecord {
// SliceRecord will return the SlicedRecord you can add to the batch given the restrictions provided (if any).
// The meaning of the returned values:
// - `add` is good to be added to the current batch that the caller is assembling
// - `flush` represents sliced arrow.Record that needs own batch to be flushed
// - `flush` represents sliced arrow.RecordBatch that needs own batch to be flushed
// - `remaining` represents the overflow of the batch after `add` & `flush` are processed
// Note that the `limit` provided will not be updated.
func SliceRecord(r arrow.Record, limit *Cap) (add *SlicedRecord, flush []arrow.Record, remaining *SlicedRecord) {
func SliceRecord(r arrow.RecordBatch, limit *Cap) (add *SlicedRecord, flush []arrow.RecordBatch, remaining *SlicedRecord) {
l := *limit // copy value
return newSlicedRecord(r).split(&l)
}
2 changes: 1 addition & 1 deletion internal/clients/state/v3/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (c *Client) Flush(ctx context.Context) error {
version.Append(val.version)
}
}
rec := bldr.NewRecord()
rec := bldr.NewRecordBatch()
recordBytes, err := pb.RecordToBytes(rec)
if err != nil {
return err
Expand Down
22 changes: 11 additions & 11 deletions internal/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// client is mostly used for testing the destination plugin.
type client struct {
memoryDB map[string][]arrow.Record
memoryDB map[string][]arrow.RecordBatch
tables map[string]*schema.Table
memoryDBLock sync.RWMutex
errOnWrite bool
Expand All @@ -42,7 +42,7 @@ func WithBlockingWrite() Option {

func GetNewClient(options ...Option) plugin.NewClientFunc {
c := &client{
memoryDB: make(map[string][]arrow.Record),
memoryDB: make(map[string][]arrow.RecordBatch),
memoryDBLock: sync.RWMutex{},
tables: map[string]*schema.Table{
"table1": {
Expand Down Expand Up @@ -112,13 +112,13 @@ func NewMemDBClientErrOnNew(context.Context, zerolog.Logger, []byte, plugin.NewC
return nil, errors.New("newTestDestinationMemDBClientErrOnNew")
}

func (c *client) overwrite(table *schema.Table, record arrow.Record) {
func (c *client) overwrite(table *schema.Table, record arrow.RecordBatch) {
for i := int64(0); i < record.NumRows(); i++ {
c.overwriteRow(table, record.NewSlice(i, i+1))
}
}

func (c *client) overwriteRow(table *schema.Table, data arrow.Record) {
func (c *client) overwriteRow(table *schema.Table, data arrow.RecordBatch) {
tableName := table.Name
pksIndex := table.PrimaryKeysIndexes()
if len(pksIndex) == 0 {
Expand Down Expand Up @@ -152,7 +152,7 @@ func (*client) GetSpec() any {
return &Spec{}
}

func (c *client) Read(_ context.Context, table *schema.Table, res chan<- arrow.Record) error {
func (c *client) Read(_ context.Context, table *schema.Table, res chan<- arrow.RecordBatch) error {
c.memoryDBLock.RLock()
defer c.memoryDBLock.RUnlock()

Expand Down Expand Up @@ -196,7 +196,7 @@ func (c *client) migrate(_ context.Context, table *schema.Table) {
tableName := table.Name
memTable := c.memoryDB[tableName]
if memTable == nil {
c.memoryDB[tableName] = make([]arrow.Record, 0)
c.memoryDB[tableName] = make([]arrow.RecordBatch, 0)
c.tables[tableName] = table
return
}
Expand All @@ -206,7 +206,7 @@ func (c *client) migrate(_ context.Context, table *schema.Table) {
if changes == nil {
return
}
c.memoryDB[tableName] = make([]arrow.Record, 0)
c.memoryDB[tableName] = make([]arrow.RecordBatch, 0)
c.tables[tableName] = table
}

Expand Down Expand Up @@ -253,7 +253,7 @@ func (c *client) Close(context.Context) error {
}

func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) {
var filteredTable []arrow.Record
var filteredTable []arrow.RecordBatch
tableName := msg.TableName
for i, row := range c.memoryDB[tableName] {
sc := row.Schema()
Expand All @@ -280,7 +280,7 @@ func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) {
}

func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) {
var filteredTable []arrow.Record
var filteredTable []arrow.RecordBatch
tableName := msg.TableName
for i, row := range c.memoryDB[tableName] {
isMatch := true
Expand Down Expand Up @@ -308,15 +308,15 @@ func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord)
c.memoryDB[tableName] = filteredTable
}

func (*client) Transform(_ context.Context, _ <-chan arrow.Record, _ chan<- arrow.Record) error {
func (*client) Transform(_ context.Context, _ <-chan arrow.RecordBatch, _ chan<- arrow.RecordBatch) error {
return nil
}

func (*client) TransformSchema(_ context.Context, _ *arrow.Schema) (*arrow.Schema, error) {
return nil, nil
}

func evaluatePredicate(pred message.Predicate, record arrow.Record) bool {
func evaluatePredicate(pred message.Predicate, record arrow.RecordBatch) bool {
sc := record.Schema()
indices := sc.FieldIndices(pred.Column)
if len(indices) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/memdb/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestOnWriteError(t *testing.T) {
// sourceSpec := pbPlugin.Spec{
// Name: sourceName,
// }
// ch := make(chan arrow.Record, 1)
// ch := make(chan arrow.RecordBatch, 1)
// ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
// opts := schema.GenTestDataOptions{
// SourceName: "test",
Expand Down
2 changes: 1 addition & 1 deletion internal/pk/pk.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/schema"
)

func String(resource arrow.Record) string {
func String(resource arrow.RecordBatch) string {
sc := resource.Schema()
table, err := schema.NewTableFromArrowSchema(sc)
if err != nil {
Expand Down
Loading