Skip to content

Commit

Permalink
feat: promptql compatible schema (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac authored Feb 17, 2025
1 parent bf657f9 commit d42fb68
Show file tree
Hide file tree
Showing 225 changed files with 9,771 additions and 5,082 deletions.
2 changes: 2 additions & 0 deletions connector-definition/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ concurrency:
mutation: 1
runtime:
maxDownloadSizeMBs: 20
generator:
promptqlCompatible: false
117 changes: 117 additions & 0 deletions connector/collection/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package collection

import (
"context"

"github.com/hasura/ndc-sdk-go/schema"
"github.com/hasura/ndc-sdk-go/utils"
"github.com/hasura/ndc-storage/connector/storage"
"github.com/hasura/ndc-storage/connector/storage/common"
)

// CollectionBucketExecutor evaluates and executes a storage bucket collection query.
type CollectionBucketExecutor struct {
Storage *storage.Manager
Request *schema.QueryRequest
Arguments map[string]any
Variables map[string]any
Concurrency int
}

// Execute executes the query request to get list of storage buckets.
func (coe *CollectionBucketExecutor) Execute(ctx context.Context) (*schema.RowSet, error) {
if coe.Request.Query.Offset != nil && *coe.Request.Query.Offset < 0 {
return nil, schema.UnprocessableContentError("offset must be positive", nil)
}

if coe.Request.Query.Limit != nil && *coe.Request.Query.Limit <= 0 {
return &schema.RowSet{
Aggregates: schema.RowSetAggregates{},
Rows: []map[string]any{},
}, nil
}

request, err := EvalBucketPredicate(nil, "", coe.Request.Query.Predicate, coe.Variables)
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
}

if !request.IsValid {
// early returns zero rows
// the evaluated query always returns empty values
return &schema.RowSet{
Aggregates: schema.RowSetAggregates{},
Rows: []map[string]any{},
}, nil
}

request.evalQuerySelectionFields(coe.Request.Query.Fields)

predicate := request.BucketPredicate.CheckPostPredicate
if !request.BucketPredicate.HasPostPredicate() {
predicate = nil
}

options := &common.ListStorageBucketsOptions{
Prefix: request.BucketPredicate.GetPrefix(),
Include: common.BucketIncludeOptions{
Tags: request.Include.Tags,
Versioning: request.Include.Versions,
Lifecycle: request.Include.Lifecycle,
Encryption: request.Include.Encryption,
ObjectLock: request.IncludeObjectLock,
},
NumThreads: coe.Concurrency,
}

if after, err := utils.GetNullableString(coe.Arguments, argumentAfter); err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
} else if after != nil && *after != "" {
options.StartAfter = *after
}

var offset, limit int
if coe.Request.Query.Offset != nil {
offset = *coe.Request.Query.Offset
}

if coe.Request.Query.Limit != nil {
limit = *coe.Request.Query.Limit
maxResults := offset + limit

options.MaxResults = &maxResults
}

response, err := coe.Storage.ListBuckets(ctx, request.ClientID, options, predicate)
if err != nil {
return nil, err
}

buckets := response.Buckets

if offset > 0 {
if len(buckets) <= offset {
return &schema.RowSet{
Aggregates: schema.RowSetAggregates{},
Rows: []map[string]any{},
}, nil
}

buckets = buckets[offset:]
}

rawResults := make([]map[string]any, len(buckets))
for i, object := range buckets {
rawResults[i] = object.ToMap()
}

result, err := utils.EvalObjectsWithColumnSelection(coe.Request.Query.Fields, rawResults)
if err != nil {
return nil, err
}

return &schema.RowSet{
Aggregates: schema.RowSetAggregates{},
Rows: result,
}, nil
}
116 changes: 116 additions & 0 deletions connector/collection/object.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package collection

import (
"context"

"github.com/hasura/ndc-sdk-go/schema"
"github.com/hasura/ndc-sdk-go/utils"
"github.com/hasura/ndc-storage/connector/storage"
"github.com/hasura/ndc-storage/connector/storage/common"
)

// CollectionObjectExecutor evaluates and executes a storage object collection query
type CollectionObjectExecutor struct {
Storage *storage.Manager
Request *schema.QueryRequest
Arguments map[string]any
Variables map[string]any
Concurrency int
}

// Execute executes the query request to get list of storage objects.
func (coe *CollectionObjectExecutor) Execute(ctx context.Context) (*schema.RowSet, error) {
if coe.Request.Query.Offset != nil && *coe.Request.Query.Offset < 0 {
return nil, schema.UnprocessableContentError("offset must be positive", nil)
}

if coe.Request.Query.Limit != nil && *coe.Request.Query.Limit <= 0 {
return &schema.RowSet{
Aggregates: schema.RowSetAggregates{},
Rows: []map[string]any{},
}, nil
}

request, err := EvalObjectPredicate(common.StorageBucketArguments{}, nil, coe.Request.Query.Predicate, coe.Variables)
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
}

if !request.IsValid {
// early returns zero rows
// the evaluated query always returns empty values
return &schema.RowSet{
Aggregates: schema.RowSetAggregates{},
Rows: []map[string]any{},
}, nil
}

request.evalQuerySelectionFields(coe.Request.Query.Fields)

options := &common.ListStorageObjectsOptions{
Prefix: request.ObjectNamePredicate.GetPrefix(),
Include: request.Include,
NumThreads: coe.Concurrency,
}

if hierarchy, err := utils.GetNullableBoolean(coe.Arguments, argumentHierarchy); err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
} else if hierarchy != nil {
options.Hierarchy = *hierarchy
}

if after, err := utils.GetNullableString(coe.Arguments, argumentAfter); err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
} else if after != nil && *after != "" {
options.StartAfter = *after
}

var offset, limit int
if coe.Request.Query.Offset != nil {
offset = *coe.Request.Query.Offset
}

if coe.Request.Query.Limit != nil {
limit = *coe.Request.Query.Limit
options.MaxResults = offset + limit
}

predicate := request.ObjectNamePredicate.CheckPostPredicate

if !request.ObjectNamePredicate.HasPostPredicate() {
predicate = nil
}

response, err := coe.Storage.ListObjects(ctx, request.GetBucketArguments(), options, predicate)
if err != nil {
return nil, err
}

objects := response.Objects

if offset > 0 {
if len(response.Objects) <= offset {
return &schema.RowSet{
Aggregates: schema.RowSetAggregates{},
Rows: []map[string]any{},
}, nil
}

objects = response.Objects[offset:]
}

rawResults := make([]map[string]any, len(objects))
for i, object := range objects {
rawResults[i] = object.ToMap()
}

result, err := utils.EvalObjectsWithColumnSelection(coe.Request.Query.Fields, rawResults)
if err != nil {
return nil, err
}

return &schema.RowSet{
Aggregates: schema.RowSetAggregates{},
Rows: result,
}, nil
}
Loading

0 comments on commit d42fb68

Please sign in to comment.