feat: add the max download size constraint setting (#6)
hgiasac authored Feb 4, 2025
1 parent 2b8dbe3 commit eef9f22
Showing 17 changed files with 204 additions and 50 deletions.
16 changes: 8 additions & 8 deletions README.md
@@ -8,22 +8,22 @@ This connector is built using the [Go Data Connector SDK](https://github.com/has

### Supported storage services

At this moment, the connector supports S3 Compatible Storage services.

| Service              | Supported |
| -------------------- | --------- |
-| AWS S3               | ✅        |
-| MinIO                | ✅        |
+| AWS S3               | ✅ (\*)   |
| Google Cloud Storage | ❌        |
-| Cloudflare R2        | ✅        |
-| DigitalOcean Spaces  | ✅        |
| Azure Blob Storage   | ❌        |
+| MinIO                | ✅ (\*)   |
+| Cloudflare R2        | ✅ (\*)   |
+| DigitalOcean Spaces  | ✅ (\*)   |

(\*): Supported as Amazon S3 Compatible Cloud Storage providers. The connector uses the [MinIO Go Client SDK](https://github.com/minio/minio-go) behind the scenes.

## Get Started

-Follow the [Quick Start Guide](https://hasura.io/docs/3.0/getting-started/overview/) in Hasura DDN docs. At the `Connect to data` step, choose the `hasura/storage` data connector from the dropdown and follow the interactive prompts to set required environment variables.
+Follow the [Quick Start Guide](https://hasura.io/docs/3.0/getting-started/overview/) in Hasura DDN docs. At the `Connect to data` step, choose the `hasura/storage` data connector from the dropdown and follow the interactive prompts to set the required environment variables.

-The connector is built upon the MinIO Go Client SDK so it supports most of methods in the [API interface](https://min.io/docs/minio/linux/developers/go/API.html)
+AWS S3 environment variables are the default settings in the interactive prompt. If you want to use other storage providers, you need to manually configure the `configuration.yaml` file and add the required environment variable mappings to the subgraph definition.

## Documentation

3 changes: 3 additions & 0 deletions configuration/update.go
@@ -63,6 +63,9 @@ var defaultConfiguration = types.Configuration{
Query: 5,
Mutation: 1,
},
Runtime: storage.RuntimeSettings{
MaxDownloadSizeMBs: 20,
},
Clients: []storage.ClientConfig{
{
"type": common.S3,
3 changes: 0 additions & 3 deletions connector-definition/connector-metadata.yaml
@@ -3,9 +3,6 @@ packagingDefinition:
dockerImage: ghcr.io/hasura/ndc-storage:{{VERSION}}
documentationPage: https://github.com/hasura/ndc-storage
supportedEnvironmentVariables:
-  - name: STORAGE_PROVIDER_TYPE
-    description: Storage provider type. Accept one of s3 and gs
-    required: true
- name: ACCESS_KEY_ID
description: The access key ID
required: true
4 changes: 3 additions & 1 deletion connector-definition/configuration.yaml
@@ -19,4 +19,6 @@ clients:
allowedBuckets: []
concurrency:
query: 5
-  mutation: 3
+  mutation: 1
+runtime:
+  maxDownloadSizeMBs: 20
2 changes: 1 addition & 1 deletion connector/connector.go
@@ -72,7 +72,7 @@ func (c *Connector) ParseConfiguration(ctx context.Context, configurationDir str
func (c *Connector) TryInitState(ctx context.Context, configuration *types.Configuration, metrics *connector.TelemetryState) (*types.State, error) {
logger := connector.GetLogger(ctx)

-	manager, err := storage.NewManager(ctx, configuration.Clients, logger)
+	manager, err := storage.NewManager(ctx, configuration.Clients, configuration.Runtime, logger)
if err != nil {
return nil, err
}
17 changes: 10 additions & 7 deletions connector/connector_test.go
@@ -14,17 +14,20 @@ import (
func TestConnector(t *testing.T) {
setConnectorTestEnv(t)

-	logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
-		Level: slog.LevelDebug,
-	}))
-
-	for _, dir := range []string{"01-setup", "02-get", "03-cleanup"} {
+	for i, dir := range []string{"01-setup", "02-get", "03-cleanup"} {
+		var serverOptions []connector.ServeOption
+
+		if i == 0 {
+			logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
+				Level: slog.LevelDebug,
+			}))
+			serverOptions = append(serverOptions, connector.WithLogger(logger))
+		}
+
		ndctest.TestConnector(t, &Connector{}, ndctest.TestConnectorOptions{
			Configuration: "../tests/configuration",
			TestDataDir:   filepath.Join("testdata", dir),
-			ServerOptions: []connector.ServeOption{
-				connector.WithLogger(logger),
-			},
+			ServerOptions: serverOptions,
		})
	}
}
2 changes: 2 additions & 0 deletions connector/functions/object.go
@@ -92,6 +92,8 @@ func FunctionStorageObject(ctx context.Context, state *types.State, args *common

// FunctionDownloadStorageObject returns a stream of the object data. Most of the common errors occur when reading the stream.
func FunctionDownloadStorageObject(ctx context.Context, state *types.State, args *common.GetStorageObjectArguments) (*scalar.Bytes, error) {
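	// the object's raw bytes are base64-encoded in the JSON response; mark the
	// request so the size check in GetObject can compensate for the ~33%
	// encoding overhead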
args.Base64Encoded = true

reader, err := downloadStorageObject(ctx, state, args)
if err != nil {
return nil, err
70 changes: 70 additions & 0 deletions connector/query_test.go
@@ -7,8 +7,10 @@ import (
"io"
"net/http"
"path/filepath"
"strings"
"testing"

"github.com/hasura/ndc-sdk-go/connector"
"github.com/hasura/ndc-sdk-go/ndctest"
"github.com/hasura/ndc-sdk-go/schema"
"gotest.tools/v3/assert"
@@ -129,3 +131,71 @@
})
}
}

func TestMaxDownloadSizeValidation(t *testing.T) {
setConnectorTestEnv(t)

server, err := connector.NewServer(&Connector{}, &connector.ServerOptions{
Configuration: "../tests/configuration",
}, connector.WithoutRecovery())
assert.NilError(t, err)

httpServer := server.BuildTestServer()
defer httpServer.Close()

getQueryBody := func(name string) string {
return fmt.Sprintf(`{
"arguments": {
"clientId": {
"type": "literal",
"value": "minio"
},
"bucket": {
"type": "literal",
"value": "dummy-bucket-0"
},
"object": {
"type": "literal",
"value": "movies/2000s/movies.json"
}
},
"collection": "%s",
"collection_relationships": {},
"query": {
"fields": {
"__value": {
"column": "__value",
"type": "column"
}
}
}
}`, name)
}

testCases := []struct {
Name string
MaxDownloadSizeMBs float64
}{
{
Name: "downloadStorageObject",
MaxDownloadSizeMBs: 1.33,
},
{
Name: "downloadStorageObjectText",
MaxDownloadSizeMBs: 2,
},
}

for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
resp, err := http.DefaultClient.Post(httpServer.URL+"/query", "application/json", strings.NewReader(getQueryBody(tc.Name)))
assert.NilError(t, err)
assert.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode)
var respBody schema.ErrorResponse
assert.NilError(t, json.NewDecoder(resp.Body).Decode(&respBody))
assert.Equal(t, respBody.Message, fmt.Sprintf("file size >= %.2f MB is not allowed to be downloaded directly. Please use presignedGetObject function for large files", tc.MaxDownloadSizeMBs))
resp.Body.Close()
		})
	}
}
3 changes: 2 additions & 1 deletion connector/storage/common/arguments.go
@@ -192,7 +192,8 @@ type GetStorageObjectOptions struct {
VersionID *string `json:"versionId"`
PartNumber *int `json:"partNumber"`
// Options to be included for the object information.
-	Include StorageObjectIncludeOptions `json:"-"`
+	Include       StorageObjectIncludeOptions `json:"-"`
+	Base64Encoded bool                        `json:"-"`
}

// StorageCopyDestOptions represents options specified by user for CopyObject/ComposeObject APIs.
11 changes: 10 additions & 1 deletion connector/storage/manager.go
@@ -13,19 +13,28 @@ import (
"github.com/hasura/ndc-storage/connector/storage/common"
)

// RuntimeSettings holds common runtime settings for the connector.
type RuntimeSettings struct {
	// Maximum size in MBs of an object that can be downloaded inline in the GraphQL response,
	// to avoid excessive memory usage. Pre-signed URLs are recommended for large files.
	MaxDownloadSizeMBs int64 `json:"maxDownloadSizeMBs" jsonschema:"min=1,default=20" yaml:"maxDownloadSizeMBs"`
}

// Manager represents the high-level client that manages internal clients and configurations.
type Manager struct {
clients []Client
runtime RuntimeSettings
}

// NewManager creates a storage client manager instance.
func NewManager(ctx context.Context, configs []ClientConfig, logger *slog.Logger) (*Manager, error) {
func NewManager(ctx context.Context, configs []ClientConfig, runtimeSettings RuntimeSettings, logger *slog.Logger) (*Manager, error) {
if len(configs) == 0 {
return nil, errors.New("failed to initialize storage clients: config is empty")
}

result := &Manager{
clients: make([]Client, len(configs)),
runtime: runtimeSettings,
}

for i, config := range configs {
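
To make the data flow concrete, here is a minimal sketch of how a caller threads the parsed configuration into the manager, mirroring the `TryInitState` change in `connector/connector.go` above; the function name and the hard-coded default are illustrative:

```go
package example

import (
	"context"
	"log/slog"

	"github.com/hasura/ndc-storage/connector/storage"
)

// newStorageManager shows how RuntimeSettings is passed alongside the client
// configurations when constructing the storage manager.
func newStorageManager(ctx context.Context, clients []storage.ClientConfig, logger *slog.Logger) (*storage.Manager, error) {
	runtime := storage.RuntimeSettings{
		MaxDownloadSizeMBs: 20, // the default written by configuration/update.go
	}

	return storage.NewManager(ctx, clients, runtime, logger)
}
```
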
25 changes: 25 additions & 0 deletions connector/storage/object.go
@@ -3,6 +3,7 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"time"

@@ -71,6 +72,26 @@ func (m *Manager) GetObject(ctx context.Context, bucketInfo common.StorageBucket
return nil, err
}

objectStat, err := m.statObject(ctx, client, bucketName, objectName, opts)
if err != nil || objectStat == nil {
return nil, err
}

if objectStat.IsDirectory {
return nil, schema.UnprocessableContentError("cannot download directory: "+objectName, nil)
}

maxDownloadSizeMBs := float64(m.runtime.MaxDownloadSizeMBs)

	// encoding the file content to base64 increases the size by about 33%, so only allow raw objects up to 2/3 of the configured limit
if opts.Base64Encoded {
maxDownloadSizeMBs = maxDownloadSizeMBs * 2 / 3
}

if objectStat.Size == nil || *objectStat.Size >= int64(maxDownloadSizeMBs*1024*1024) {
return nil, schema.UnprocessableContentError(fmt.Sprintf("file size >= %.2f MB is not allowed to be downloaded directly. Please use presignedGetObject function for large files", maxDownloadSizeMBs), nil)
}

return client.GetObject(ctx, bucketName, objectName, opts)
}

@@ -153,6 +174,10 @@ func (m *Manager) StatObject(ctx context.Context, bucketInfo common.StorageBucke
return nil, err
}

return m.statObject(ctx, client, bucketName, objectName, opts)
}

func (m *Manager) statObject(ctx context.Context, client *Client, bucketName, objectName string, opts common.GetStorageObjectOptions) (*common.StorageObject, error) {
result, err := client.StatObject(ctx, bucketName, objectName, opts)
if err != nil {
return nil, err
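
The arithmetic behind this gate is worth spelling out. Base64 encodes every 3 raw bytes as 4 output characters, so the payload grows by about a third; the exact inverse factor would be 3/4, and the connector uses a slightly more conservative 2/3. The following standalone helper (an illustrative sketch, not connector code) reproduces the computation:

```go
package example

// maxRawDownloadBytes mirrors the size gate in Manager.GetObject: it returns
// the largest raw object size, in bytes, that may be downloaded inline for a
// configured limit of maxDownloadSizeMBs.
func maxRawDownloadBytes(maxDownloadSizeMBs int64, base64Encoded bool) int64 {
	limitMBs := float64(maxDownloadSizeMBs)

	// base64 output is ~4/3 the size of its input, so cap base64-encoded
	// downloads at 2/3 of the configured limit.
	if base64Encoded {
		limitMBs = limitMBs * 2 / 3
	}

	return int64(limitMBs * 1024 * 1024)
}
```

With the default `maxDownloadSizeMBs: 20`, this admits raw files up to 20 MB for `downloadStorageObjectText` and about 13.33 MB for the base64-encoded `downloadStorageObject`. The expectations in the new test above (2.00 MB and 1.33 MB) are consistent with a 2 MB limit in the test configuration.
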
12 changes: 9 additions & 3 deletions connector/types/configuration.go
@@ -13,10 +13,12 @@ const (

// Configuration contains required settings for the connector.
type Configuration struct {
-	// List of storage client configurations and credentials
+	// List of storage client configurations and credentials.
	Clients []storage.ClientConfig `json:"clients" yaml:"clients"`
-	// Settings for concurrent webhook executions to remote servers
+	// Settings for concurrent webhook executions to remote servers.
	Concurrency ConcurrencySettings `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`
+	// Common runtime settings for all clients.
+	Runtime storage.RuntimeSettings `json:"runtime" yaml:"runtime"`
}

// Validate checks if the configuration is valid.
@@ -31,13 +33,17 @@ func (c Configuration) Validate() error {
}
}

if c.Runtime.MaxDownloadSizeMBs <= 0 {
return errors.New("maxDownloadSizeMBs must be larger than 0")
}

return nil
}

// ConcurrencySettings represent settings for concurrent webhook executions to remote servers.
type ConcurrencySettings struct {
// Maximum number of concurrent executions if there are many query variables.
-	Query int `json:"query" jsonschema:"min=1,default=10" yaml:"query"`
+	Query int `json:"query" jsonschema:"min=1,default=5" yaml:"query"`
// Maximum number of concurrent executions if there are many mutation operations.
Mutation int `json:"mutation" jsonschema:"min=1,default=1" yaml:"mutation"`
}
13 changes: 13 additions & 0 deletions docs/configuration.md
@@ -221,3 +221,16 @@ You must configure the endpoint URL along with [Access Key ID and Secret Access
#### DigitalOcean Spaces

See [Spaces API Reference Documentation](https://docs.digitalocean.com/reference/api/spaces-api/).

## Runtime Settings

| Name | Description | Default |
| -------------------- | ------------------------------------------------------------------------- | ------- |
| `maxDownloadSizeMBs` | Limit the max download size in MBs for `downloadStorageObject*` functions | `20` |
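
For reference, a minimal `configuration.yaml` fragment that sets this limit might look as follows; the client entry is illustrative, and `20` is the default shipped with the connector:

```yaml
clients:
  - type: s3
    # ... credentials and endpoint settings for the client ...
concurrency:
  query: 5
  mutation: 1
runtime:
  # Objects larger than this size (in MBs) cannot be downloaded inline;
  # use the pre-signed URL functions for bigger files.
  maxDownloadSizeMBs: 20
```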

## Concurrency Settings

| Name | Description | Default |
| ---------- | ----------------------------------------------------------------------------- | ------- |
| `query` | Max number of concurrent threads when fetching remote relationships in query | `5` |
| `mutation` | Max number of concurrent commands if the mutation request has many operations | `1` |
36 changes: 25 additions & 11 deletions docs/objects.md
@@ -68,6 +68,10 @@ query GetSignedDownloadURL {

The response is a base64-encoded string. The client must decode it to get the raw content.

> [!NOTE]
> The connector limits the maximum download size via the `runtime.maxDownloadSizeMBs` setting to avoid excessive memory usage. The GraphQL engine on Hasura Cloud also limits the maximum response size from connectors, so the file size should be at most 30 MB. Because the file content is base64-encoded, the response is about 33% larger than the raw file; with a 30 MB response limit, the largest file that can be downloaded this way is about 20 MB.

```gql
query DownloadObject {
downloadStorageObject(object: "hello.txt")
}
```

@@ -84,6 +88,9 @@

Use the `downloadStorageObjectText` query if you are confident that the object content is plain text.

> [!NOTE]
> The connector limits the maximum download size via the `runtime.maxDownloadSizeMBs` setting to avoid excessive memory usage. The GraphQL engine on Hasura Cloud also limits the maximum response size from connectors, so the file size should be at most 30 MB.

```gql
query DownloadObjectText {
downloadStorageObjectText(object: "hello.txt")
}
```

@@ -98,17 +105,24 @@

### List Objects

> [!NOTE]
> Pagination information is optional and depends on whether the storage provider's API supports it. Pagination is cursor-based.

| Service              | Pagination |
| -------------------- | ---------- |
| AWS S3               | ✅         |
| Google Cloud Storage | ❌         |
| Azure Blob Storage   | ❌         |
| MinIO                | ✅         |
| Cloudflare R2        | ✅         |
| DigitalOcean Spaces  | ✅         |

#### Filter Arguments

You can use the `clientId`, `bucket`, and `prefix` arguments or a `where` boolean expression to filter object results. The `where` argument is mainly used for permissions. The filter expression is evaluated twice, before and after fetching the results: cloud storage APIs usually support filtering by name prefix only, so the other operators (`_contains`, `_icontains`) are applied in-memory to the fetched results.

```graphql
query ListObjects {
storageObjects(prefix: "hello", where: { object: { _contains: "world" } }) {
objects {
name
# ...
}
}
}
```

#### Pagination

Most cloud storage services support cursor-based pagination. Offset pagination and sorting aren't supported.

```graphql
query ListObjects {
  # ...
}
```
